Browse Source

Progress on IPC rewrite

new-arch
David Ludwig 4 years ago
parent
commit
821ef2f07f
18 changed files with 179 additions and 417 deletions
  1. +11
    -0
      packages/ipc/jest.config.ts
  2. +7
    -4
      packages/ipc/package.json
  3. +8
    -0
      packages/ipc/src/AbstractClient.ts
  4. +43
    -0
      packages/ipc/src/AbstractConnection.ts
  5. +0
    -67
      packages/ipc/src/AbstractIpcService.ts
  6. +16
    -0
      packages/ipc/src/AbstractMethodMap.ts
  7. +11
    -0
      packages/ipc/src/AbstractServer.ts
  8. +0
    -194
      packages/ipc/src/IpcClientService.ts
  9. +0
    -39
      packages/ipc/src/IpcError.ts
  10. +0
    -93
      packages/ipc/src/IpcServerService.ts
  11. +27
    -0
      packages/ipc/src/Request.ts
  12. +1
    -4
      packages/ipc/src/index.ts
  13. +8
    -16
      packages/ipc/src/schema.ts
  14. +0
    -0
      packages/ipc/test/unit/AbstractClient.spec.ts
  15. +19
    -0
      packages/ipc/test/unit/AbstractConnection.spec.ts
  16. +23
    -0
      packages/ipc/test/unit/AbstractMethodMap.ts
  17. +0
    -0
      packages/ipc/test/unit/AbstractServer.spec.ts
  18. +5
    -0
      packages/ipc/test/unit/Request.spec.ts

+ 11
- 0
packages/ipc/jest.config.ts View File

@ -0,0 +1,11 @@
import base from "../../jest.config";
export default {
...base,
name: "@autoplex/microservice",
displayName: "Package: Micoservice",
moduleNameMapper: {
'^@src/(.*)$': '<rootDir>/src/$1',
'^@test@/(.*)$': '<rootDir>/test/$1'
}
}

+ 7
- 4
packages/ipc/package.json View File

@ -1,18 +1,21 @@
{
"name": "@autoplex/ipc",
"version": "0.0.0",
"version": "0.1.0",
"main": "dist/lib/index.js",
"types": "dist/typings/index.d.ts",
"license": "MIT",
"scripts": {
"build": "yarn run clean && tsc",
"clean": "rimraf ./dist"
"build": "yarn run clean && ttsc",
"clean": "rimraf ./coverage ./dist",
"coverage": "yarn test --coverage",
"test": "jest --silent",
"test:verbose": "jest"
},
"devDependencies": {
"@types/node-ipc": "^9.1.3"
},
"dependencies": {
"@autoplex/microservice": "^0.0.0",
"@autoplex/microservice": "^0.1.0",
"node-ipc": "^9.1.4"
}
}

+ 8
- 0
packages/ipc/src/AbstractClient.ts View File

@ -0,0 +1,8 @@
import { AbstractMethodMap } from "./AbstractMethodMap";
import { IMethodMap } from "./schema";
export abstract class AbstractClient<T extends IMethodMap> extends AbstractMethodMap<T>
{
}

+ 43
- 0
packages/ipc/src/AbstractConnection.ts View File

@ -0,0 +1,43 @@
import EventEmitter from "events";
import { IMethodMap, IPacket } from "./schema";
export abstract class AbstractConnection<T extends IMethodMap> extends EventEmitter
{
/**
* Write the message
*/
protected abstract write(packet: IPacket): void;
// ---------------------------------------------------------------------------------------------
/**
* Encode the packet to send
*/
protected serializePacket(packet: any) {
return packet;
}
/**
* Deserialize the received packet
*/
protected deserializePacket(packet: string) {
return packet;
}
/**
* Send a message
*/
public send<K extends keyof T>(method: K, ...args: Parameters<T[K]>) {
this.write({
method: method.toString(),
args: args.length > 0 ? args : undefined
})
}
/**
* Send a request and return a promise of the result
*/
public request<K extends keyof T>(method: K, ...args: Parameters<T[K]>) {
return new Promise<>;
}
}

+ 0
- 67
packages/ipc/src/AbstractIpcService.ts View File

