From f459f4ed9300ca918f93996c8ee4379a28af416d Mon Sep 17 00:00:00 2001 From: David Ludwig Date: Sun, 2 May 2021 21:57:21 -0500 Subject: [PATCH] Add generic IPC services to microservice package --- packages/microservice/package.json | 4 + packages/microservice/src/index.ts | 1 + .../src/ipc/AbstractIpcService.ts | 68 +++++++ .../microservice/src/ipc/IpcClientService.ts | 190 ++++++++++++++++++ packages/microservice/src/ipc/IpcError.ts | 29 +++ .../microservice/src/ipc/IpcServerService.ts | 92 +++++++++ packages/microservice/src/ipc/index.ts | 4 + packages/microservice/src/ipc/schema.ts | 27 +++ packages/microservice/yarn.lock | 40 +++- 9 files changed, 454 insertions(+), 1 deletion(-) create mode 100644 packages/microservice/src/ipc/AbstractIpcService.ts create mode 100644 packages/microservice/src/ipc/IpcClientService.ts create mode 100644 packages/microservice/src/ipc/IpcError.ts create mode 100644 packages/microservice/src/ipc/IpcServerService.ts create mode 100644 packages/microservice/src/ipc/index.ts create mode 100644 packages/microservice/src/ipc/schema.ts diff --git a/packages/microservice/package.json b/packages/microservice/package.json index 11e2b0a..0a52f39 100644 --- a/packages/microservice/package.json +++ b/packages/microservice/package.json @@ -10,7 +10,11 @@ }, "devDependencies": { "@types/node": "^15.0.1", + "@types/node-ipc": "^9.1.3", "rimraf": "^3.0.2", "typescript": "^4.2.4" + }, + "dependencies": { + "node-ipc": "^9.1.4" } } diff --git a/packages/microservice/src/index.ts b/packages/microservice/src/index.ts index 91c2b81..9664c6c 100644 --- a/packages/microservice/src/index.ts +++ b/packages/microservice/src/index.ts @@ -1,2 +1,3 @@ export * from "./Microservice"; export * from "./InternalService"; +export * from "./ipc"; diff --git a/packages/microservice/src/ipc/AbstractIpcService.ts b/packages/microservice/src/ipc/AbstractIpcService.ts new file mode 100644 index 0000000..2b5d490 --- /dev/null +++ b/packages/microservice/src/ipc/AbstractIpcService.ts @@ -0,0 +1,68 @@ +import RawIPC = require("node-ipc"); +import { InternalService } from "../InternalService"; +import { Microservice } from "../Microservice"; +import { IPC, IpcMessageHandler } from "./schema"; + +/** + * An abstract IPC service containing common properties/methods among the server and client + */ +export default abstract class AbstractIpcService extends InternalService +{ + /** + * The IPC instance + */ + private __ipc: IPC|null = null; + + // Implementation Requirements ----------------------------------------------------------------- + + /** + * The path to the socket file + */ + protected abstract get socketPath(): string; + + /** + * Add a message handler for the service + */ + protected abstract addMessageHandler(method: string, handle: IpcMessageHandler): void; + + /** + * Boot the IPC service after configuration is complete + */ + protected abstract bootIpc(ipc: IPC): Promise; + + /** + * Shutdown the IPC service before it is destroyed + */ + protected abstract shutdownIpc(ipc: IPC|null): Promise; + + // Service Management -------------------------------------------------------------------------- + + /** + * Boot the IPC service + */ + public async boot() { + // Create the IPC socket + this.__ipc = new RawIPC.IPC(); + this.__ipc.config.id = this.name; + this.__ipc.config.retry = 1500; + this.__ipc.config.silent = true; + await this.bootIpc(this.__ipc); + } + + /** + * Shutdown the IPC service + */ + public async shutdown() { + await this.shutdownIpc(this.__ipc); + this.__ipc = null; + } + + // Accessors ----------------------------------------------------------------------------------- + + /** + * Get the raw IPC instance + */ + protected get rawIpcInstance(): IPC|null { + return this.__ipc; + } +} diff --git a/packages/microservice/src/ipc/IpcClientService.ts b/packages/microservice/src/ipc/IpcClientService.ts new file mode 100644 index 0000000..dce956a --- /dev/null +++ b/packages/microservice/src/ipc/IpcClientService.ts @@ -0,0 +1,190 @@ +import { Socket } from "net"; +import assert from "assert"; +import { Microservice } from "../Microservice"; +import AbstractIpcService from "./AbstractIpcService"; +import { IIpcResponse, IIpcRequest, IPC } from "./schema"; +import { IpcConnectionError, IpcTimeoutError } from "./IpcError"; + +export abstract class IpcClientService extends AbstractIpcService +{ + /** + * Indicate if there is an active connection to the IPC + */ + private __isConnected: boolean = false; + + /** + * The most recent request ID + */ + private __requestId!: number; + + /** + * The active IPC socket + */ + private __socket: Socket|null = null; + + // Service Implementation ---------------------------------------------------------------------- + + /** + * Install the event handlers for receiving on the IPC socket + * + * Example: this.addMessageHandler("some_event", this.onSomeEvent) + */ + protected installMessageHandlers() { + // no-op + } + + // Service Management -------------------------------------------------------------------------- + + /** + * Boot the IPC client service + */ + public bootIpc(ipc: IPC) { + // Connect to the server + return new Promise((resolve, reject) => { + this.rawIpcInstance!.connectTo(this.name, this.socketPath, () => { + this.__isConnected = true; + this.__requestId = 0; + this.__socket = ipc!.of[this.name]; + this.installEventHandlers(this.__socket!); + this.installMessageHandlers(); + resolve(); + }); + }); + } + + /** + * Install the event handlers for the IPC socket + */ + protected installEventHandlers(socket: Socket) { + socket.on("connect", () => this.onConnect()); + socket.on("error", (error: any) => this.onError(error)); + socket.on("disconnect", () => this.onDisconnect()); + socket.on("destroy", () => this.onDestroy()); + } + + /** + * Add a handler for an event broadcasted by the server + */ + protected addMessageHandler(method: string, handle: (...args: any[]) => Promise) { + assert(this.__socket !== null, "Attempted to add events to null socket"); + this.__socket.on(method, async (data: any) => handle.apply(this, [data])); + } + + /** + * Shutdown the IPC service + */ + public async shutdownIpc(ipc: IPC|null) { + ipc?.disconnect(this.name); + this.__socket?.removeAllListeners(); + this.__socket?.destroy(); + this.__socket = null; + } + + // Socket Event Handlers ----------------------------------------------------------------------- + + /** + * Invoked when the client established a connection to an IPC server + */ + protected onConnect() { + this.log("IPC: Connection established"); + this.__isConnected = true; + + } + + /** + * Invoked when an IPC error occurs + */ + protected onError(error: string | Error) { + if (this.__isConnected) { + this.log("IPC: Error occurred:", error); + } + } + + /** + * Invoked when disconnected from an IPC server + */ + protected onDisconnect() { + if (this.__isConnected) { + this.log("IPC: Disconnected"); + } + this.__isConnected = false; + } + + /** + * Invoked when the IPC socket has been destroyed + */ + protected onDestroy() { + this.log("IPC: Destroyed"); + this.__isConnected = false; + } + + // Methods ------------------------------------------------------------------------------------- + + /** + * Perform a general request and wait for a response + */ + protected async request(method: string, data?: any, timeout: number|null = null) { + return new Promise((resolve, reject) => { + // If the client is not connected to a server, reject immediately + if (!this.__isConnected || this.__socket === null) { + reject(new IpcConnectionError("Not connected")); + return; + } + // Clean up event listeners + let cleanUp = () => { + if (responseTimeout !== null) { + clearTimeout(responseTimeout); + } + if (this.__socket === null) { + return; + } + this.__socket.off(responseMethod, respond); + this.__socket.off("disconnect", respond); + this.__socket.off("destroy", respond); + }; + // Handle the response + let respond = (response: IIpcResponse) => { + cleanUp(); + resolve(response); + }; + // Abort the request + let abort = (error: Error) => { + cleanUp(); + reject(error); + }; + // Fetch a request ID and declare a timeout + const requestId = this.__requestId++; + const responseMethod = `method_response_${requestId}`; + // Include timeout mechanism in the off chance something breaks + let responseTimeout: NodeJS.Timeout|null = null; + if (timeout !== null) { + responseTimeout = setTimeout(() => abort( + new IpcTimeoutError("Timeout") + ), timeout); + } + this.__socket.once("disconnect", () => abort(new IpcConnectionError("Disconnected"))); + this.__socket.once("destroy", () => abort(new IpcConnectionError("Destroyed"))); + this.__socket.once(responseMethod, respond); + this.__socket.emit(method, { id: requestId, data }); + }); + } + + /** + * Send a message over IPC without waiting for a response + */ + protected send(method: string, data?: any) { + if (!this.__isConnected || this.__socket === null) { + throw new IpcConnectionError("Not connected"); + } + this.__socket.emit(method, { id: null, data }); + } + + // Accessors ----------------------------------------------------------------------------------- + + /** + * Get the connection status of the IPC connection + */ + public get isConnected() { + return this.__isConnected; + } +} diff --git a/packages/microservice/src/ipc/IpcError.ts b/packages/microservice/src/ipc/IpcError.ts new file mode 100644 index 0000000..ec31523 --- /dev/null +++ b/packages/microservice/src/ipc/IpcError.ts @@ -0,0 +1,29 @@ +/** + * Generic IPC Error type + */ + export class IpcError extends Error { + constructor(...args: any[]) { + super(...args); + Object.setPrototypeOf(this, IpcError.prototype); + } +} + +/** + * IPC connection error type + */ +export class IpcConnectionError extends IpcError { + constructor(...args: any[]) { + super(...args); + Object.setPrototypeOf(this, IpcConnectionError.prototype); + } +} + +/** + * IPC timeout error type + */ +export class IpcTimeoutError extends IpcError { + constructor(...args: any[]) { + super(...args); + Object.setPrototypeOf(this, IpcTimeoutError.prototype); + } +} diff --git a/packages/microservice/src/ipc/IpcServerService.ts b/packages/microservice/src/ipc/IpcServerService.ts new file mode 100644 index 0000000..553a74e --- /dev/null +++ b/packages/microservice/src/ipc/IpcServerService.ts @@ -0,0 +1,92 @@ +import assert from "assert"; +import { mkdir } from "fs/promises"; +import { Socket } from "net"; +import { dirname } from "path"; +import { Microservice } from "../Microservice"; +import { IIpcRequest, IPC } from "./schema"; +import AbstractIpcService from "./AbstractIpcService"; + +type IpcServer = IPC["server"]; + +export abstract class IpcServerService extends AbstractIpcService +{ + /** + * The IPC server instance + */ + private __server!: IpcServer|null; + + // Service Implementation ---------------------------------------------------------------------- + + /** + * Install the event handlers for receiving on the IPC socket + * + * Example: this.addMessageHandler("some_event", this.onSomeEvent) + */ + protected installMessageHandlers() { + console.log("Installing from parent"); + // no-op + } + + // Service Management -------------------------------------------------------------------------- + + /** + * Boot the IPC service + */ + public bootIpc(ipc: IPC) { + return new Promise(async (resolve) => { + // Create the socket directory if it doesn't exist + await mkdir(dirname(this.socketPath), { recursive: true }); + // Serve the IPC server + ipc.serve(this.socketPath, () => { + this.__server = ipc.server; + this.installMessageHandlers(); + resolve(); + }); + ipc.server.start(); + }); + } + + /** + * Add a message/request handler for the server + */ + protected addMessageHandler(method: string, handle: (...args: any[]) => Promise) { + assert(this.__server !== null, "Attempted to add events to null server"); + this.__server.on(method, async (request: IIpcRequest, socket: Socket) => { + let handlerPromise = handle.apply(this, [request.data]); + if (request.id === null) { + handlerPromise.catch(error => this.log("Error:", method, error, request)); + return; + } + const responseMethod = `method_response_${request.id}`; + try { + this.__server!.emit(socket, responseMethod, { data: await handlerPromise }); + } catch(error) { + this.log(this.log("Error:", method, error, request)); + this.__server!.emit(socket, responseMethod, { error }); + } + }); + } + + /** + * Shutdown the IPC service + */ + public async shutdownIpc(ipc: IPC|null) { + this.__server?.stop(); + this.__server = null; + for (let socket of Object.values(ipc?.of ?? [])) { + socket.destroy(); + } + } + + // Methods ------------------------------------------------------------------------------------- + + /** + * Broadcast a message to all connected clients + */ + public broadcast(method: string, data?: any) { + if (this.__server === null) { + return; + } + this.__server.emit(method, data); + } +} diff --git a/packages/microservice/src/ipc/index.ts b/packages/microservice/src/ipc/index.ts new file mode 100644 index 0000000..e01bf9a --- /dev/null +++ b/packages/microservice/src/ipc/index.ts @@ -0,0 +1,4 @@ +export * from "./schema"; +export * from "./IpcError"; +export * from "./IpcClientService"; +export * from "./IpcServerService"; diff --git a/packages/microservice/src/ipc/schema.ts b/packages/microservice/src/ipc/schema.ts new file mode 100644 index 0000000..3e7c673 --- /dev/null +++ b/packages/microservice/src/ipc/schema.ts @@ -0,0 +1,27 @@ +import type RawIPC = require("node-ipc"); + +/** + * The IPC request structure + */ +export interface IIpcRequest { + id: number|null, + data ?: any +} + +/** + * The IPC response structure + */ +export interface IIpcResponse { + data ?: any, + error?: string | Error +} + +/** + * The IPC message handler type + */ +export type IpcMessageHandler = (...args: any[]) => Promise + + /** + * HOLY @#$@% WHOEVER MADE THE TYPES FOR `node-ipc` SHOULDB BE HANGED + */ +export type IPC = InstanceType<(typeof RawIPC)["IPC"]>; diff --git a/packages/microservice/yarn.lock b/packages/microservice/yarn.lock index f175550..2f84ca0 100644 --- a/packages/microservice/yarn.lock +++ b/packages/microservice/yarn.lock @@ -2,7 +2,14 @@ # yarn lockfile v1 -"@types/node@^15.0.1": +"@types/node-ipc@^9.1.3": + version "9.1.3" + resolved "https://registry.yarnpkg.com/@types/node-ipc/-/node-ipc-9.1.3.tgz#5381fbc910071083b28dd43225727877c108b361" + integrity sha512-ka7CPX9Dk2lwe4PxoZMLOwcQrtdcYe/7OKmH75fQbmt0jdKltWVkdGA81D5l55d0wNhkweHa3XmzFbt5C0ieOQ== + dependencies: + "@types/node" "*" + +"@types/node@*", "@types/node@^15.0.1": version "15.0.1" resolved "https://registry.yarnpkg.com/@types/node/-/node-15.0.1.tgz#ef34dea0881028d11398be5bf4e856743e3dc35a" integrity sha512-TMkXt0Ck1y0KKsGr9gJtWGjttxlZnnvDtphxUOSd0bfaR6Q1jle+sPvrzNR1urqYTWMinoKvjKfXUGsumaO1PA== @@ -25,6 +32,16 @@ concat-map@0.0.1: resolved "https://registry.yarnpkg.com/concat-map/-/concat-map-0.0.1.tgz#d8a96bd77fd68df7793a73036a3ba0d5405d477b" integrity sha1-2Klr13/Wjfd5OnMDajug1UBdR3s= +easy-stack@^1.0.1: + version "1.0.1" + resolved "https://registry.yarnpkg.com/easy-stack/-/easy-stack-1.0.1.tgz#8afe4264626988cabb11f3c704ccd0c835411066" + integrity sha512-wK2sCs4feiiJeFXn3zvY0p41mdU5VUgbgs1rNsc/y5ngFUijdWd+iIN8eoyuZHKB8xN6BL4PdWmzqFmxNg6V2w== + +event-pubsub@4.3.0: + version "4.3.0" + resolved "https://registry.yarnpkg.com/event-pubsub/-/event-pubsub-4.3.0.tgz#f68d816bc29f1ec02c539dc58c8dd40ce72cb36e" + integrity sha512-z7IyloorXvKbFx9Bpie2+vMJKKx1fH1EN5yiTfp8CiLOTptSYy1g8H4yDpGlEdshL1PBiFtBHepF2cNsqeEeFQ== + fs.realpath@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/fs.realpath/-/fs.realpath-1.0.0.tgz#1504ad2523158caa40db4a2787cb01411994ea4f" @@ -55,6 +72,18 @@ inherits@2: resolved "https://registry.yarnpkg.com/inherits/-/inherits-2.0.4.tgz#0fa2c64f932917c3433a0ded55363aae37416b7c" integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ== +js-message@1.0.7: + version "1.0.7" + resolved "https://registry.yarnpkg.com/js-message/-/js-message-1.0.7.tgz#fbddd053c7a47021871bb8b2c95397cc17c20e47" + integrity sha512-efJLHhLjIyKRewNS9EGZ4UpI8NguuL6fKkhRxVuMmrGV2xN/0APGdQYwLFky5w9naebSZ0OwAGp0G6/2Cg90rA== + +js-queue@2.0.2: + version "2.0.2" + resolved "https://registry.yarnpkg.com/js-queue/-/js-queue-2.0.2.tgz#0be590338f903b36c73d33c31883a821412cd482" + integrity sha512-pbKLsbCfi7kriM3s1J4DDCo7jQkI58zPLHi0heXPzPlj0hjUsm+FesPUbE0DSbIVIK503A36aUBoCN7eMFedkA== + dependencies: + easy-stack "^1.0.1" + minimatch@^3.0.4: version "3.0.4" resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-3.0.4.tgz#5166e286457f03306064be5497e8dbb0c3d32083" @@ -62,6 +91,15 @@ minimatch@^3.0.4: dependencies: brace-expansion "^1.1.7" +node-ipc@^9.1.4: + version "9.1.4" + resolved "https://registry.yarnpkg.com/node-ipc/-/node-ipc-9.1.4.tgz#2acf962681afdac2602876d98fe6434d54d9bd3c" + integrity sha512-A+f0mn2KxUt1uRTSd5ktxQUsn2OEhj5evo7NUi/powBzMSZ0vocdzDjlq9QN2v3LH6CJi3e5xAenpZ1QwU5A8g== + dependencies: + event-pubsub "4.3.0" + js-message "1.0.7" + js-queue "2.0.2" + once@^1.3.0: version "1.4.0" resolved "https://registry.yarnpkg.com/once/-/once-1.4.0.tgz#583b1aa775961d4b113ac17d9c50baef9dd76bd1"