Browse Source

Add generic IPC services to microservice package

dev
David Ludwig 4 years ago
parent
commit
f459f4ed93
9 changed files with 454 additions and 1 deletions
  1. +4
    -0
      packages/microservice/package.json
  2. +1
    -0
      packages/microservice/src/index.ts
  3. +68
    -0
      packages/microservice/src/ipc/AbstractIpcService.ts
  4. +190
    -0
      packages/microservice/src/ipc/IpcClientService.ts
  5. +29
    -0
      packages/microservice/src/ipc/IpcError.ts
  6. +92
    -0
      packages/microservice/src/ipc/IpcServerService.ts
  7. +4
    -0
      packages/microservice/src/ipc/index.ts
  8. +27
    -0
      packages/microservice/src/ipc/schema.ts
  9. +39
    -1
      packages/microservice/yarn.lock

+ 4
- 0
packages/microservice/package.json View File

@ -10,7 +10,11 @@
}, },
"devDependencies": { "devDependencies": {
"@types/node": "^15.0.1", "@types/node": "^15.0.1",
"@types/node-ipc": "^9.1.3",
"rimraf": "^3.0.2", "rimraf": "^3.0.2",
"typescript": "^4.2.4" "typescript": "^4.2.4"
},
"dependencies": {
"node-ipc": "^9.1.4"
} }
} }

+ 1
- 0
packages/microservice/src/index.ts View File

@ -1,2 +1,3 @@
export * from "./Microservice"; export * from "./Microservice";
export * from "./InternalService"; export * from "./InternalService";
export * from "./ipc";

+ 68
- 0
packages/microservice/src/ipc/AbstractIpcService.ts View File

@ -0,0 +1,68 @@
import RawIPC = require("node-ipc");
import { InternalService } from "../InternalService";
import { Microservice } from "../Microservice";
import { IPC, IpcMessageHandler } from "./schema";
/**
* An abstract IPC service containing common properties/methods among the server and client
*/
export default abstract class AbstractIpcService<M extends Microservice = Microservice> extends InternalService<M>
{
/**
* The IPC instance
*/
private __ipc: IPC|null = null;
// Implementation Requirements -----------------------------------------------------------------
/**
* The path to the socket file
*/
protected abstract get socketPath(): string;
/**
* Add a message handler for the service
*/
protected abstract addMessageHandler(method: string, handle: IpcMessageHandler): void;
/**
* Boot the IPC service after configuration is complete
*/
protected abstract bootIpc(ipc: IPC): Promise<void>;
/**
* Shutdown the IPC service before it is destroyed
*/
protected abstract shutdownIpc(ipc: IPC|null): Promise<void>;
// Service Management --------------------------------------------------------------------------
/**
* Boot the IPC service
*/
public async boot() {
// Create the IPC socket
this.__ipc = new RawIPC.IPC();
this.__ipc.config.id = this.name;
this.__ipc.config.retry = 1500;
this.__ipc.config.silent = true;
await this.bootIpc(this.__ipc);
}
/**
* Shutdown the IPC service
*/
public async shutdown() {
await this.shutdownIpc(this.__ipc);
this.__ipc = null;
}
// Accessors -----------------------------------------------------------------------------------
/**
* Get the raw IPC instance
*/
protected get rawIpcInstance(): IPC|null {
return this.__ipc;
}
}

+ 190
- 0
packages/microservice/src/ipc/IpcClientService.ts View File

