import { InternalService, Microservice } from "@autoplex/microservice";
|
|
import { IncomingMessage } from "http";
|
|
import WebSocket, { Server } from "ws";
|
|
import jsonwebtoken from "jsonwebtoken";
|
|
import { IWebSocketRequest, IWebSocketResponse } from "./schema";
|
|
|
|
export abstract class WebSocketServerService<M extends Microservice> extends InternalService<M>
|
|
{
|
|
/**
|
|
* The list of active client connections
|
|
*/
|
|
protected connections: WebSocket[] = [];
|
|
|
|
/**
|
|
* The websocket server instance
|
|
*/
|
|
protected server!: Server;
|
|
|
|
/**
|
|
* The list of registered methods
|
|
*/
|
|
#methods: { [method: string]: (payload?: any) => Promise<any>|any } = {};
|
|
|
|
// Overridable ---------------------------------------------------------------------------------
|
|
|
|
/**
|
|
* The application key for the application
|
|
*/
|
|
protected abstract get appKey(): string;
|
|
|
|
/**
|
|
* Install methods into the websocket service
|
|
*/
|
|
protected installMethods() {
|
|
// no-op
|
|
}
|
|
|
|
// Service Methods -----------------------------------------------------------------------------
|
|
|
|
/**
|
|
* Boot the service
|
|
*/
|
|
public override async boot() {
|
|
this.installMethods();
|
|
this.server = new Server({ port: 3250 });
|
|
this.server.on("connection", this.acceptConnection.bind(this));
|
|
}
|
|
|
|
/**
|
|
* Shutdown the service
|
|
*/
|
|
public override async shutdown() {
|
|
this.server.close();
|
|
for (let socket of this.connections) {
|
|
socket.close();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Accept the pending websocket connection
|
|
*/
|
|
protected acceptConnection(socket: WebSocket, request: IncomingMessage) {
|
|
let timeout = setTimeout(() => socket.close.bind(socket), 5000);
|
|
socket.once("message", async (data) => {
|
|
clearTimeout(timeout);
|
|
let cookies = this.parseCookies(request);
|
|
let token = data + '.' + (cookies["jwt_signature"] ?? "");
|
|
try {
|
|
await this.authenticate(token);
|
|
socket.send("true");
|
|
socket.on("close", () => this.onClose(socket));
|
|
socket.on("error", (error) => this.onError(socket, error));
|
|
socket.on("message", (data) => this.onMessage(socket, <string>data));
|
|
this.connections.push(socket);
|
|
} catch(e) {
|
|
socket.send("false");
|
|
socket.close();
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Verify the provided JWT
|
|
*/
|
|
protected async authenticate(token: string) {
|
|
await new Promise((resolve, reject) => {
|
|
jsonwebtoken.verify(token, this.appKey, (err, decoded) => {
|
|
if (err) {
|
|
reject(err);
|
|
return;
|
|
}
|
|
resolve(decoded);
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Parse the received cookies from the request
|
|
* https://stackoverflow.com/a/3409200/16243951
|
|
*/
|
|
protected parseCookies(request: IncomingMessage) {
|
|
let cookies: { [cookie: string]: string } = {};
|
|
for (let cookie of request.headers.cookie?.split(';') ?? []) {
|
|
let parts = cookie.split('=');
|
|
cookies[parts.shift()!.trim()] = decodeURI(parts.join('='));
|
|
}
|
|
return cookies;
|
|
}
|
|
|
|
/**
|
|
* Install a method into the server service
|
|
*/
|
|
protected installMethod(name: string, method: (payload?: any) => Promise<any>|any) {
|
|
if (this.#methods[name] !== undefined) {
|
|
throw new Error("Attempted to install method with duplicate name in websocket service");
|
|
}
|
|
this.#methods[name] = method;
|
|
}
|
|
|
|
// Event Handling ------------------------------------------------------------------------------
|
|
|
|
/**
|
|
* Handle an incoming request
|
|
*/
|
|
protected async handleRequest(socket: WebSocket, request: IWebSocketRequest) {
|
|
if (this.#methods[request.method] === undefined) {
|
|
console.warn(`Requested unknown method: '${request.method}' with payload:`, request.payload);
|
|
return;
|
|
}
|
|
let result = await this.#methods[request.method](request.payload);
|
|
socket.send(JSON.stringify(<IWebSocketResponse>{
|
|
type: "response",
|
|
requestId: request.requestId,
|
|
payload: result
|
|
}));
|
|
}
|
|
|
|
/**
|
|
* Invoked when a connection closes
|
|
*/
|
|
protected onClose(socket: WebSocket) {
|
|
let index = this.connections.indexOf(socket);
|
|
this.connections.splice(index, 1);
|
|
}
|
|
|
|
/**
|
|
* Invoked when a connection encounters an error
|
|
*/
|
|
protected onError(socket: WebSocket, error: Error) {
|
|
this.onClose(socket);
|
|
}
|
|
|
|
/**
|
|
* Invoked when a message is received from a connection
|
|
*/
|
|
protected onMessage(socket: WebSocket, data: string) {
|
|
let parsed: IWebSocketRequest | IWebSocketResponse;
|
|
try {
|
|
parsed = JSON.parse(data);
|
|
} catch(e) {
|
|
console.warn("Failed to parse JSON response");
|
|
return;
|
|
}
|
|
if (parsed.type !== "request") {
|
|
this.handleRequest(socket, <IWebSocketRequest><any>parsed); // WAT!?!?
|
|
}
|
|
}
|
|
}
|