From ee762a96de353a735ddb20ce21370076e71b5bcf Mon Sep 17 00:00:00 2001 From: David Ludwig Date: Tue, 20 Apr 2021 14:18:06 -0500 Subject: [PATCH] Grouped IPC services and fixed logging issues/improved error reporting --- .../{SeekerIpc.ts => Ipc/IpcClient.ts} | 73 ++++---- src/server/services/Ipc/SeekerIpcClient.ts | 23 +++ src/server/services/Ipc/TorrentIpcClient.ts | 63 +++++++ src/server/services/TorrentClientIpc.ts | 164 ------------------ src/server/services/WebServer/routes/api.ts | 2 +- src/server/services/index.ts | 8 +- 6 files changed, 133 insertions(+), 200 deletions(-) rename src/server/services/{SeekerIpc.ts => Ipc/IpcClient.ts} (64%) create mode 100644 src/server/services/Ipc/SeekerIpcClient.ts create mode 100644 src/server/services/Ipc/TorrentIpcClient.ts delete mode 100644 src/server/services/TorrentClientIpc.ts diff --git a/src/server/services/SeekerIpc.ts b/src/server/services/Ipc/IpcClient.ts similarity index 64% rename from src/server/services/SeekerIpc.ts rename to src/server/services/Ipc/IpcClient.ts index 0a03451..b4738c4 100644 --- a/src/server/services/SeekerIpc.ts +++ b/src/server/services/Ipc/IpcClient.ts @@ -1,21 +1,35 @@ -import ipc from "node-ipc"; import { Socket } from "net"; -import Service from "./Service"; -import Application from "../Application"; +import Service from "../Service"; +import Application from "../../Application"; import RawIPC = require("node-ipc"); -interface IResponse { +export interface IIpcResponse { response?: any, error?: string | Error } -export default class SeekerIpc extends Service +/** + * IPC connection error type + */ +export class IpcConnectionError extends Error { + constructor(...args: any[]) { + super(...args); + Object.setPrototypeOf(this, IpcConnectionError.prototype); + } +} + +export default class IpcClient extends Service { /** * Indicate if there is an active connection to the IPC */ private __isConnected: boolean; + /** + * The IPC ID to connect to + */ + private __targetIpc: string; + /** * HOLY @#$@% WHOEVER MADE THE TYPES FOR node-ipc SHOULDB BE HANGED */ @@ -29,13 +43,13 @@ export default class SeekerIpc extends Service /** * Create a new IPC client for the Seeker */ - constructor(app: Application) { - super("Seeker IPC", app); + constructor(name: string, app: Application, ipcId: string, targetIpc: string) { + super(name, app); this.ipc = new RawIPC.IPC(); - this.ipc.config.id = "request"; + this.ipc.config.id = ipcId; this.ipc.config.retry = 1500; this.ipc.config.silent = true; - + this.__targetIpc = targetIpc; this.__isConnected = false; } @@ -44,8 +58,8 @@ export default class SeekerIpc extends Service */ public boot() { return new Promise((resolve, reject) => { - this.ipc.connectTo("seeker", process.env["SEEKER_IPC_SOCKET"], () => { - this.socket = this.ipc.of["seeker"]; + this.ipc.connectTo(this.__targetIpc, process.env["SEEKER_IPC_SOCKET"], () => { + this.socket = this.ipc.of[this.__targetIpc]; this.installSocketEventHandlers(this.socket); this.installSocketMessageHandlers(this.socket); resolve(); @@ -57,7 +71,7 @@ export default class SeekerIpc extends Service * Shutdown the seeker IPC service */ public async shutdown() { - this.ipc.disconnect("seeker"); + this.ipc.disconnect(this.__targetIpc); } /** @@ -70,38 +84,44 @@ export default class SeekerIpc extends Service socket.on("destroy", () => this.onDestroy()); } - protected installSocketMessageHandlers(socket: Socket) { - } + /** + * Install the event handlers for receiving on the IPC socket + */ + protected installSocketMessageHandlers(socket: Socket) {} // Socket Event Handlers ----------------------------------------------------------------------- protected onConnect() { - this.log("Seeker IPC: Connection established"); + this.log("IPC: Connection established"); this.__isConnected = true; } protected onError(error: string | Error) { - this.log("Seeker IPC: Error occurred:", error); + if (this.__isConnected) { + this.log("IPC: Error occurred:", error); + } } protected onDisconnect() { - this.log("Seeker IPC: Disconnected"); + if (this.__isConnected) { + this.log("IPC: Disconnected"); + } this.__isConnected = false; } protected onDestroy() { - this.log("Seeker IPC: Destroyed"); + this.log("IPC: Destroyed"); } // Methods ------------------------------------------------------------------------------------- /** - * Perform a general request to the Seeker + * Perform a general request */ protected async request(method: string, message?: any) { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { if (!this.isConnected) { - throw new Error("Not connected to Seeker"); + reject(new IpcConnectionError("Not connected to Seeker")); } let respond = (response: any) => { clearTimeout(timeout); @@ -110,22 +130,13 @@ export default class SeekerIpc extends Service // Include timeout mechanism in the off chance something breaks let timeout = setTimeout(() => { this.socket.off(method, respond); - reject("Seeker IPC request timeout") + reject(new IpcConnectionError("IPC request timeout")); }, 1000); this.socket.once(method, respond); this.socket.emit(method, message); }); } - /** - * Notify Seeker that a movie was added - */ - public notifyMovieRequested(ticketId: number) { - this.request("search_movie", ticketId).catch((e) => { - this.log("No response from seeker notifying added movie"); - }); - } - // Accessors ----------------------------------------------------------------------------------- get isConnected() { diff --git a/src/server/services/Ipc/SeekerIpcClient.ts b/src/server/services/Ipc/SeekerIpcClient.ts new file mode 100644 index 0000000..f37f142 --- /dev/null +++ b/src/server/services/Ipc/SeekerIpcClient.ts @@ -0,0 +1,23 @@ +import IpcClient from "./IpcClient"; +import Application from "../../Application"; + +export default class SeekerIpcClient extends IpcClient +{ + /** + * Create a new IPC client for the Seeker + */ + constructor(app: Application) { + super("Seeker IPC", app, "request", "seeker"); + } + + // Methods ------------------------------------------------------------------------------------- + + /** + * Notify Seeker that a movie was added + */ + public notifyMovieRequested(ticketId: number) { + this.request("search_movie", ticketId).catch((e) => { + this.log("No response from seeker notifying added movie"); + }); + } +} diff --git a/src/server/services/Ipc/TorrentIpcClient.ts b/src/server/services/Ipc/TorrentIpcClient.ts new file mode 100644 index 0000000..af94936 --- /dev/null +++ b/src/server/services/Ipc/TorrentIpcClient.ts @@ -0,0 +1,63 @@ +import { ISerializedTorrent, ITorrent } from "../../common"; +import Application from "../../Application"; +import IpcClient from "./IpcClient"; + +export default class TorrentIpcClient extends IpcClient +{ + /** + * Create a new IPC client for the torrent client + */ + constructor(app: Application) { + super("Torrent IPC", app, "request", "torrent_client"); + } + + // Methods ------------------------------------------------------------------------------------- + + /** + * Add a torrent to the client + * @param torrent Magnet URI or file buffer + */ + public async add(torrent: string | Buffer) { + let response = await this.request("add", torrent); + if (response.error) { + throw new Error("Failed to add torrent"); + } + return response.response; + } + + /** + * Remove a torrent from the client + * @param torrent Torrent info hash + */ + public async remove(torrent: string) { + let response = await this.request("remove", torrent); + if (response.error) { + throw new Error("Failed to remove torrent"); + } + } + + /** + * Get a list of all torrents in the client + */ + public async list() { + let response = await this.request("list"); + if (response.error) { + console.error(response.error); + throw new Error("Failed to obtain torrent list"); + } + return response.response; + } + + /** + * Get full details of each of the provided torrents + * @param torrentIds Array of torrent info hashes + */ + public async details(...torrentIds: string[]) { + let response = await this.request("details", torrentIds); + if (response.error) { + console.error(response.error); + throw new Error("Failed to retrieve torrent details"); + } + return response.response; + } +} diff --git a/src/server/services/TorrentClientIpc.ts b/src/server/services/TorrentClientIpc.ts deleted file mode 100644 index 38d050b..0000000 --- a/src/server/services/TorrentClientIpc.ts +++ /dev/null @@ -1,164 +0,0 @@ -import ipc from "node-ipc"; -import { Socket } from "net"; -import { ISerializedTorrent, ITorrent } from "../common"; -import Service from "./Service"; -import Application from "../Application"; - -interface IResponse { - response?: any, - error?: string | Error -} - -export default class TorrentClientIpc extends Service -{ - /** - * Indicate if there is an active connection to the IPC - */ - private __isConnected: boolean; - - /** - * The active IPC socket - */ - protected socket!: Socket; - - /** - * Create a new IPC client for the torrent client - */ - constructor(app: Application) { - super("Torrent Client IPC", app); - ipc.config.id = "request"; - ipc.config.retry = 1500; - ipc.config.silent = true; - - this.__isConnected = false; - } - - /** - * Boot the torrent client IPC service - */ - public boot() { - return new Promise((resolve, reject) => { - ipc.connectTo("torrent_client", process.env["TORRENT_CLIENT_IPC_SOCKET"], () => { - this.socket = ipc.of["torrent_client"]; - this.installSocketEventHandlers(this.socket); - this.installSocketMessageHandlers(this.socket); - resolve(); - }); - }); - } - - public async shutdown() { - ipc.disconnect("torrent_client"); - } - - /** - * Install the event handlers for the IPC socket - */ - protected installSocketEventHandlers(socket: Socket) { - socket.on("connect", () => this.onConnect()); - socket.on("error", (error: any) => this.onError(error)); - socket.on("disconnect", () => this.onDisconnect()); - socket.on("destroy", () => this.onDestroy()); - } - - protected installSocketMessageHandlers(socket: Socket) { - } - - // Socket Event Handlers ----------------------------------------------------------------------- - - protected onConnect() { - this.log("IPC: Connection established"); - this.__isConnected = true; - } - - protected onError(error: string | Error) { - this.log("IPC: Error occurred:", error); - } - - protected onDisconnect() { - this.log("IPC: Disconnected"); - this.__isConnected = false; - } - - protected onDestroy() { - this.log("IPC: Destroyed"); - } - - // Methods ------------------------------------------------------------------------------------- - - /** - * Perform a general request to the torrent client - */ - protected async request(method: string, message?: any) { - return new Promise((resolve, reject) => { - if (!this.isConnected) { - throw new Error("Not connected to torrent client"); - } - let respond = (response: any) => { - clearTimeout(timeout); - resolve(response); - } - // Include timeout mechanism in the off chance something breaks - let timeout = setTimeout(() => { - this.socket.off(method, respond); - reject("Torrent client IPC request timeout") - }, 1000); - this.socket.once(method, respond); - this.socket.emit(method, message); - }); - } - - /** - * Add a torrent to the client - * @param torrent Magnet URI or file buffer - */ - public async add(torrent: string | Buffer) { - let response = await this.request("add", torrent); - if (response.error) { - throw new Error("Failed to add torrent"); - } - return response.response; - } - - /** - * Remove a torrent from the client - * @param torrent Torrent info hash - */ - public async remove(torrent: string) { - let response = await this.request("remove", torrent); - if (response.error) { - throw new Error("Failed to remove torrent"); - } - } - - /** - * Get a list of all torrents in the client - */ - public async list() { - let response = await this.request("list"); - if (response.error) { - console.error(response.error); - throw new Error("Failed to obtain torrent list"); - } - return response.response; - } - - /** - * Get full details of each of the provided torrents - * @param torrentIds Array of torrent info hashes - */ - public async details(...torrentIds: string[]) { - let response = await this.request("details", torrentIds); - if (response.error) { - console.error(response.error); - throw new Error("Failed to retrieve torrent details"); - } - return response.response; - } - - // Accessors ----------------------------------------------------------------------------------- - - get isConnected() { - return this.__isConnected; - } -} diff --git a/src/server/services/WebServer/routes/api.ts b/src/server/services/WebServer/routes/api.ts index 1bb41c0..a223d87 100644 --- a/src/server/services/WebServer/routes/api.ts +++ b/src/server/services/WebServer/routes/api.ts @@ -1,5 +1,5 @@ import Application from "@server/Application"; -import SeekerIpc from "@server/services/SeekerIpc"; +import SeekerIpc from "@server/services/Ipc/SeekerIpcClient"; import MovieSearch from "@server/services/MovieSearch"; import { auth } from "../middleware/auth"; import RouteRegisterFactory from "./RouteRegisterFactory"; diff --git a/src/server/services/index.ts b/src/server/services/index.ts index 640729e..0cd5aa0 100644 --- a/src/server/services/index.ts +++ b/src/server/services/index.ts @@ -1,8 +1,8 @@ import Database from "./Database"; import DiscordBot from "./DiscordBot"; import MovieSearch from "./MovieSearch"; -import SeekerIpc from "./SeekerIpc"; -import TorrentClientIpc from "./TorrentClientIpc"; +import SeekerIpcClient from "./Ipc/SeekerIpcClient"; +import TorrentIpcClient from "./Ipc/TorrentIpcClient"; import TvDb from "./TvDb"; import WebServer from "./WebServer"; @@ -10,8 +10,8 @@ export default { Database, // DiscordBot, MovieSearch, - // TorrentClientIpc, - SeekerIpc, + // TorrentIpcClient, + SeekerIpcClient, TvDb, WebServer }