From 446e000d8fef73d9faf9d1fc637c023276359625 Mon Sep 17 00:00:00 2001 From: David Ludwig Date: Sat, 24 Jul 2021 06:02:11 -0500 Subject: [PATCH] IPC abstract client/server implementation ready for use I think --- packages/ipc/package.json | 1 + packages/ipc/src/AbstractClient.ts | 35 ++++- packages/ipc/src/AbstractConnection.ts | 132 ++++++++++++++++-- packages/ipc/src/AbstractMethodMap.ts | 8 +- packages/ipc/src/AbstractServer.ts | 58 +++++++- packages/ipc/src/Request.ts | 27 ---- packages/ipc/src/errors.ts | 14 ++ packages/ipc/src/index.ts | 4 + packages/ipc/src/schema.ts | 8 +- packages/ipc/test/unit/AbstractClient.spec.ts | 0 .../ipc/test/unit/AbstractConnection.spec.ts | 40 +++++- ...MethodMap.ts => AbstractMethodMap.spec.ts} | 4 +- packages/ipc/test/unit/AbstractServer.spec.ts | 0 packages/ipc/test/unit/Request.spec.ts | 5 - packages/ipc/yarn.lock | 23 +-- 15 files changed, 290 insertions(+), 69 deletions(-) delete mode 100644 packages/ipc/src/Request.ts create mode 100644 packages/ipc/src/errors.ts delete mode 100644 packages/ipc/test/unit/AbstractClient.spec.ts rename packages/ipc/test/unit/{AbstractMethodMap.ts => AbstractMethodMap.spec.ts} (85%) delete mode 100644 packages/ipc/test/unit/AbstractServer.spec.ts delete mode 100644 packages/ipc/test/unit/Request.spec.ts diff --git a/packages/ipc/package.json b/packages/ipc/package.json index 9b63789..a1c0140 100644 --- a/packages/ipc/package.json +++ b/packages/ipc/package.json @@ -16,6 +16,7 @@ }, "dependencies": { "@autoplex/microservice": "^0.1.0", + "cancelable-promise": "^4.2.1", "node-ipc": "^9.1.4" } } diff --git a/packages/ipc/src/AbstractClient.ts b/packages/ipc/src/AbstractClient.ts index 8a47d5f..859c986 100644 --- a/packages/ipc/src/AbstractClient.ts +++ b/packages/ipc/src/AbstractClient.ts @@ -1,8 +1,39 @@ +import { AbstractConnection } from "./AbstractConnection"; import { AbstractMethodMap } from "./AbstractMethodMap"; import { IMethodMap } from "./schema"; - -export abstract class AbstractClient extends AbstractMethodMap +export abstract class AbstractClient< + LocalMethods extends IMethodMap, + RemoteMethods extends IMethodMap, + Connection extends AbstractConnection +> extends AbstractMethodMap { + /** + * The client connection instance + */ + protected connection?: Connection; + + /** + * Connect to the server + */ + public abstract connect(): void; + + /** + * Disconnect from the server + */ + public abstract disconnect(): void; + + /** + * Invoked when a connection is established + */ + protected onConnect() { + // no-op + } + /** + * Invoked when a connection is dropped + */ + protected onDisconnect() { + // no-op + } } diff --git a/packages/ipc/src/AbstractConnection.ts b/packages/ipc/src/AbstractConnection.ts index 175c16f..c206de4 100644 --- a/packages/ipc/src/AbstractConnection.ts +++ b/packages/ipc/src/AbstractConnection.ts @@ -1,8 +1,35 @@ import EventEmitter from "events"; +import { AbstractMethodMap } from "./AbstractMethodMap"; +import { RequestTimeoutError } from "./errors"; import { IMethodMap, IPacket } from "./schema"; -export abstract class AbstractConnection extends EventEmitter +/** + * Declare EventEmitter types + */ +interface Events { + "disconnect": () => void +} + +/** + * Torrent IPC events + */ +export declare interface AbstractConnection { + on(event: U, listener: Events[U]): this, + emit(event: U, ...args: Parameters): boolean +} + +export abstract class AbstractConnection extends EventEmitter { + /** + * Disconnect from the server + */ + protected abstract disconnect(): void; + + /** + * Read the received packet + */ + protected abstract read(rawPacket: any): IPacket; + /** * Write the message */ @@ -11,33 +38,114 @@ export abstract class AbstractConnection extends EventEmit // --------------------------------------------------------------------------------------------- /** - * Encode the packet to send + * Store a reference to the request handler + */ + #requestHandler: AbstractMethodMap; + + /** + * The current request ID + */ + #requestId: number = 0; + + /** + * Store a map of the requests + */ + #pendingRequests: Map void> = new Map(); + + /** + * Create a new connection + */ + public constructor(requestHandler: AbstractMethodMap) { + super(); + this.#requestHandler = requestHandler; + } + + /** + * Get the next request ID + */ + #nextRequestId() { + this.#requestId = (this.#requestId + 1) % Number.MAX_SAFE_INTEGER; + return this.#requestId; + } + + /** + * Receive and handle the packet */ - protected serializePacket(packet: any) { - return packet; + protected receive(rawPacket: any) { + let packet = this.read(rawPacket); + if (packet.method !== undefined) { + this.fulfillRequest(packet.method, packet.args, packet.requestId); + return; + } + if (packet.requestId !== undefined) { + this.fulfillResponse(packet.requestId, packet.result); + return; + } } /** - * Deserialize the received packet + * Handle the received request + */ + protected async fulfillRequest(method: string, args?: any[], requestId?: number) { + let result = await this.#requestHandler.invoke(method, args ?? []); + if (requestId !== undefined) { + this.write({ requestId, result }); + } + } + + /** + * Fulfill a response + */ + protected fulfillResponse(requestId: number, result?: any) { + let resolve = this.#pendingRequests.get(requestId); + if (resolve === undefined) { + return; + } + } + + // Public Interface ---------------------------------------------------------------------------- + + /** + * Close the current connection */ - protected deserializePacket(packet: string) { - return packet; + public close() { + this.disconnect(); + this.emit("disconnect"); } /** * Send a message */ - public send(method: K, ...args: Parameters) { + public send(method: K, args: Parameters) { this.write({ method: method.toString(), args: args.length > 0 ? args : undefined - }) + }); } /** - * Send a request and return a promise of the result + * Create a promise to send a request */ - public request(method: K, ...args: Parameters) { - return new Promise<>; + public request(method: K, args: Parameters, timeout: number|null = 5000) { + return new Promise>((resolve, reject) => { + let timeoutId: NodeJS.Timeout; + const requestId = this.#nextRequestId(); + const cleanup = () => { + clearTimeout(timeoutId); + this.#pendingRequests.delete(requestId); + }; + const fulfill = (result: ReturnType) => { + cleanup(); + resolve(result); + }; + if (timeout !== null) { + timeoutId = setTimeout(() => { + cleanup(); + reject(new RequestTimeoutError()); + }, timeout); + } + this.#pendingRequests.set(requestId, fulfill); + this.send(method, args); + }); } } diff --git a/packages/ipc/src/AbstractMethodMap.ts b/packages/ipc/src/AbstractMethodMap.ts index 04853a4..ade598b 100644 --- a/packages/ipc/src/AbstractMethodMap.ts +++ b/packages/ipc/src/AbstractMethodMap.ts @@ -1,6 +1,7 @@ +import { MethodNotFoundError } from "./errors"; import { IMethodMap } from "./schema"; -export abstract class AbstractMethodMap +export abstract class AbstractMethodMap { /** * The method mapping @@ -10,7 +11,10 @@ export abstract class AbstractMethodMap /** * Invoke a mapped method */ - protected invoke(name: K, args: Parameters) { + public invoke(name: K, args: Parameters) { + if (this.methodMap[name] === undefined) { + throw new MethodNotFoundError(name); + } return this.methodMap[name].apply(this, args); } } diff --git a/packages/ipc/src/AbstractServer.ts b/packages/ipc/src/AbstractServer.ts index 54cb122..77efe37 100644 --- a/packages/ipc/src/AbstractServer.ts +++ b/packages/ipc/src/AbstractServer.ts @@ -6,6 +6,62 @@ type PrependConnection = { [K in keyof T]: (connection: string, ...args: Parameters) => ReturnType }; -export abstract class AbstractServer> extends AbstractMethodMap> +export abstract class AbstractServer< + Connection extends AbstractConnection, + LocalMethods extends IMethodMap, + RemoteMethods extends IMethodMap +> extends AbstractMethodMap> { + /** + * The list of active connections + */ + protected connections: Connection[] = []; + + /** + * Start accepting new connections + */ + public abstract start(): void; + + /** + * Stop accepting new connections + */ + public abstract stop(): void; + + /** + * Close all established connections + */ + public close() { + for (let connection of this.connections) { + connection.close(); + } + this.connections = []; + } + + /** + * Invoked when a client connects to the server + */ + public acceptConnection(connection: Connection) { + connection.on("disconnect", () => this.onDisconnect(connection)); + this.connections.push(connection); + } + + /** + * Invoked when the client disconnects from the server + */ + protected onDisconnect(connection: Connection) { + let index: number; + while (-1 !== (index = this.connections.indexOf(connection))) { + this.connections[index].removeAllListeners(); + this.connections.splice(index, 1); + } + } + + /** + * Broadcast a message to all clients + */ + public broadcast(method: K, args: Parameters) { + for (let connection of this.connections) { + connection.send(method, args); + } + } } diff --git a/packages/ipc/src/Request.ts b/packages/ipc/src/Request.ts deleted file mode 100644 index 465440b..0000000 --- a/packages/ipc/src/Request.ts +++ /dev/null @@ -1,27 +0,0 @@ -export class Request implements PromiseLike -{ - public constructor(requestId: number, method: string, args: any[], timeout?: number) { - - } - - /** - * Abort the current request - */ - public abort() { - - } - - /** - * Successful completion of the request - */ - public then() { - - } - - /** - * Caught error in the request - */ - public catch() { - - } -} diff --git a/packages/ipc/src/errors.ts b/packages/ipc/src/errors.ts new file mode 100644 index 0000000..dfe7998 --- /dev/null +++ b/packages/ipc/src/errors.ts @@ -0,0 +1,14 @@ +export class MethodNotFoundError extends Error { + public constructor(method: string|number|symbol) { + super(`Attempted to invoke undefined method: '${method.toString()}'`); + Object.setPrototypeOf(this, MethodNotFoundError.prototype); + } +} + + +export class RequestTimeoutError extends Error { + public constructor() { + super("timeout"); + Object.setPrototypeOf(this, RequestTimeoutError.prototype); + } +} diff --git a/packages/ipc/src/index.ts b/packages/ipc/src/index.ts index e678560..627f868 100644 --- a/packages/ipc/src/index.ts +++ b/packages/ipc/src/index.ts @@ -1 +1,5 @@ export * from "./AbstractConnection"; +export * from "./AbstractClient"; +export * from "./AbstractMethodMap"; +export * from "./errors"; +export * from "./schema"; diff --git a/packages/ipc/src/schema.ts b/packages/ipc/src/schema.ts index e558c57..740a875 100644 --- a/packages/ipc/src/schema.ts +++ b/packages/ipc/src/schema.ts @@ -7,10 +7,14 @@ export interface IMethodMap { [name: string]: (...args: any[]) => any } +/** + * The packet structure + */ export interface IPacket { requestId?: number, - method: string, - args?: any[] + method? : string, + args? : any[], + result? : any } /** diff --git a/packages/ipc/test/unit/AbstractClient.spec.ts b/packages/ipc/test/unit/AbstractClient.spec.ts deleted file mode 100644 index e69de29..0000000 diff --git a/packages/ipc/test/unit/AbstractConnection.spec.ts b/packages/ipc/test/unit/AbstractConnection.spec.ts index 927adf9..696a348 100644 --- a/packages/ipc/test/unit/AbstractConnection.spec.ts +++ b/packages/ipc/test/unit/AbstractConnection.spec.ts @@ -1,19 +1,45 @@ -import { AbstractConnection } from "../../src"; +import { AbstractConnection, AbstractMethodMap, IPacket } from "../../src"; type MethodMap = { sendTest(value: number): void }; +class MockRequestHandler extends AbstractMethodMap { + methodMap = {}; +} + class MockConnection extends AbstractConnection { + public disconnect = () => {}; + public read = (rawPacket: any) => rawPacket; public write = jest.fn(); } -describe("Abstract IPC Connections", () => { - it("Should write packet", () => { - let connection = new MockConnection(); - connection.send("sendTest", 10); - // expect(connection.write.mock.calls[0][0]).toMatchObject({ +class MockConnectionWithReply extends AbstractConnection { + public disconnect = () => {}; + public read = (rawPacket: any) => rawPacket; + public write(packet: IPacket) { + this.receive({ + requestId: packet.requestId, + result: 100 + }); + } +} - // }); +describe("Abstract IPC Connections", () => { + it("Should write complete packet", () => { + let connection = new MockConnection(new MockRequestHandler()); + connection.send("sendTest", [10]); + expect(connection.write).toBeCalledWith(expect.objectContaining({ + method: "sendTest", + args: [10] + })); + }); + it("Request packet should timeout", async () => { + let connection = new MockConnection(new MockRequestHandler()); + expect(connection.request("sendTest", [25])).rejects.toMatch("timeout"); + }); + it("Request should be resolved", async () => { + let connection = new MockConnectionWithReply(new MockRequestHandler()); + expect(connection.request("sendTest", [25])).resolves.toEqual(100); }); }); diff --git a/packages/ipc/test/unit/AbstractMethodMap.ts b/packages/ipc/test/unit/AbstractMethodMap.spec.ts similarity index 85% rename from packages/ipc/test/unit/AbstractMethodMap.ts rename to packages/ipc/test/unit/AbstractMethodMap.spec.ts index 74fdf47..e9bc88f 100644 --- a/packages/ipc/test/unit/AbstractMethodMap.ts +++ b/packages/ipc/test/unit/AbstractMethodMap.spec.ts @@ -1,4 +1,4 @@ -import { AbstractMethodMap } from "../../src/AbstractMethodMap"; +import { AbstractMethodMap } from "../../src"; class TestMap extends AbstractMethodMap<{ doSomething(a: number, b: number): number }> { public mockFn = jest.fn(); @@ -17,7 +17,7 @@ class TestMap extends AbstractMethodMap<{ doSomething(a: number, b: number): num describe("Abstract Method Map", () => { it("Invoke mapped method", () => { let map = new TestMap(); - map.publicInvoke(); + map.invoke("doSomething", [5, 10]); expect(map.mockFn).toHaveBeenCalled() }); }); diff --git a/packages/ipc/test/unit/AbstractServer.spec.ts b/packages/ipc/test/unit/AbstractServer.spec.ts deleted file mode 100644 index e69de29..0000000 diff --git a/packages/ipc/test/unit/Request.spec.ts b/packages/ipc/test/unit/Request.spec.ts deleted file mode 100644 index 3470e29..0000000 --- a/packages/ipc/test/unit/Request.spec.ts +++ /dev/null @@ -1,5 +0,0 @@ -describe("Request", () => { - it("", () => { - - }); -}); diff --git a/packages/ipc/yarn.lock b/packages/ipc/yarn.lock index 6866c47..e32e7b8 100644 --- a/packages/ipc/yarn.lock +++ b/packages/ipc/yarn.lock @@ -3,16 +3,21 @@ "@types/node-ipc@^9.1.3": - version "9.1.3" - resolved "https://registry.yarnpkg.com/@types/node-ipc/-/node-ipc-9.1.3.tgz#5381fbc910071083b28dd43225727877c108b361" - integrity sha512-ka7CPX9Dk2lwe4PxoZMLOwcQrtdcYe/7OKmH75fQbmt0jdKltWVkdGA81D5l55d0wNhkweHa3XmzFbt5C0ieOQ== + version "9.1.5" + resolved "https://registry.yarnpkg.com/@types/node-ipc/-/node-ipc-9.1.5.tgz#0f9dac03fe6c96b6ff379725faf20d8a97eb00fa" + integrity sha512-xxYUVj/Y8fNkxQlvndVeWlL99wAF4KwISsRy21RSAAT/SKyrh+X3/BZXHcM/ZJPNri9h1JWw58wDKT1zr2pXVw== dependencies: "@types/node" "*" "@types/node@*": - version "15.0.1" - resolved "https://registry.yarnpkg.com/@types/node/-/node-15.0.1.tgz#ef34dea0881028d11398be5bf4e856743e3dc35a" - integrity sha512-TMkXt0Ck1y0KKsGr9gJtWGjttxlZnnvDtphxUOSd0bfaR6Q1jle+sPvrzNR1urqYTWMinoKvjKfXUGsumaO1PA== + version "16.3.2" + resolved "https://registry.yarnpkg.com/@types/node/-/node-16.3.2.tgz#655432817f83b51ac869c2d51dd8305fb8342e16" + integrity sha512-jJs9ErFLP403I+hMLGnqDRWT0RYKSvArxuBVh2veudHV7ifEC1WAmjJADacZ7mRbA2nWgHtn8xyECMAot0SkAw== + +cancelable-promise@^4.2.1: + version "4.2.1" + resolved "https://registry.yarnpkg.com/cancelable-promise/-/cancelable-promise-4.2.1.tgz#b02f79c5dde2704acfff1bc1ac2b4090f55541fe" + integrity sha512-PJZ/000ocWhPZQBAuNewAOMA2WEkJ8RhXI6AxeGLiGdW8EYDmumzo9wKyNgjDgxc1q/HbXuTdlcI+wXrOe/jMw== easy-stack@^1.0.1: version "1.0.1" @@ -37,9 +42,9 @@ js-queue@2.0.2: easy-stack "^1.0.1" node-ipc@^9.1.4: - version "9.1.4" - resolved "https://registry.yarnpkg.com/node-ipc/-/node-ipc-9.1.4.tgz#2acf962681afdac2602876d98fe6434d54d9bd3c" - integrity sha512-A+f0mn2KxUt1uRTSd5ktxQUsn2OEhj5evo7NUi/powBzMSZ0vocdzDjlq9QN2v3LH6CJi3e5xAenpZ1QwU5A8g== + version "9.2.1" + resolved "https://registry.yarnpkg.com/node-ipc/-/node-ipc-9.2.1.tgz#b32f66115f9d6ce841dc4ec2009d6a733f98bb6b" + integrity sha512-mJzaM6O3xHf9VT8BULvJSbdVbmHUKRNOH7zDDkCrA1/T+CVjq2WVIDfLt0azZRXpgArJtl3rtmEozrbXPZ9GaQ== dependencies: event-pubsub "4.3.0" js-message "1.0.7"