import { InternalService, Microservice } from "@autoplex/microservice";
import { IncomingMessage } from "http";
import WebSocket, { Server } from "ws";
import jsonwebtoken from "jsonwebtoken";
import { IWebSocketRequest, IWebSocketResponse } from "./schema";

/**
 * Internal service that runs a websocket server and dispatches incoming
 * messages to methods registered through `installMethod`.
 *
 * A client authenticates with its first message: that message is joined with
 * the `jwt_signature` cookie of the upgrade request by a `.` and the result is
 * verified as a JWT against `appKey`. The server answers `"true"` or `"false"`;
 * only sockets that pass verification are added to `connections`.
 */
export abstract class WebSocketServerService extends InternalService {
    /**
     * The list of active (authenticated) client connections
     */
    protected connections: WebSocket[] = [];

    /**
     * The websocket server instance (assigned in `boot`)
     */
    protected server!: Server;

    /**
     * The list of registered methods.
     *
     * Created without a prototype so that client-supplied method names such as
     * "toString" or "constructor" can never resolve to `Object.prototype`
     * members, and so that such names can be installed without being reported
     * as duplicates.
     */
    private __methods: { [method: string]: (payload?: any) => Promise<any> | any } = Object.create(null);

    // Overridable ---------------------------------------------------------------------------------

    /**
     * The application key for the application (used to verify client JWTs)
     */
    protected abstract get appKey(): string;

    /**
     * Install methods into the websocket service.
     * Subclasses override this and call `installMethod` for each method.
     */
    protected installMethods() {
        // no-op
    }

    // Service Methods -----------------------------------------------------------------------------

    /**
     * Boot the service: install the methods, then start listening on port 3250
     */
    public override async boot() {
        this.installMethods();

        this.server = new Server({ port: 3250 });
        this.server.on("connection", this.acceptConnection.bind(this));
    }

    /**
     * Shutdown the service: stop the server and close every tracked connection
     */
    public override async shutdown() {
        this.server.close();

        // Iterate over a snapshot: close handlers remove entries from
        // `connections`, which must not disturb this loop.
        for (const socket of [...this.connections]) {
            socket.close();
        }
    }

    /**
     * Accept the pending websocket connection.
     *
     * The client has 5 seconds to send its first message; that message is
     * treated as the token to authenticate with. On success the regular
     * close/error/message handlers are attached and the socket is tracked.
     * On failure the client receives "false" and the socket is closed.
     *
     * @param socket  The newly connected socket
     * @param request The HTTP upgrade request (source of the cookies)
     */
    protected acceptConnection(socket: WebSocket, request: IncomingMessage) {
        // Actually invoke close(); merely binding it would never close the socket.
        const timeout = setTimeout(() => socket.close(), 5000);

        // Do not keep the timer pending when the socket goes away on its own.
        socket.once("close", () => clearTimeout(timeout));

        socket.once("message", async (data) => {
            clearTimeout(timeout);

            try {
                // Cookie parsing lives inside the try block so that a failure
                // is answered with "false" instead of an unhandled rejection.
                const cookies = this.parseCookies(request);
                const token = data + "." + (cookies["jwt_signature"] ?? "");

                await this.authenticate(token);
                socket.send("true");

                socket.on("close", () => this.onClose(socket));
                socket.on("error", (error) => this.onError(socket, error));
                socket.on("message", (data) => this.onMessage(socket, data));

                this.connections.push(socket);
            }
            catch (e) {
                socket.send("false");
                socket.close();
            }
        });
    }

    /**
     * Verify the provided JWT against `appKey`.
     *
     * @param token The token to verify
     * @throws Rejects with the verification error when the token is not valid
     */
    protected async authenticate(token: string) {
        await new Promise((resolve, reject) => {
            jsonwebtoken.verify(token, this.appKey, (err, decoded) => {
                if (err) {
                    reject(err);
                    return;
                }

                resolve(decoded);
            });
        });
    }

    /**
     * Parse the received cookies from the request
     * https://stackoverflow.com/a/3409200/16243951
     *
     * @param request The request whose `cookie` header is parsed
     * @returns A map from cookie name to decoded value; empty when there is no
     *          cookie header
     */
    protected parseCookies(request: IncomingMessage) {
        const cookies: { [cookie: string]: string } = {};

        for (const cookie of request.headers.cookie?.split(";") ?? []) {
            // Split on the first "=" only; the value itself may contain "=".
            const separator = cookie.indexOf("=");
            const name = (separator === -1 ? cookie : cookie.slice(0, separator)).trim();
            const raw = separator === -1 ? "" : cookie.slice(separator + 1);

            try {
                cookies[name] = decodeURI(raw);
            }
            catch (e) {
                // decodeURI throws URIError on malformed percent-escapes; the
                // header is client-controlled, so keep the raw value instead.
                cookies[name] = raw;
            }
        }

        return cookies;
    }

    /**
     * Install a method into the server service
     *
     * @param name   The name clients use to invoke the method
     * @param method The handler; its (awaited) result is sent back as the response payload
     * @throws Error when a method with the same name is already installed
     */
    protected installMethod(name: string, method: (payload?: any) => Promise<any> | any) {
        if (this.__methods[name] !== undefined) {
            throw new Error("Attempted to install method with duplicate name in websocket service");
        }

        this.__methods[name] = method;
    }

    // Event Handling ------------------------------------------------------------------------------

    /**
     * Handle an incoming request: look up the requested method, run it and send
     * its result back as a "response" message carrying the same `requestId`.
     * Unknown methods are logged and otherwise ignored.
     *
     * @param socket  The socket the request arrived on
     * @param request The parsed request
     */
    protected async handleRequest(socket: WebSocket, request: IWebSocketRequest) {
        if (this.__methods[request.method] === undefined) {
            console.warn(`Requested unknown method: '${request.method}' with payload:`, request.payload);
            return;
        }

        const result = await this.__methods[request.method](request.payload);

        socket.send(JSON.stringify({
            type: "response",
            requestId: request.requestId,
            payload: result
        }));
    }

    /**
     * Invoked when a connection closes; stops tracking the socket
     */
    protected onClose(socket: WebSocket) {
        const index = this.connections.indexOf(socket);

        // The socket may already have been removed (onError also ends up here).
        // splice(-1, 1) would drop the last, unrelated connection.
        if (index !== -1) {
            this.connections.splice(index, 1);
        }
    }

    /**
     * Invoked when a connection encounters an error; treated like a close
     */
    protected onError(socket: WebSocket, error: Error) {
        this.onClose(socket);
    }

    /**
     * Invoked when a message is received from a connection
     *
     * @param socket The socket the message arrived on
     * @param data   The raw message, expected to be a JSON document
     */
    protected onMessage(socket: WebSocket, data: string) {
        let parsed: IWebSocketRequest | IWebSocketResponse;

        try {
            parsed = JSON.parse(data);
        }
        catch (e) {
            console.warn("Failed to parse JSON response");
            return;
        }

        // Valid JSON is not necessarily an object (e.g. `null` or `42`);
        // reading `.type` from null would throw inside the event listener.
        if (typeof parsed !== "object" || parsed === null) {
            console.warn("Ignoring websocket message that is not a JSON object");
            return;
        }

        // NOTE(review): this dispatches when type is NOT "request", which looks
        // inverted. The condition is kept as-is because ./schema is not visible
        // here — confirm against the schema and the client before changing it.
        if (parsed.type !== "request") {
            // A failing method must not surface as an unhandled rejection.
            this.handleRequest(socket, parsed).catch((error) => {
                console.error("Failed to handle websocket request:", error);
            });
        }
    }
}