You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

190 lines
6.2 KiB

import { Socket } from "net";
import assert from "assert";
import { Microservice } from "@autoplex/microservice";
import AbstractIpcService from "./AbstractIpcService";
import { IIpcResponse, IIpcRequest, IPC } from "./schema";
import { IpcConnectionError, IpcTimeoutError } from "./IpcError";
/**
 * Base class for services that act as an IPC client.
 *
 * Connects to the IPC server identified by `this.NAME` / `this.SOCKET_PATH`,
 * tracks the connection state, and offers request/response (`request`) and
 * fire-and-forget (`send`) messaging on top of the underlying socket.
 */
export abstract class IpcClientService<M extends Microservice = Microservice> extends AbstractIpcService<M>
{
    /**
     * Indicates whether there is an active connection to the IPC server
     */
    private __isConnected: boolean = false;

    /**
     * The next request ID to hand out (reset to 0 on every boot)
     */
    private __requestId: number = 0;

    /**
     * The active IPC socket, or null when the service is not booted
     */
    private __socket: Socket|null = null;

    // Service Implementation ----------------------------------------------------------------------

    /**
     * Install the handlers for messages received on the IPC socket.
     * The default implementation does nothing; subclasses override it.
     *
     * Example: this.addMessageHandler("some_event", this.onSomeEvent)
     */
    protected installMessageHandlers() {
        // no-op
    }

    // Service Management --------------------------------------------------------------------------

    /**
     * Boot the IPC client service.
     *
     * @param ipc The IPC instance whose `of[this.NAME]` entry is used as the socket
     * @returns A promise that resolves once the socket emits "connect". It never rejects.
     */
    public bootIpc(ipc: IPC) {
        // Connect to the server
        return new Promise<void>((resolve) => {
            this.rawIpcInstance!.connectTo(this.NAME, this.SOCKET_PATH, () => {
                const socket = <Socket>ipc.of[this.NAME];

                // Reset the per-connection state before any handler can run
                this.__isConnected = false;
                this.__requestId = 0;
                this.__socket = socket;

                this.installEventHandlers(socket);
                this.installMessageHandlers();

                // Registered after installEventHandlers, so onConnect() has already
                // flagged the connection as active when the promise resolves
                socket.once("connect", resolve);
            });
        });
    }

    /**
     * Install the connection lifecycle event handlers on the IPC socket
     *
     * @param socket The socket to attach the handlers to
     */
    protected installEventHandlers(socket: Socket) {
        socket.on("connect", () => this.onConnect());
        socket.on("error", (error: any) => this.onError(error));
        socket.on("disconnect", () => this.onDisconnect());
        socket.on("destroy", () => this.onDestroy());
    }

    /**
     * Add a handler for an event broadcasted by the server.
     * The handler is invoked with `this` bound to the service and the received data
     * as its only argument.
     *
     * @param method The event name to listen for
     * @param handle The handler to invoke for each occurrence
     * @throws AssertionError if there is no socket yet
     */
    protected addMessageHandler(method: string, handle: (...args: any[]) => Promise<any>) {
        assert(this.__socket !== null, "Attempted to add events to null socket");
        this.__socket.on(method, async (data: any) => handle.apply(this, [data]));
    }

    /**
     * Shutdown the IPC service.
     * All listeners are removed from the socket before it is destroyed.
     *
     * @param ipc The IPC instance to disconnect from, if any
     */
    public async shutdownIpc(ipc: IPC|null) {
        ipc?.disconnect(this.NAME);
        this.__socket?.removeAllListeners();
        this.__socket?.destroy();
        this.__socket = null;
    }

    // Socket Event Handlers -----------------------------------------------------------------------

    /**
     * Invoked when the client established a connection to an IPC server
     */
    protected onConnect() {
        this.log("IPC: Connection established");
        this.__isConnected = true;
    }

    /**
     * Invoked when an IPC error occurs.
     * Errors are only logged while the connection is flagged as active.
     */
    protected onError(error: string | Error) {
        if (this.__isConnected) {
            this.log("IPC: Error occurred:", error);
        }
    }

    /**
     * Invoked when disconnected from an IPC server.
     * The disconnect is only logged if the connection was previously active.
     */
    protected onDisconnect() {
        if (this.__isConnected) {
            this.log("IPC: Disconnected");
        }
        this.__isConnected = false;
    }

    /**
     * Invoked when the IPC socket has been destroyed
     */
    protected onDestroy() {
        this.log("IPC: Destroyed");
        this.__isConnected = false;
    }

    // Methods -------------------------------------------------------------------------------------

    /**
     * Perform a general request and wait for a response.
     *
     * The request is emitted as `{ id, data }` and the response is awaited on the
     * `method_response_<id>` event.
     *
     * @param method  The event name to emit
     * @param data    Optional payload to send along
     * @param timeout Milliseconds to wait for a response, or null to wait indefinitely
     * @returns A promise resolving with the response. It rejects with an
     *          IpcConnectionError when not connected or when the socket is
     *          disconnected/destroyed while waiting, and with an IpcTimeoutError
     *          when the timeout elapses first.
     */
    protected async request(method: string, data?: any, timeout: number|null = null): Promise<IIpcResponse> {
        return new Promise<IIpcResponse>((resolve, reject) => {
            // Pin the socket for the lifetime of this request so that clean-up always
            // detaches from the instance the listeners were attached to
            const socket = this.__socket;

            // If the client is not connected to a server, reject immediately
            if (!this.__isConnected || socket === null) {
                reject(new IpcConnectionError("Not connected"));
                return;
            }

            // Fetch a request ID and derive the event the response arrives on
            const requestId = this.__requestId++;
            const responseMethod = `method_response_${requestId}`;
            let responseTimeout: ReturnType<typeof setTimeout>|null = null;

            // Remove the timer and every listener this request registered
            const cleanUp = () => {
                if (responseTimeout !== null) {
                    clearTimeout(responseTimeout);
                    responseTimeout = null;
                }
                socket.off(responseMethod, respond);
                socket.off("disconnect", onDisconnected);
                socket.off("destroy", onDestroyed);
            };

            // Handle the response
            const respond = (response: IIpcResponse) => {
                cleanUp();
                resolve(response);
            };

            // Abort the request
            const abort = (error: Error) => {
                cleanUp();
                reject(error);
            };

            // Named handlers: the exact same references must be passed to off(),
            // otherwise they would pile up on the socket after every request
            const onDisconnected = () => abort(new IpcConnectionError("Disconnected"));
            const onDestroyed = () => abort(new IpcConnectionError("Destroyed"));

            // Include timeout mechanism in the off chance something breaks
            if (timeout !== null) {
                responseTimeout = setTimeout(
                    () => abort(new IpcTimeoutError("Timeout")),
                    timeout
                );
            }

            socket.once("disconnect", onDisconnected);
            socket.once("destroy", onDestroyed);
            socket.once(responseMethod, respond);
            socket.emit(method, <IIpcRequest>{ id: requestId, data });
        });
    }

    /**
     * Send a message over IPC without waiting for a response.
     * The message is emitted as `{ id: null, data }`.
     *
     * @param method The event name to emit
     * @param data   Optional payload to send along
     * @throws IpcConnectionError if the client is not connected
     */
    protected send(method: string, data?: any) {
        if (!this.__isConnected || this.__socket === null) {
            throw new IpcConnectionError("Not connected");
        }
        this.__socket.emit(method, <IIpcRequest>{ id: null, data });
    }

    // Accessors -----------------------------------------------------------------------------------

    /**
     * Get the connection status of the IPC connection
     */
    public get isConnected() {
        return this.__isConnected;
    }
}