@ -1,67 +0,0 @@
import RawIPC = require("node-ipc");
import { InternalService, Microservice } from "@autoplex/microservice";
import { IPC, IpcMessageHandler } from "./schema";
/**
* An abstract IPC service containing common properties/methods among the server and client
*/
export default abstract class AbstractIpcService<M extends Microservice = Microservice> extends InternalService<M>
{
/**
* The IPC instance
*/
private __ipc: IPC|null = null;
// Implementation Requirements -----------------------------------------------------------------
/**
* The path to the socket file
*/
protected abstract readonly SOCKET_PATH: string;
/**
* Add a message handler for the service
*/
protected abstract addMessageHandler(method: string, handle: IpcMessageHandler): void;
/**
* Boot the IPC service after configuration is complete
*/
protected abstract bootIpc(ipc: IPC): Promise<void>;
/**
* Shutdown the IPC service before it is destroyed
*/
protected abstract shutdownIpc(ipc: IPC|null): Promise<void>;
// Service Management --------------------------------------------------------------------------
/**
* Boot the IPC service
*/
public override async boot() {
// Create the IPC socket
this.__ipc = new RawIPC.IPC();
this.__ipc.config.id = this.NAME;
this.__ipc.config.retry = 1500;
this.__ipc.config.silent = true;
await this.bootIpc(this.__ipc);
}
/**
* Shutdown the IPC service
*/
public override async shutdown() {
await this.shutdownIpc(this.__ipc);
this.__ipc = null;
}
// Accessors -----------------------------------------------------------------------------------
/**
* Get the raw IPC instance
*/
protected get rawIpcInstance(): IPC|null {
return this.__ipc;
}
}

+ 16
- 0
packages/ipc/src/AbstractMethodMap.ts View File

@ -0,0 +1,16 @@
import { IMethodMap } from "./schema";
export abstract class AbstractMethodMap<T extends IMethodMap>
{
/**
* The method mapping
*/
protected abstract methodMap: T;
/**
* Invoke a mapped method
*/
protected invoke<K extends keyof T>(name: K, args: Parameters<T[K]>) {
return this.methodMap[name].apply(this, args);
}
}

+ 11
- 0
packages/ipc/src/AbstractServer.ts View File

@ -0,0 +1,11 @@
import { AbstractConnection } from "./AbstractConnection";
import { AbstractMethodMap } from "./AbstractMethodMap";
import { IMethodMap } from "./schema";
type PrependConnection<T extends IMethodMap> = {
[K in keyof T]: (connection: string, ...args: Parameters<T[K]>) => ReturnType<T[K]>
};
export abstract class AbstractServer<T extends IMethodMap, C extends AbstractConnection<any>> extends AbstractMethodMap<PrependConnection<T>>
{
}

+ 0
- 194
packages/ipc/src/IpcClientService.ts View File

