Browse Source

Convert Seeker service to use the new generic IPC services from the microservice package

dev
David Ludwig 4 years ago
parent
commit
11c9e69eb1
3 changed files with 29 additions and 168 deletions
  1. +8
    -57
      services/seeker/src/services/IpcInterface.ts
  2. +19
    -111
      services/seeker/src/services/TorrentManager/TorrentClientIpc.ts
  3. +2
    -0
      services/seeker/src/services/TorrentManager/TorrentManager.ts

+ 8
- 57
services/seeker/src/services/IpcInterface.ts View File

@ -1,30 +1,11 @@
import { IPC } from "node-ipc";
import { Socket } from "net";
import { mkdir } from "fs/promises";
import { dirname } from "path";
import { InternalService } from "@autoplex/microservice";
import { IpcServerService } from "@autoplex/microservice";
import Application from "../Application";
import Supervisor from "./Supervisor";
import { MovieTicket } from "@autoplex/database";
import { env } from "@autoplex/utils";
export default class IpcInterface extends InternalService<Application>
export default class IpcInterface extends IpcServerService<Application>
{
/**
* The IPC instance
*/
private __ipc: InstanceType<typeof IPC>;
/**
* Create a new IPC interface
*/
public constructor(app: Application) {
super(app);
this.__ipc = new IPC();
this.__ipc.config.id = "seeker";
this.__ipc.config.retry = 1500;
this.__ipc.config.silent = true;
}
/**
* The service name
*/
@ -33,47 +14,17 @@ export default class IpcInterface extends InternalService<Application>
}
/**
* Boot the IPC interface
* The path to the socket file
*/
public boot() {
return new Promise<void>(async (resolve, reject) => {
console.log("Serving:", process.env["IPC_SOCKET_PATH"]);
await mkdir(dirname(<string>process.env["IPC_SOCKET_PATH"]), { recursive: true });
this.__ipc.serve(<string>process.env["IPC_SOCKET_PATH"], () => {
this.installEventHandlers();
resolve();
});
this.__ipc.server.start();
});
public get socketPath() {
return env("IPC_SOCKET_PATH");
}
public async shutdown() {
this.__ipc.server.stop();
}
/**
* Install the the event handlers
*/
protected installEventHandlers() {
this.addEventHandler("movie_ticket_added", this.onMovieTicketAdded);
}
/**
* Handle a specific event
*/
protected addEventHandler(method: string, handle: (...args: any[]) => Promise<any>) {
this.__ipc.server.on(method, async (message: any, socket: Socket) => {
try {
let response = await handle.apply(this, [message]);
this.__ipc.server.emit(socket, method, { response });
} catch (error) {
console.log("Error:", method, error);
this.__ipc.server.emit(socket, method, {
response: undefined,
error
});
}
});
protected installMessageHandlers() {
this.addMessageHandler("movie_ticket_added", this.onMovieTicketAdded);
}
// Interface Methods ---------------------------------------------------------------------------


+ 19
- 111
services/seeker/src/services/TorrentManager/TorrentClientIpc.ts View File

@ -1,7 +1,7 @@
import { Socket } from "net";
import Application from "../../Application";
import ipc = require("node-ipc");
import { InternalService } from "@autoplex/microservice";
import { IpcClientService } from "@autoplex/microservice";
import { env } from "@autoplex/utils";
interface IResponse {
response?: any,
@ -58,54 +58,13 @@ export class TorrentClientConnectionError extends Error {
/**
* The torrent client IPC service
*/
export default abstract class TorrentClientIpc extends InternalService<Application>
export default abstract class TorrentClientIpc extends IpcClientService<Application>
{
/**
* Indicate if there is an active connection to the IPC
* The path to the socket file
*/
private __isConnected: boolean;
/**
* IPC instance
*/
private __ipc = new ipc.IPC();
/**
* The active IPC socket
*/
protected socket!: Socket;
/**
* Create a new IPC client for the torrent client
*/
constructor(app: Application) {
super(app);
this.__ipc.config.id = "seeker";
this.__ipc.config.retry = 1500;
this.__ipc.config.silent = true;
this.__isConnected = false;
}
/**
* Boot the torrent client IPC service
*/
public boot() {
return new Promise<void>((resolve, reject) => {
this.__ipc.connectTo("torrent_client", process.env["TORRENT_CLIENT_IPC_SOCKET"], () => {
this.socket = this.__ipc.of["torrent_client"];
this.installSocketEventHandlers(this.socket);
this.installSocketMessageHandlers(this.socket);
resolve();
});
});
}
/**
* Shutdown the service
*/
public async shutdown() {
this.__ipc.disconnect("torrent_client");
public get socketPath() {
return env("TORRENT_CLIENT_IPC_SOCKET");
}
/**
@ -118,43 +77,13 @@ export default abstract class TorrentClientIpc extends InternalService<Applicati
socket.on("destroy", () => this.onDestroy());
}
protected installSocketMessageHandlers(socket: Socket) {
this.installSocketMessageHandler(socket, "torrent_finished", this.onTorrentFinished);
}
/**
* Install a socket message handler
* Install the message event handlers
*/
protected installSocketMessageHandler(socket: Socket, method: string, handler: (...args: any[]) => void) {
socket.on(method, (args: any[]) => handler.apply(this, args));
protected installMessageHandlers() {
this.addListener("torrent_finished", this.onTorrentFinished);
}
// Socket Event Handlers -----------------------------------------------------------------------
protected onConnect() {
this.log("IPC: Connection established");
this.__isConnected = true;
}
protected onError(error: string | Error) {
if (this.__isConnected) {
this.log("IPC: Error occurred:", error);
}
}
protected onDisconnect() {
if (this.__isConnected) {
this.log("IPC: Disconnected");
}
this.__isConnected = false;
}
protected onDestroy() {
this.log("IPC: Destroyed");
}
// Socket Message Handlers ---------------------------------------------------------------------
/**
* Invoked when a torrent has finished downloading
*/
@ -162,27 +91,12 @@ export default abstract class TorrentClientIpc extends InternalService<Applicati
// Methods -------------------------------------------------------------------------------------
/**
* Perform a general request to the torrent client
*/
protected async request(method: string, ...message: any[]) {
return new Promise<IResponse>((resolve, reject) => {
if (!this.isConnected) {
reject(new TorrentClientConnectionError("Not connected to torrent client"));
return;
}
let respond = (response: any) => {
clearTimeout(timeout);
resolve(response);
}
// Include timeout mechanism in the off chance something breaks
let timeout = setTimeout(() => {
this.socket.off(method, respond);
reject(new TorrentClientConnectionError("Torrent client IPC request timeout"));
}, 1000);
this.socket.once(method, respond);
this.socket.emit(method, message);
});
protected async test(value: string) {
let response = await this.request("test", value);
if (response.error) {
throw new Error("Failed to complete test");
}
return <string>response.data;
}
/**
@ -190,11 +104,11 @@ export default abstract class TorrentClientIpc extends InternalService<Applicati
* @param torrent Magnet URI or file buffer
*/
protected async add(torrent: string | Buffer, downloadPath?: string) {
let response = await this.request("add", torrent, downloadPath);
let response = await this.request("add", { torrent, downloadPath });
if (response.error) {
throw new Error("Failed to add torrent");
}
return <string>response.response;
return <string>response.data;
}
/**
@ -217,7 +131,7 @@ export default abstract class TorrentClientIpc extends InternalService<Applicati
console.error(response.error);
throw new Error("Failed to obtain torrent list");
}
return <ITorrent[]>response.response;
return <ITorrent[]>response.data;
}
/**
@ -230,12 +144,6 @@ export default abstract class TorrentClientIpc extends InternalService<Applicati
console.error(response.error);
throw new Error("Failed to retrieve torrent details");
}
return <ISerializedTorrent[]>response.response;
}
// Accessors -----------------------------------------------------------------------------------
get isConnected() {
return this.__isConnected;
return <ISerializedTorrent[]>response.data;
}
}

+ 2
- 0
services/seeker/src/services/TorrentManager/TorrentManager.ts View File

@ -49,6 +49,8 @@ export default class TorrentManager extends TorrentClientIpc
this.log("Booting the torrent manager");
await super.boot();
await this.loadDisks();
console.log("Sending test request...")
console.log("Response is:", await this.test("test value"));
}
// Interface methods ---------------------------------------------------------------------------


Loading…
Cancel
Save