@ -0,0 +1,190 @@
import { Socket } from "net";
import assert from "assert";
import { Microservice } from "../Microservice";
import AbstractIpcService from "./AbstractIpcService";
import { IIpcResponse, IIpcRequest, IPC } from "./schema";
import { IpcConnectionError, IpcTimeoutError } from "./IpcError";
export abstract class IpcClientService<M extends Microservice = Microservice> extends AbstractIpcService<M>
{
/**
* Indicate if there is an active connection to the IPC
*/
private __isConnected: boolean = false;
/**
* The most recent request ID
*/
private __requestId!: number;
/**
* The active IPC socket
*/
private __socket: Socket|null = null;
// Service Implementation ----------------------------------------------------------------------
/**
* Install the event handlers for receiving on the IPC socket
*
* Example: this.addMessageHandler("some_event", this.onSomeEvent)
*/
protected installMessageHandlers() {
// no-op
}
// Service Management --------------------------------------------------------------------------
/**
* Boot the IPC client service
*/
public bootIpc(ipc: IPC) {
// Connect to the server
return new Promise<void>((resolve, reject) => {
this.rawIpcInstance!.connectTo(this.name, this.socketPath, () => {
this.__isConnected = true;
this.__requestId = 0;
this.__socket = ipc!.of[this.name];
this.installEventHandlers(this.__socket!);
this.installMessageHandlers();
resolve();
});
});
}
/**
* Install the event handlers for the IPC socket
*/
protected installEventHandlers(socket: Socket) {
socket.on("connect", () => this.onConnect());
socket.on("error", (error: any) => this.onError(error));
socket.on("disconnect", () => this.onDisconnect());
socket.on("destroy", () => this.onDestroy());
}
/**
* Add a handler for an event broadcasted by the server
*/
protected addMessageHandler(method: string, handle: (...args: any[]) => Promise<any>) {
assert(this.__socket !== null, "Attempted to add events to null socket");
this.__socket.on(method, async (data: any) => handle.apply(this, [data]));
}
/**
* Shutdown the IPC service
*/
public async shutdownIpc(ipc: IPC|null) {
ipc?.disconnect(this.name);
this.__socket?.removeAllListeners();
this.__socket?.destroy();
this.__socket = null;
}
// Socket Event Handlers -----------------------------------------------------------------------
/**
* Invoked when the client established a connection to an IPC server
*/
protected onConnect() {
this.log("IPC: Connection established");
this.__isConnected = true;
}
/**
* Invoked when an IPC error occurs
*/
protected onError(error: string | Error) {
if (this.__isConnected) {
this.log("IPC: Error occurred:", error);
}
}
/**
* Invoked when disconnected from an IPC server
*/
protected onDisconnect() {
if (this.__isConnected) {
this.log("IPC: Disconnected");
}
this.__isConnected = false;
}
/**
* Invoked when the IPC socket has been destroyed
*/
protected onDestroy() {
this.log("IPC: Destroyed");
this.__isConnected = false;
}
// Methods -------------------------------------------------------------------------------------
/**
* Perform a general request and wait for a response
*/
protected async request(method: string, data?: any, timeout: number|null = null) {
return new Promise<IIpcResponse>((resolve, reject) => {
// If the client is not connected to a server, reject immediately
if (!this.__isConnected || this.__socket === null) {
reject(new IpcConnectionError("Not connected"));
return;
}
// Clean up event listeners
let cleanUp = () => {
if (responseTimeout !== null) {
clearTimeout(responseTimeout);
}
if (this.__socket === null) {
return;
}
this.__socket.off(responseMethod, respond);
this.__socket.off("disconnect", respond);
this.__socket.off("destroy", respond);
};
// Handle the response
let respond = (response: IIpcResponse) => {
cleanUp();
resolve(response);
};
// Abort the request
let abort = (error: Error) => {
cleanUp();
reject(error);
};
// Fetch a request ID and declare a timeout
const requestId = this.__requestId++;
const responseMethod = `method_response_${requestId}`;
// Include timeout mechanism in the off chance something breaks
let responseTimeout: NodeJS.Timeout|null = null;
if (timeout !== null) {
responseTimeout = setTimeout(() => abort(
new IpcTimeoutError("Timeout")
), timeout);
}
this.__socket.once("disconnect", () => abort(new IpcConnectionError("Disconnected")));
this.__socket.once("destroy", () => abort(new IpcConnectionError("Destroyed")));
this.__socket.once(responseMethod, respond);
this.__socket.emit(method, <IIpcRequest>{ id: requestId, data });
});
}
/**
* Send a message over IPC without waiting for a response
*/
protected send(method: string, data?: any) {
if (!this.__isConnected || this.__socket === null) {
throw new IpcConnectionError("Not connected");
}
this.__socket.emit(method, <IIpcRequest>{ id: null, data });
}
// Accessors -----------------------------------------------------------------------------------
/**
* Get the connection status of the IPC connection
*/
public get isConnected() {
return this.__isConnected;
}
}

+ 29
- 0
packages/microservice/src/ipc/IpcError.ts View File

@ -0,0 +1,29 @@
/**
* Generic IPC Error type
*/
export class IpcError extends Error {
constructor(...args: any[]) {
super(...args);
Object.setPrototypeOf(this, IpcError.prototype);
}
}
/**
* IPC connection error type
*/
export class IpcConnectionError extends IpcError {
constructor(...args: any[]) {
super(...args);
Object.setPrototypeOf(this, IpcConnectionError.prototype);
}
}
/**
* IPC timeout error type
*/
export class IpcTimeoutError extends IpcError {
constructor(...args: any[]) {
super(...args);
Object.setPrototypeOf(this, IpcTimeoutError.prototype);
}
}

+ 92
- 0
packages/microservice/src/ipc/IpcServerService.ts View File