@ -1,194 +0,0 @@
import { Socket } from "net";
import assert from "assert";
import { Microservice } from "@autoplex/microservice";
import AbstractIpcService from "./AbstractIpcService";
import { IIpcResponse, IIpcRequest, IPC } from "./schema";
import { IpcConnectionError, IpcError, IpcResponseError, IpcTimeoutError } from "./IpcError";
export abstract class IpcClientService<M extends Microservice = Microservice> extends AbstractIpcService<M>
{
/**
* Indicate if there is an active connection to the IPC
*/
private __isConnected: boolean = false;
/**
* The most recent request ID
*/
private __requestId!: number;
/**
* The active IPC socket
*/
private __socket: Socket|null = null;
// Service Implementation ----------------------------------------------------------------------
/**
* Install the event handlers for receiving on the IPC socket
*
* Example: this.addMessageHandler("some_event", this.onSomeEvent)
*/
protected installMessageHandlers() {
// no-op
}
// Service Management --------------------------------------------------------------------------
/**
* Boot the IPC client service
*/
public bootIpc(ipc: IPC) {
// Connect to the server
return new Promise<void>((resolve, _) => {
this.rawIpcInstance!.connectTo(this.NAME, this.SOCKET_PATH, () => {
this.__isConnected = false;
this.__requestId = 0;
this.__socket = <Socket>ipc!.of[this.NAME];
this.installEventHandlers(this.__socket!);
this.installMessageHandlers();
this.__socket!.once("connect", resolve);
});
});
}
/**
* Install the event handlers for the IPC socket
*/
protected installEventHandlers(socket: Socket) {
socket.on("connect", () => this.onConnect());
socket.on("error", (error: any) => this.onError(error));
socket.on("disconnect", () => this.onDisconnect());
socket.on("destroy", () => this.onDestroy());
}
/**
* Add a handler for an event broadcasted by the server
*/
protected addMessageHandler(method: string, handle: (...args: any[]) => Promise<any>) {
assert(this.__socket !== null, "Attempted to add events to null socket");
this.__socket.on(method, async (data: any) => handle.apply(this, [data]));
}
/**
* Shutdown the IPC service
*/
public async shutdownIpc(ipc: IPC|null) {
ipc?.disconnect(this.NAME);
this.__socket?.removeAllListeners();
this.__socket?.destroy();
this.__socket = null;
}
// Socket Event Handlers -----------------------------------------------------------------------
/**
* Invoked when the client established a connection to an IPC server
*/
protected onConnect() {
this.log("IPC: Connection established");
this.__isConnected = true;
}
/**
* Invoked when an IPC error occurs
*/
protected onError(error: string | Error) {
if (this.__isConnected) {
this.log("IPC: Error occurred:", error);
}
}
/**
* Invoked when disconnected from an IPC server
*/
protected onDisconnect() {
if (this.__isConnected) {
this.log("IPC: Disconnected");
}
this.__isConnected = false;
}
/**
* Invoked when the IPC socket has been destroyed
*/
protected onDestroy() {
this.log("IPC: Destroyed");
this.__isConnected = false;
}
// Methods -------------------------------------------------------------------------------------
/**
* Perform a general request and wait for a response
*/
protected async request<T = any>(method: string, data?: any, timeout: number|null = null) {
return new Promise<T>((resolve, reject) => {
// If the client is not connected to a server, reject immediately
if (!this.__isConnected || this.__socket === null) {
reject(new IpcConnectionError("Not connected"));
return;
}
// Clean up event listeners
let cleanUp = () => {
if (responseTimeout !== null) {
clearTimeout(responseTimeout);
}
if (this.__socket === null) {
return;
}
this.__socket.off(responseMethod, respond);
this.__socket.off("disconnect", respond);
this.__socket.off("destroy", respond);
};
// Handle the response
let respond = (response: IIpcResponse<T>) => {
cleanUp();
if (response.error !== undefined) {
reject(new IpcResponseError<T>(response.error));
return;
}
resolve(response.data);
};
// Abort the request
let abort = (error: IpcError) => {
cleanUp();
reject(error);
};
// Fetch a request ID and declare a timeout
const requestId = this.__requestId++;
const responseMethod = `method_response_${requestId}`;
// Include timeout mechanism in the off chance something breaks
let responseTimeout: NodeJS.Timeout|null = null;
if (timeout !== null) {
responseTimeout = setTimeout(() => abort(
new IpcTimeoutError("Timeout")
), timeout);
}
this.__socket.once("disconnect", () => abort(new IpcConnectionError("Disconnected")));
this.__socket.once("destroy", () => abort(new IpcConnectionError("Destroyed")));
this.__socket.once(responseMethod, respond);
this.__socket.emit(method, <IIpcRequest>{ id: requestId, data });
});
}
/**
* Send a message over IPC without waiting for a response
*/
protected send(method: string, data?: any) {
if (!this.__isConnected || this.__socket === null) {
throw new IpcConnectionError("Not connected");
}
this.__socket.emit(method, <IIpcRequest>{ id: null, data });
}
// Accessors -----------------------------------------------------------------------------------
/**
* Get the connection status of the IPC connection
*/
public get isConnected() {
return this.__isConnected;
}
}

+ 0
- 39
packages/ipc/src/IpcError.ts View File

@ -1,39 +0,0 @@
/**
* Generic IPC Error type
*/
export class IpcError extends Error {
constructor(...args: any[]) {
super(...args);
Object.setPrototypeOf(this, IpcError.prototype);
}
}
/**
* IPC connection error type
*/
export class IpcConnectionError extends IpcError {
constructor(...args: any[]) {
super(...args);
Object.setPrototypeOf(this, IpcConnectionError.prototype);
}
}
/**
* IPC timeout error type
*/
export class IpcTimeoutError extends IpcError {
constructor(...args: any[]) {
super(...args);
Object.setPrototypeOf(this, IpcTimeoutError.prototype);
}
}
/**
* IPC response error type
*/
export class IpcResponseError<T> extends IpcError {
constructor(...args: any[]) {
super(...args);
Object.setPrototypeOf(this, IpcResponseError.prototype);
}
}

+ 0
- 93
packages/ipc/src/IpcServerService.ts View File

