You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

91 lines
2.7 KiB

import assert from "assert";
import { mkdir } from "fs/promises";
import { Socket } from "net";
import { dirname } from "path";
import { Microservice } from "@autoplex/microservice";
import { IIpcRequest, IPC } from "./schema";
import AbstractIpcService from "./AbstractIpcService";
// Shorthand for the type of the `server` property declared on the IPC schema type.
type IpcServer = IPC["server"];
export abstract class IpcServerService<M extends Microservice = Microservice> extends AbstractIpcService<M>
{
    /**
     * The IPC server instance, or `null` while no server is running.
     *
     * Explicitly initialised to `null` so that the `=== null` / `!== null`
     * guards in this class also hold before `bootIpc` has completed (an
     * uninitialised field would be `undefined` and slip past them).
     */
    private __server: IpcServer|null = null;

    // Service Implementation ----------------------------------------------------------------------

    /**
     * Install the event handlers for receiving on the IPC socket.
     *
     * Invoked by `bootIpc` right after the server instance has been stored,
     * so `addMessageHandler` may be called from here. The default
     * implementation registers nothing.
     *
     * Example: this.addMessageHandler("some_event", this.onSomeEvent)
     */
    protected installMessageHandlers() {
        // no-op
    }

    // Service Management --------------------------------------------------------------------------

    /**
     * Boot the IPC service.
     *
     * Creates the directory containing `SOCKET_PATH` (recursively), starts
     * serving on that path and installs the message handlers.
     *
     * @param ipc The IPC instance to serve with.
     * @returns A promise that resolves once the serve callback has fired and
     *          the handlers are installed. It rejects if the directory cannot
     *          be created, if starting the server throws, or if installing
     *          the handlers throws.
     */
    public async bootIpc(ipc: IPC): Promise<void> {
        // Awaited outside of any promise executor so that a failure rejects
        // the returned promise instead of becoming an unhandled rejection.
        await mkdir(dirname(this.SOCKET_PATH), { recursive: true });

        await new Promise<void>((resolve, reject) => {
            ipc.serve(this.SOCKET_PATH, () => {
                try {
                    this.__server = ipc.server;
                    this.installMessageHandlers();
                    resolve();
                } catch (error) {
                    reject(error);
                }
            });
            ipc.server.start();
        });
    }

    /**
     * Add a message/request handler for the server.
     *
     * The handler is called with `this` bound to the service and the
     * request's `data` as its only argument.
     *
     * - When `request.id` is `null` the message is fire-and-forget: nothing
     *   is sent back and a handler failure is only logged.
     * - Otherwise the outcome is emitted to the requesting socket on the
     *   event `method_response_<id>` as `{ data }` on success or `{ error }`
     *   on failure.
     *
     * @param method The event name to listen for.
     * @param handle The handler producing the response payload.
     * @throws AssertionError if no server is currently running.
     */
    protected addMessageHandler(method: string, handle: (...args: any[]) => Promise<any>) {
        assert(this.__server !== null, "Attempted to add events to null server");

        this.__server.on(method, async (request: IIpcRequest, socket: Socket) => {
            // Normalise the handler outcome into a promise so synchronous
            // throws and non-promise return values follow the same error
            // path as asynchronous rejections.
            let handlerPromise: Promise<any>;
            try {
                handlerPromise = Promise.resolve(handle.apply(this, [request.data]));
            } catch (error) {
                handlerPromise = Promise.reject(error);
            }

            if (request.id === null) {
                handlerPromise.catch(error => this.log("Error:", method, error, request));
                return;
            }

            const responseMethod = `method_response_${request.id}`;

            // The server may have been shut down while the handler was
            // running; `?.` drops the response in that case rather than
            // throwing on a null server.
            try {
                this.__server?.emit(socket, responseMethod, { data: await handlerPromise });
            } catch (error) {
                this.log("Error:", method, error, request);
                this.__server?.emit(socket, responseMethod, { error });
            }
        });
    }

    /**
     * Shutdown the IPC service.
     *
     * Stops the server (if one is running), clears the stored instance and
     * destroys every entry found in `ipc.of`.
     *
     * @param ipc The IPC instance whose connections should be destroyed, or
     *            `null` to only stop the server.
     */
    public async shutdownIpc(ipc: IPC|null) {
        this.__server?.stop();
        this.__server = null;

        // NOTE(review): the values of `ipc.of` are cast to `Socket` here;
        // confirm against the IPC schema that they really expose `destroy()`.
        for (const socket of <Socket[]>Object.values(ipc?.of ?? [])) {
            socket.destroy();
        }
    }

    // Methods -------------------------------------------------------------------------------------

    /**
     * Broadcast a message to all connected clients.
     *
     * Does nothing when no server is running.
     *
     * @param method The event name to emit.
     * @param data   Optional payload to send with the event.
     */
    public broadcast(method: string, data?: any) {
        if (this.__server === null) {
            return;
        }

        // NOTE(review): elsewhere in this class `emit` is called with a
        // socket as its first argument; confirm that the two-argument form
        // used here actually reaches every client for this server type.
        this.__server.emit(method, data);
    }
}