diff --git a/services/seeker/src/services/IpcInterface.ts b/services/seeker/src/services/IpcInterface.ts index ee72356..70b860b 100644 --- a/services/seeker/src/services/IpcInterface.ts +++ b/services/seeker/src/services/IpcInterface.ts @@ -1,30 +1,11 @@ -import { IPC } from "node-ipc"; -import { Socket } from "net"; -import { mkdir } from "fs/promises"; -import { dirname } from "path"; -import { InternalService } from "@autoplex/microservice"; +import { IpcServerService } from "@autoplex/microservice"; import Application from "../Application"; import Supervisor from "./Supervisor"; import { MovieTicket } from "@autoplex/database"; +import { env } from "@autoplex/utils"; -export default class IpcInterface extends InternalService +export default class IpcInterface extends IpcServerService { - /** - * The IPC instance - */ - private __ipc: InstanceType; - - /** - * Create a new IPC interface - */ - public constructor(app: Application) { - super(app); - this.__ipc = new IPC(); - this.__ipc.config.id = "seeker"; - this.__ipc.config.retry = 1500; - this.__ipc.config.silent = true; - } - /** * The service name */ @@ -33,47 +14,17 @@ export default class IpcInterface extends InternalService } /** - * Boot the IPC interface + * The path to the socket file */ - public boot() { - return new Promise(async (resolve, reject) => { - console.log("Serving:", process.env["IPC_SOCKET_PATH"]); - await mkdir(dirname(process.env["IPC_SOCKET_PATH"]), { recursive: true }); - this.__ipc.serve(process.env["IPC_SOCKET_PATH"], () => { - this.installEventHandlers(); - resolve(); - }); - this.__ipc.server.start(); - }); + public get socketPath() { + return env("IPC_SOCKET_PATH"); } - public async shutdown() { - this.__ipc.server.stop(); - } - /** * Install the the event handlers */ - protected installEventHandlers() { - this.addEventHandler("movie_ticket_added", this.onMovieTicketAdded); - } - - /** - * Handle a specific event - */ - protected addEventHandler(method: string, handle: (...args: any[]) => Promise) { - this.__ipc.server.on(method, async (message: any, socket: Socket) => { - try { - let response = await handle.apply(this, [message]); - this.__ipc.server.emit(socket, method, { response }); - } catch (error) { - console.log("Error:", method, error); - this.__ipc.server.emit(socket, method, { - response: undefined, - error - }); - } - }); + protected installMessageHandlers() { + this.addMessageHandler("movie_ticket_added", this.onMovieTicketAdded); } // Interface Methods --------------------------------------------------------------------------- diff --git a/services/seeker/src/services/TorrentManager/TorrentClientIpc.ts b/services/seeker/src/services/TorrentManager/TorrentClientIpc.ts index 4c57733..bd096ba 100644 --- a/services/seeker/src/services/TorrentManager/TorrentClientIpc.ts +++ b/services/seeker/src/services/TorrentManager/TorrentClientIpc.ts @@ -1,7 +1,7 @@ import { Socket } from "net"; import Application from "../../Application"; -import ipc = require("node-ipc"); -import { InternalService } from "@autoplex/microservice"; +import { IpcClientService } from "@autoplex/microservice"; +import { env } from "@autoplex/utils"; interface IResponse { response?: any, @@ -58,54 +58,13 @@ export class TorrentClientConnectionError extends Error { /** * The torrent client IPC service */ -export default abstract class TorrentClientIpc extends InternalService +export default abstract class TorrentClientIpc extends IpcClientService { /** - * Indicate if there is an active connection to the IPC + * The path to the socket file */ - private __isConnected: boolean; - - /** - * IPC instance - */ - private __ipc = new ipc.IPC(); - - /** - * The active IPC socket - */ - protected socket!: Socket; - - /** - * Create a new IPC client for the torrent client - */ - constructor(app: Application) { - super(app); - this.__ipc.config.id = "seeker"; - this.__ipc.config.retry = 1500; - this.__ipc.config.silent = true; - - this.__isConnected = false; - } - - /** - * Boot the torrent client IPC service - */ - public boot() { - return new Promise((resolve, reject) => { - this.__ipc.connectTo("torrent_client", process.env["TORRENT_CLIENT_IPC_SOCKET"], () => { - this.socket = this.__ipc.of["torrent_client"]; - this.installSocketEventHandlers(this.socket); - this.installSocketMessageHandlers(this.socket); - resolve(); - }); - }); - } - - /** - * Shutdown the service - */ - public async shutdown() { - this.__ipc.disconnect("torrent_client"); + public get socketPath() { + return env("TORRENT_CLIENT_IPC_SOCKET"); } /** @@ -118,43 +77,13 @@ export default abstract class TorrentClientIpc extends InternalService this.onDestroy()); } - protected installSocketMessageHandlers(socket: Socket) { - this.installSocketMessageHandler(socket, "torrent_finished", this.onTorrentFinished); - } - /** - * Install a socket message handler + * Install the message event handlers */ - protected installSocketMessageHandler(socket: Socket, method: string, handler: (...args: any[]) => void) { - socket.on(method, (args: any[]) => handler.apply(this, args)); + protected installMessageHandlers() { + this.addListener("torrent_finished", this.onTorrentFinished); } - // Socket Event Handlers ----------------------------------------------------------------------- - - protected onConnect() { - this.log("IPC: Connection established"); - this.__isConnected = true; - } - - protected onError(error: string | Error) { - if (this.__isConnected) { - this.log("IPC: Error occurred:", error); - } - } - - protected onDisconnect() { - if (this.__isConnected) { - this.log("IPC: Disconnected"); - } - this.__isConnected = false; - } - - protected onDestroy() { - this.log("IPC: Destroyed"); - } - - // Socket Message Handlers --------------------------------------------------------------------- - /** * Invoked when a torrent has finished downloading */ @@ -162,27 +91,12 @@ export default abstract class TorrentClientIpc extends InternalService((resolve, reject) => { - if (!this.isConnected) { - reject(new TorrentClientConnectionError("Not connected to torrent client")); - return; - } - let respond = (response: any) => { - clearTimeout(timeout); - resolve(response); - } - // Include timeout mechanism in the off chance something breaks - let timeout = setTimeout(() => { - this.socket.off(method, respond); - reject(new TorrentClientConnectionError("Torrent client IPC request timeout")); - }, 1000); - this.socket.once(method, respond); - this.socket.emit(method, message); - }); + protected async test(value: string) { + let response = await this.request("test", value); + if (response.error) { + throw new Error("Failed to complete test"); + } + return response.data; } /** @@ -190,11 +104,11 @@ export default abstract class TorrentClientIpc extends InternalServiceresponse.response; + return response.data; } /** @@ -217,7 +131,7 @@ export default abstract class TorrentClientIpc extends InternalServiceresponse.response; + return response.data; } /** @@ -230,12 +144,6 @@ export default abstract class TorrentClientIpc extends InternalServiceresponse.response; - } - - // Accessors ----------------------------------------------------------------------------------- - - get isConnected() { - return this.__isConnected; + return response.data; } } diff --git a/services/seeker/src/services/TorrentManager/TorrentManager.ts b/services/seeker/src/services/TorrentManager/TorrentManager.ts index 4b88072..f3c07a6 100644 --- a/services/seeker/src/services/TorrentManager/TorrentManager.ts +++ b/services/seeker/src/services/TorrentManager/TorrentManager.ts @@ -49,6 +49,8 @@ export default class TorrentManager extends TorrentClientIpc this.log("Booting the torrent manager"); await super.boot(); await this.loadDisks(); + console.log("Sending test request...") + console.log("Response is:", await this.test("test value")); } // Interface methods ---------------------------------------------------------------------------