@ -1,93 +0,0 @@
import assert from "assert";
import { mkdir } from "fs/promises";
import { Socket } from "net";
import { dirname } from "path";
import { Microservice } from "@autoplex/microservice";
import { IIpcRequest, IPC } from "./schema";
import AbstractIpcService from "./AbstractIpcService";
type IpcServer = IPC["server"];
export abstract class IpcServerService<M extends Microservice = Microservice> extends AbstractIpcService<M>
{
/**
* The IPC server instance
*/
private __server!: IpcServer|null;
// Service Implementation ----------------------------------------------------------------------
/**
* Install the event handlers for receiving on the IPC socket
*
* Example: this.addMessageHandler("some_event", this.onSomeEvent)
*/
protected installMessageHandlers() {
// no-op
}
// Service Management --------------------------------------------------------------------------
/**
* Boot the IPC service
*/
public bootIpc(ipc: IPC) {
return new Promise<void>(async (resolve) => {
// Create the socket directory if it doesn't exist
await mkdir(dirname(this.SOCKET_PATH), { recursive: true });
// Serve the IPC server
ipc.serve(this.SOCKET_PATH, () => {
this.__server = ipc.server;
this.installMessageHandlers();
resolve();
});
ipc.server.start();
});
}
/**
* Add a message/request handler for the server
*/
protected addMessageHandler(method: string, handle: (...args: any[]) => Promise<any>) {
assert(this.__server !== null, "Attempted to add events to null server");
this.__server.on(method, async (request: IIpcRequest, socket: Socket) => {
let handlerPromise = handle.apply(this, [request.data]);
if (request.id === null) {
handlerPromise.catch(error => this.log("Error:", method, error, request));
return;
}
const responseMethod = `method_response_${request.id}`;
try {
this.__server!.emit(socket, responseMethod, { data: await handlerPromise });
} catch(error) {
this.log(this.log("Error:", method, error, request));
this.__server!.emit(socket, responseMethod, { error });
}
});
}
/**
* Shutdown the IPC service
*/
public async shutdownIpc(ipc: IPC|null) {
this.__server?.stop();
this.__server = null;
for (let socket of <Socket[]>Object.values(ipc?.of ?? [])) {
socket.destroy();
}
}
// Methods -------------------------------------------------------------------------------------
/**
* Broadcast a message to all connected clients
*/
public broadcast(method: string, data?: any) {
if (this.__server === null) {
return;
}
for (let socket of <Socket[]>(<any>this.__server).sockets) {
this.__server.emit(socket, method, data);
}
}
}

+ 27
- 0
packages/ipc/src/Request.ts View File

@ -0,0 +1,27 @@
export class Request<T = unknown> implements PromiseLike<T>
{
public constructor(requestId: number, method: string, args: any[], timeout?: number) {
}
/**
* Abort the current request
*/
public abort() {
}
/**
* Successful completion of the request
*/
public then() {
}
/**
* Caught error in the request
*/
public catch() {
}
}

+ 1
- 4
packages/ipc/src/index.ts View File

@ -1,4 +1 @@
export * from "./schema";
export * from "./IpcError";
export * from "./IpcClientService";
export * from "./IpcServerService";
export * from "./AbstractConnection";

+ 8
- 16
packages/ipc/src/schema.ts View File

@ -1,27 +1,19 @@
import type RawIPC = require("node-ipc");
import RawIPC from "node-ipc";
/**
* The IPC request structure
* A generic function/method mapping
*/
export interface IIpcRequest {
id : number|null,
data?: any
export interface IMethodMap {
[name: string]: (...args: any[]) => any
}
/**
* The IPC response structure
*/
export interface IIpcResponse<T> {
data : T,
error?: string
export interface IPacket {
requestId?: number,
method: string,
args?: any[]
}
/**
* The IPC message handler type
*/
export type IpcMessageHandler = (...args: any[]) => Promise<any>
/**
* HOLY @#$@% WHOEVER MADE THE TYPES FOR `node-ipc` SHOULDB BE HANGED
*/
export type IPC = InstanceType<(typeof RawIPC)["IPC"]>;

+ 0
- 0
packages/ipc/test/unit/AbstractClient.spec.ts View File


+ 19
- 0
packages/ipc/test/unit/AbstractConnection.spec.ts View File

@ -0,0 +1,19 @@
import { AbstractConnection } from "../../src";
type MethodMap = {
sendTest(value: number): void
};
class MockConnection extends AbstractConnection<MethodMap> {
public write = jest.fn();
}
describe("Abstract IPC Connections", () => {
it("Should write packet", () => {
let connection = new MockConnection();
connection.send("sendTest", 10);
// expect(connection.write.mock.calls[0][0]).toMatchObject({
// });
});
});

+ 23
- 0
packages/ipc/test/unit/AbstractMethodMap.ts View File

@ -0,0 +1,23 @@
import { AbstractMethodMap } from "../../src/AbstractMethodMap";
class TestMap extends AbstractMethodMap<{ doSomething(a: number, b: number): number }> {
public mockFn = jest.fn();
public methodMap = {
doSomething: this.doSomething
};
public doSomething(a: number, b: number) {
this.mockFn(); // Ensure the correct 'this' context
return a + b;
}
public publicInvoke() {
this.invoke("doSomething", [5, 10]);
}
}
describe("Abstract Method Map", () => {
it("Invoke mapped method", () => {
let map = new TestMap();
map.publicInvoke();
expect(map.mockFn).toHaveBeenCalled()
});
});

+ 0
- 0
packages/ipc/test/unit/AbstractServer.spec.ts View File


+ 5
- 0
packages/ipc/test/unit/Request.spec.ts View File

@ -0,0 +1,5 @@
describe("Request", () => {
it("", () => {
});
});

Loading…
Cancel
Save