Browse Source

IPC abstract client/server implementation ready for use I think

new-arch
David Ludwig 4 years ago
parent
commit
446e000d8f
15 changed files with 290 additions and 69 deletions
  1. +1
    -0
      packages/ipc/package.json
  2. +33
    -2
      packages/ipc/src/AbstractClient.ts
  3. +120
    -12
      packages/ipc/src/AbstractConnection.ts
  4. +6
    -2
      packages/ipc/src/AbstractMethodMap.ts
  5. +57
    -1
      packages/ipc/src/AbstractServer.ts
  6. +0
    -27
      packages/ipc/src/Request.ts
  7. +14
    -0
      packages/ipc/src/errors.ts
  8. +4
    -0
      packages/ipc/src/index.ts
  9. +6
    -2
      packages/ipc/src/schema.ts
  10. +0
    -0
      packages/ipc/test/unit/AbstractClient.spec.ts
  11. +33
    -7
      packages/ipc/test/unit/AbstractConnection.spec.ts
  12. +2
    -2
      packages/ipc/test/unit/AbstractMethodMap.spec.ts
  13. +0
    -0
      packages/ipc/test/unit/AbstractServer.spec.ts
  14. +0
    -5
      packages/ipc/test/unit/Request.spec.ts
  15. +14
    -9
      packages/ipc/yarn.lock

+ 1
- 0
packages/ipc/package.json View File

@ -16,6 +16,7 @@
},
"dependencies": {
"@autoplex/microservice": "^0.1.0",
"cancelable-promise": "^4.2.1",
"node-ipc": "^9.1.4"
}
}

+ 33
- 2
packages/ipc/src/AbstractClient.ts View File

@ -1,8 +1,39 @@
import { AbstractConnection } from "./AbstractConnection";
import { AbstractMethodMap } from "./AbstractMethodMap";
import { IMethodMap } from "./schema";
export abstract class AbstractClient<T extends IMethodMap> extends AbstractMethodMap<T>
export abstract class AbstractClient<
LocalMethods extends IMethodMap,
RemoteMethods extends IMethodMap,
Connection extends AbstractConnection<RemoteMethods>
> extends AbstractMethodMap<LocalMethods>
{
/**
* The client connection instance
*/
protected connection?: Connection;
/**
* Connect to the server
*/
public abstract connect(): void;
/**
* Disconnect from the server
*/
public abstract disconnect(): void;
/**
* Invoked when a connection is established
*/
protected onConnect() {
// no-op
}
/**
* Invoked when a connection is dropped
*/
protected onDisconnect() {
// no-op
}
}

+ 120
- 12
packages/ipc/src/AbstractConnection.ts View File