@ -0,0 +1,92 @@
import assert from "assert";
import { mkdir } from "fs/promises";
import { Socket } from "net";
import { dirname } from "path";
import { Microservice } from "../Microservice";
import { IIpcRequest, IPC } from "./schema";
import AbstractIpcService from "./AbstractIpcService";
type IpcServer = IPC["server"];
export abstract class IpcServerService<M extends Microservice = Microservice> extends AbstractIpcService<M>
{
/**
* The IPC server instance
*/
private __server!: IpcServer|null;
// Service Implementation ----------------------------------------------------------------------
/**
* Install the event handlers for receiving on the IPC socket
*
* Example: this.addMessageHandler("some_event", this.onSomeEvent)
*/
protected installMessageHandlers() {
console.log("Installing from parent");
// no-op
}
// Service Management --------------------------------------------------------------------------
/**
* Boot the IPC service
*/
public bootIpc(ipc: IPC) {
return new Promise<void>(async (resolve) => {
// Create the socket directory if it doesn't exist
await mkdir(dirname(this.socketPath), { recursive: true });
// Serve the IPC server
ipc.serve(this.socketPath, () => {
this.__server = ipc.server;
this.installMessageHandlers();
resolve();
});
ipc.server.start();
});
}
/**
* Add a message/request handler for the server
*/
protected addMessageHandler(method: string, handle: (...args: any[]) => Promise<any>) {
assert(this.__server !== null, "Attempted to add events to null server");
this.__server.on(method, async (request: IIpcRequest, socket: Socket) => {
let handlerPromise = handle.apply(this, [request.data]);
if (request.id === null) {
handlerPromise.catch(error => this.log("Error:", method, error, request));
return;
}
const responseMethod = `method_response_${request.id}`;
try {
this.__server!.emit(socket, responseMethod, { data: await handlerPromise });
} catch(error) {
this.log(this.log("Error:", method, error, request));
this.__server!.emit(socket, responseMethod, { error });
}
});
}
/**
* Shutdown the IPC service
*/
public async shutdownIpc(ipc: IPC|null) {
this.__server?.stop();
this.__server = null;
for (let socket of <Socket[]>Object.values(ipc?.of ?? [])) {
socket.destroy();
}
}
// Methods -------------------------------------------------------------------------------------
/**
* Broadcast a message to all connected clients
*/
public broadcast(method: string, data?: any) {
if (this.__server === null) {
return;
}
this.__server.emit(method, data);
}
}

+ 4
- 0
packages/microservice/src/ipc/index.ts View File

@ -0,0 +1,4 @@
export * from "./schema";
export * from "./IpcError";
export * from "./IpcClientService";
export * from "./IpcServerService";

+ 27
- 0
packages/microservice/src/ipc/schema.ts View File

@ -0,0 +1,27 @@
import type RawIPC = require("node-ipc");
/**
* The IPC request structure
*/
export interface IIpcRequest {
id: number|null,
data ?: any
}
/**
* The IPC response structure
*/
export interface IIpcResponse {
data ?: any,
error?: string | Error
}
/**
* The IPC message handler type
*/
export type IpcMessageHandler = (...args: any[]) => Promise<any>
/**
* HOLY @#$@% WHOEVER MADE THE TYPES FOR `node-ipc` SHOULDB BE HANGED
*/
export type IPC = InstanceType<(typeof RawIPC)["IPC"]>;

+ 39
- 1
packages/microservice/yarn.lock View File