@ -1,8 +1,35 @@
import EventEmitter from "events";
import { AbstractMethodMap } from "./AbstractMethodMap";
import { RequestTimeoutError } from "./errors";
import { IMethodMap, IPacket } from "./schema";
export abstract class AbstractConnection<T extends IMethodMap> extends EventEmitter
/**
* Declare EventEmitter types
*/
interface Events {
"disconnect": () => void
}
/**
* Torrent IPC events
*/
export declare interface AbstractConnection<RemoteMethods extends IMethodMap> {
on<U extends keyof Events>(event: U, listener: Events[U]): this,
emit<U extends keyof Events>(event: U, ...args: Parameters<Events[U]>): boolean
}
export abstract class AbstractConnection<RemoteMethods extends IMethodMap> extends EventEmitter
{
/**
* Disconnect from the server
*/
protected abstract disconnect(): void;
/**
* Read the received packet
*/
protected abstract read(rawPacket: any): IPacket;
/**
* Write the message
*/
@ -11,33 +38,114 @@ export abstract class AbstractConnection<T extends IMethodMap> extends EventEmit
// ---------------------------------------------------------------------------------------------
/**
* Encode the packet to send
* Store a reference to the request handler
*/
#requestHandler: AbstractMethodMap<any>;
/**
* The current request ID
*/
#requestId: number = 0;
/**
* Store a map of the requests
*/
#pendingRequests: Map<number, (result: any) => void> = new Map();
/**
* Create a new connection
*/
public constructor(requestHandler: AbstractMethodMap<any>) {
super();
this.#requestHandler = requestHandler;
}
/**
* Get the next request ID
*/
#nextRequestId() {
this.#requestId = (this.#requestId + 1) % Number.MAX_SAFE_INTEGER;
return this.#requestId;
}
/**
* Receive and handle the packet
*/
protected serializePacket(packet: any) {
return packet;
protected receive(rawPacket: any) {
let packet = this.read(rawPacket);
if (packet.method !== undefined) {
this.fulfillRequest(packet.method, packet.args, packet.requestId);
return;
}
if (packet.requestId !== undefined) {
this.fulfillResponse(packet.requestId, packet.result);
return;
}
}
/**
* Deserialize the received packet
* Handle the received request
*/
protected async fulfillRequest(method: string, args?: any[], requestId?: number) {
let result = await this.#requestHandler.invoke(method, args ?? []);
if (requestId !== undefined) {
this.write({ requestId, result });
}
}
/**
* Fulfill a response
*/
protected fulfillResponse(requestId: number, result?: any) {
let resolve = this.#pendingRequests.get(requestId);
if (resolve === undefined) {
return;
}
}
// Public Interface ----------------------------------------------------------------------------
/**
* Close the current connection
*/
protected deserializePacket(packet: string) {
return packet;
public close() {
this.disconnect();
this.emit("disconnect");
}
/**
* Send a message
*/
public send<K extends keyof T>(method: K, ...args: Parameters<T[K]>) {
public send<K extends keyof RemoteMethods>(method: K, args: Parameters<RemoteMethods[K]>) {
this.write({
method: method.toString(),
args: args.length > 0 ? args : undefined
})
});
}
/**
* Send a request and return a promise of the result
* Create a promise to send a request
*/
public request<K extends keyof T>(method: K, ...args: Parameters<T[K]>) {
return new Promise<>;
public request<K extends keyof RemoteMethods>(method: K, args: Parameters<RemoteMethods[K]>, timeout: number|null = 5000) {
return new Promise<ReturnType<RemoteMethods[K]>>((resolve, reject) => {
let timeoutId: NodeJS.Timeout;
const requestId = this.#nextRequestId();
const cleanup = () => {
clearTimeout(timeoutId);
this.#pendingRequests.delete(requestId);
};
const fulfill = (result: ReturnType<RemoteMethods[K]>) => {
cleanup();
resolve(result);
};
if (timeout !== null) {
timeoutId = setTimeout(() => {
cleanup();
reject(new RequestTimeoutError());
}, timeout);
}
this.#pendingRequests.set(requestId, fulfill);
this.send(method, args);
});
}
}

+ 6
- 2
packages/ipc/src/AbstractMethodMap.ts View File

@ -1,6 +1,7 @@
import { MethodNotFoundError } from "./errors";
import { IMethodMap } from "./schema";
export abstract class AbstractMethodMap<T extends IMethodMap>
export abstract class AbstractMethodMap<T extends IMethodMap = {}>
{
/**
* The method mapping
@ -10,7 +11,10 @@ export abstract class AbstractMethodMap<T extends IMethodMap>
/**
* Invoke a mapped method
*/
protected invoke<K extends keyof T>(name: K, args: Parameters<T[K]>) {
public invoke<K extends keyof T>(name: K, args: Parameters<T[K]>) {
if (this.methodMap[name] === undefined) {
throw new MethodNotFoundError(name);
}
return this.methodMap[name].apply(this, args);
}
}

+ 57
- 1
packages/ipc/src/AbstractServer.ts View File

@ -6,6 +6,62 @@ type PrependConnection<T extends IMethodMap> = {
[K in keyof T]: (connection: string, ...args: Parameters<T[K]>) => ReturnType<T[K]>
};
export abstract class AbstractServer<T extends IMethodMap, C extends AbstractConnection<any>> extends AbstractMethodMap<PrependConnection<T>>
export abstract class AbstractServer<
Connection extends AbstractConnection<RemoteMethods>,
LocalMethods extends IMethodMap,
RemoteMethods extends IMethodMap
> extends AbstractMethodMap<PrependConnection<LocalMethods>>
{
/**
* The list of active connections
*/
protected connections: Connection[] = [];
/**
* Start accepting new connections
*/
public abstract start(): void;
/**
* Stop accepting new connections
*/
public abstract stop(): void;
/**
* Close all established connections
*/
public close() {
for (let connection of this.connections) {
connection.close();
}
this.connections = [];
}
/**
* Invoked when a client connects to the server
*/
public acceptConnection(connection: Connection) {
connection.on("disconnect", () => this.onDisconnect(connection));
this.connections.push(connection);
}
/**
* Invoked when the client disconnects from the server
*/
protected onDisconnect(connection: Connection) {
let index: number;
while (-1 !== (index = this.connections.indexOf(connection))) {
this.connections[index].removeAllListeners();
this.connections.splice(index, 1);
}
}
/**
* Broadcast a message to all clients
*/
public broadcast<K extends keyof RemoteMethods>(method: K, args: Parameters<RemoteMethods[K]>) {
for (let connection of this.connections) {
connection.send(method, args);
}
}
}

+ 0
- 27
packages/ipc/src/Request.ts View File

@ -1,27 +0,0 @@
export class Request<T = unknown> implements PromiseLike<T>
{
public constructor(requestId: number, method: string, args: any[], timeout?: number) {
}
/**
* Abort the current request
*/
public abort() {
}
/**
* Successful completion of the request
*/
public then() {
}
/**
* Caught error in the request
*/
public catch() {
}
}

+ 14
- 0
packages/ipc/src/errors.ts View File

@ -0,0 +1,14 @@
export class MethodNotFoundError extends Error {
public constructor(method: string|number|symbol) {
super(`Attempted to invoke undefined method: '${method.toString()}'`);
Object.setPrototypeOf(this, MethodNotFoundError.prototype);
}
}
export class RequestTimeoutError extends Error {
public constructor() {
super("timeout");
Object.setPrototypeOf(this, RequestTimeoutError.prototype);
}
}

+ 4
- 0
packages/ipc/src/index.ts View File

@ -1 +1,5 @@
export * from "./AbstractConnection";
export * from "./AbstractClient";
export * from "./AbstractMethodMap";
export * from "./errors";
export * from "./schema";

+ 6
- 2
packages/ipc/src/schema.ts View File

@ -7,10 +7,14 @@ export interface IMethodMap {
[name: string]: (...args: any[]) => any
}
/**
* The packet structure
*/
export interface IPacket {
requestId?: number,
method: string,
args?: any[]
method? : string,
args? : any[],
result? : any
}
/**


+ 0
- 0
packages/ipc/test/unit/AbstractClient.spec.ts View File


+ 33
- 7
packages/ipc/test/unit/AbstractConnection.spec.ts View File

@ -1,19 +1,45 @@
import { AbstractConnection } from "../../src";
import { AbstractConnection, AbstractMethodMap, IPacket } from "../../src";
type MethodMap = {
sendTest(value: number): void
};
class MockRequestHandler extends AbstractMethodMap {
methodMap = {};
}
class MockConnection extends AbstractConnection<MethodMap> {
public disconnect = () => {};
public read = (rawPacket: any) => <IPacket>rawPacket;
public write = jest.fn();
}
describe("Abstract IPC Connections", () => {
it("Should write packet", () => {
let connection = new MockConnection();
connection.send("sendTest", 10);
// expect(connection.write.mock.calls[0][0]).toMatchObject({
class MockConnectionWithReply extends AbstractConnection<MethodMap> {
public disconnect = () => {};
public read = (rawPacket: any) => <IPacket>rawPacket;
public write(packet: IPacket) {
this.receive({
requestId: packet.requestId,
result: 100
});
}
}
// });
describe("Abstract IPC Connections", () => {
it("Should write complete packet", () => {
let connection = new MockConnection(new MockRequestHandler());
connection.send("sendTest", [10]);
expect(connection.write).toBeCalledWith(expect.objectContaining({
method: "sendTest",
args: [10]
}));
});
it("Request packet should timeout", async () => {
let connection = new MockConnection(new MockRequestHandler());
expect(connection.request("sendTest", [25])).rejects.toMatch("timeout");
});
it("Request should be resolved", async () => {
let connection = new MockConnectionWithReply(new MockRequestHandler());
expect(connection.request("sendTest", [25])).resolves.toEqual(100);
});
});

packages/ipc/test/unit/AbstractMethodMap.ts → packages/ipc/test/unit/AbstractMethodMap.spec.ts View File


+ 0
- 0
packages/ipc/test/unit/AbstractServer.spec.ts View File


+ 0
- 5
packages/ipc/test/unit/Request.spec.ts View File

@ -1,5 +0,0 @@
describe("Request", () => {
it("", () => {
});
});

+ 14
- 9
packages/ipc/yarn.lock View File

@ -3,16 +3,21 @@
"@types/node-ipc@^9.1.3":
version "9.1.3"
resolved "https://registry.yarnpkg.com/@types/node-ipc/-/node-ipc-9.1.3.tgz#5381fbc910071083b28dd43225727877c108b361"
integrity sha512-ka7CPX9Dk2lwe4PxoZMLOwcQrtdcYe/7OKmH75fQbmt0jdKltWVkdGA81D5l55d0wNhkweHa3XmzFbt5C0ieOQ==
version "9.1.5"
resolved "https://registry.yarnpkg.com/@types/node-ipc/-/node-ipc-9.1.5.tgz#0f9dac03fe6c96b6ff379725faf20d8a97eb00fa"
integrity sha512-xxYUVj/Y8fNkxQlvndVeWlL99wAF4KwISsRy21RSAAT/SKyrh+X3/BZXHcM/ZJPNri9h1JWw58wDKT1zr2pXVw==
dependencies:
"@types/node" "*"
"@types/node@*":
version "15.0.1"
resolved "https://registry.yarnpkg.com/@types/node/-/node-15.0.1.tgz#ef34dea0881028d11398be5bf4e856743e3dc35a"
integrity sha512-TMkXt0Ck1y0KKsGr9gJtWGjttxlZnnvDtphxUOSd0bfaR6Q1jle+sPvrzNR1urqYTWMinoKvjKfXUGsumaO1PA==
version "16.3.2"
resolved "https://registry.yarnpkg.com/@types/node/-/node-16.3.2.tgz#655432817f83b51ac869c2d51dd8305fb8342e16"
integrity sha512-jJs9ErFLP403I+hMLGnqDRWT0RYKSvArxuBVh2veudHV7ifEC1WAmjJADacZ7mRbA2nWgHtn8xyECMAot0SkAw==
cancelable-promise@^4.2.1:
version "4.2.1"
resolved "https://registry.yarnpkg.com/cancelable-promise/-/cancelable-promise-4.2.1.tgz#b02f79c5dde2704acfff1bc1ac2b4090f55541fe"
integrity sha512-PJZ/000ocWhPZQBAuNewAOMA2WEkJ8RhXI6AxeGLiGdW8EYDmumzo9wKyNgjDgxc1q/HbXuTdlcI+wXrOe/jMw==
easy-stack@^1.0.1:
version "1.0.1"
@ -37,9 +42,9 @@ js-queue@2.0.2:
easy-stack "^1.0.1"
node-ipc@^9.1.4:
version "9.1.4"
resolved "https://registry.yarnpkg.com/node-ipc/-/node-ipc-9.1.4.tgz#2acf962681afdac2602876d98fe6434d54d9bd3c"
integrity sha512-A+f0mn2KxUt1uRTSd5ktxQUsn2OEhj5evo7NUi/powBzMSZ0vocdzDjlq9QN2v3LH6CJi3e5xAenpZ1QwU5A8g==
version "9.2.1"
resolved "https://registry.yarnpkg.com/node-ipc/-/node-ipc-9.2.1.tgz#b32f66115f9d6ce841dc4ec2009d6a733f98bb6b"
integrity sha512-mJzaM6O3xHf9VT8BULvJSbdVbmHUKRNOH7zDDkCrA1/T+CVjq2WVIDfLt0azZRXpgArJtl3rtmEozrbXPZ9GaQ==
dependencies:
event-pubsub "4.3.0"
js-message "1.0.7"


Loading…
Cancel
Save