@ -2,7 +2,14 @@
# yarn lockfile v1 # yarn lockfile v1
"@types/node@^15.0.1":
"@types/node-ipc@^9.1.3":
version "9.1.3"
resolved "https://registry.yarnpkg.com/@types/node-ipc/-/node-ipc-9.1.3.tgz#5381fbc910071083b28dd43225727877c108b361"
integrity sha512-ka7CPX9Dk2lwe4PxoZMLOwcQrtdcYe/7OKmH75fQbmt0jdKltWVkdGA81D5l55d0wNhkweHa3XmzFbt5C0ieOQ==
dependencies:
"@types/node" "*"
"@types/node@*", "@types/node@^15.0.1":
version "15.0.1" version "15.0.1"
resolved "https://registry.yarnpkg.com/@types/node/-/node-15.0.1.tgz#ef34dea0881028d11398be5bf4e856743e3dc35a" resolved "https://registry.yarnpkg.com/@types/node/-/node-15.0.1.tgz#ef34dea0881028d11398be5bf4e856743e3dc35a"
integrity sha512-TMkXt0Ck1y0KKsGr9gJtWGjttxlZnnvDtphxUOSd0bfaR6Q1jle+sPvrzNR1urqYTWMinoKvjKfXUGsumaO1PA== integrity sha512-TMkXt0Ck1y0KKsGr9gJtWGjttxlZnnvDtphxUOSd0bfaR6Q1jle+sPvrzNR1urqYTWMinoKvjKfXUGsumaO1PA==
@ -25,6 +32,16 @@ concat-map@0.0.1:
resolved "https://registry.yarnpkg.com/concat-map/-/concat-map-0.0.1.tgz#d8a96bd77fd68df7793a73036a3ba0d5405d477b" resolved "https://registry.yarnpkg.com/concat-map/-/concat-map-0.0.1.tgz#d8a96bd77fd68df7793a73036a3ba0d5405d477b"
integrity sha1-2Klr13/Wjfd5OnMDajug1UBdR3s= integrity sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=
easy-stack@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/easy-stack/-/easy-stack-1.0.1.tgz#8afe4264626988cabb11f3c704ccd0c835411066"
integrity sha512-wK2sCs4feiiJeFXn3zvY0p41mdU5VUgbgs1rNsc/y5ngFUijdWd+iIN8eoyuZHKB8xN6BL4PdWmzqFmxNg6V2w==
event-pubsub@4.3.0:
version "4.3.0"
resolved "https://registry.yarnpkg.com/event-pubsub/-/event-pubsub-4.3.0.tgz#f68d816bc29f1ec02c539dc58c8dd40ce72cb36e"
integrity sha512-z7IyloorXvKbFx9Bpie2+vMJKKx1fH1EN5yiTfp8CiLOTptSYy1g8H4yDpGlEdshL1PBiFtBHepF2cNsqeEeFQ==
fs.realpath@^1.0.0: fs.realpath@^1.0.0:
version "1.0.0" version "1.0.0"
resolved "https://registry.yarnpkg.com/fs.realpath/-/fs.realpath-1.0.0.tgz#1504ad2523158caa40db4a2787cb01411994ea4f" resolved "https://registry.yarnpkg.com/fs.realpath/-/fs.realpath-1.0.0.tgz#1504ad2523158caa40db4a2787cb01411994ea4f"
@ -55,6 +72,18 @@ inherits@2:
resolved "https://registry.yarnpkg.com/inherits/-/inherits-2.0.4.tgz#0fa2c64f932917c3433a0ded55363aae37416b7c" resolved "https://registry.yarnpkg.com/inherits/-/inherits-2.0.4.tgz#0fa2c64f932917c3433a0ded55363aae37416b7c"
integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ== integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==
js-message@1.0.7:
version "1.0.7"
resolved "https://registry.yarnpkg.com/js-message/-/js-message-1.0.7.tgz#fbddd053c7a47021871bb8b2c95397cc17c20e47"
integrity sha512-efJLHhLjIyKRewNS9EGZ4UpI8NguuL6fKkhRxVuMmrGV2xN/0APGdQYwLFky5w9naebSZ0OwAGp0G6/2Cg90rA==
js-queue@2.0.2:
version "2.0.2"
resolved "https://registry.yarnpkg.com/js-queue/-/js-queue-2.0.2.tgz#0be590338f903b36c73d33c31883a821412cd482"
integrity sha512-pbKLsbCfi7kriM3s1J4DDCo7jQkI58zPLHi0heXPzPlj0hjUsm+FesPUbE0DSbIVIK503A36aUBoCN7eMFedkA==
dependencies:
easy-stack "^1.0.1"
minimatch@^3.0.4: minimatch@^3.0.4:
version "3.0.4" version "3.0.4"
resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-3.0.4.tgz#5166e286457f03306064be5497e8dbb0c3d32083" resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-3.0.4.tgz#5166e286457f03306064be5497e8dbb0c3d32083"
@ -62,6 +91,15 @@ minimatch@^3.0.4:
dependencies: dependencies:
brace-expansion "^1.1.7" brace-expansion "^1.1.7"
node-ipc@^9.1.4:
version "9.1.4"
resolved "https://registry.yarnpkg.com/node-ipc/-/node-ipc-9.1.4.tgz#2acf962681afdac2602876d98fe6434d54d9bd3c"
integrity sha512-A+f0mn2KxUt1uRTSd5ktxQUsn2OEhj5evo7NUi/powBzMSZ0vocdzDjlq9QN2v3LH6CJi3e5xAenpZ1QwU5A8g==
dependencies:
event-pubsub "4.3.0"
js-message "1.0.7"
js-queue "2.0.2"
once@^1.3.0: once@^1.3.0:
version "1.4.0" version "1.4.0"
resolved "https://registry.yarnpkg.com/once/-/once-1.4.0.tgz#583b1aa775961d4b113ac17d9c50baef9dd76bd1" resolved "https://registry.yarnpkg.com/once/-/once-1.4.0.tgz#583b1aa775961d4b113ac17d9c50baef9dd76bd1"


Loading…
Cancel
Save