|
|
@ -1,19 +1,15 @@ |
|
|
|
import { mkdir } from "fs/promises"; |
|
|
|
import { dirname } from "path"; |
|
|
|
import ipc from "node-ipc"; |
|
|
|
import type { Server } from "node-ipc"; |
|
|
|
import { Socket } from "net"; |
|
|
|
import assert from "assert"; |
|
|
|
import WebTorrent from "webtorrent-hybrid"; |
|
|
|
import TorrentClient from "./TorrentClient"; |
|
|
|
import { InternalService, Microservice } from "@autoplex/microservice"; |
|
|
|
import { IpcServerService } from "@autoplex/microservice"; |
|
|
|
import { env } from "@autoplex/utils"; |
|
|
|
|
|
|
|
type IAddTorrent = string | { |
|
|
|
type: "Buffer", |
|
|
|
data: number[] |
|
|
|
} |
|
|
|
|
|
|
|
export default class IpcInterface extends InternalService |
|
|
|
export default class IpcInterface extends IpcServerService |
|
|
|
{ |
|
|
|
/** |
|
|
|
* The torrent client instance |
|
|
@ -21,87 +17,46 @@ export default class IpcInterface extends InternalService |
|
|
|
protected torrentClient!: TorrentClient; |
|
|
|
|
|
|
|
/** |
|
|
|
* Quick reference to the IPC server |
|
|
|
*/ |
|
|
|
protected server!: Server; |
|
|
|
|
|
|
|
/** |
|
|
|
* Create a new IPC interface |
|
|
|
* The service name |
|
|
|
*/ |
|
|
|
public constructor(app: Microservice) { |
|
|
|
super(app); |
|
|
|
ipc.config.id = "torrent-client"; |
|
|
|
ipc.config.retry = 1500; |
|
|
|
ipc.config.silent = true; |
|
|
|
} |
|
|
|
public readonly NAME = "IPC"; |
|
|
|
|
|
|
|
/** |
|
|
|
* The service name |
|
|
|
* The path to the socket file |
|
|
|
*/ |
|
|
|
public get name() { |
|
|
|
return "IPC"; |
|
|
|
} |
|
|
|
protected readonly SOCKET_PATH = env("IPC_SOCKET_PATH"); |
|
|
|
|
|
|
|
/** |
|
|
|
* Boot the IPC interface |
|
|
|
*/ |
|
|
|
public boot() { |
|
|
|
// Store a reference to the torrent client
|
|
|
|
public async boot() { |
|
|
|
this.torrentClient = this.app.service<TorrentClient>("Torrent Client"); |
|
|
|
|
|
|
|
// Boot the IPC socket
|
|
|
|
return new Promise<void>(async (resolve, reject) => { |
|
|
|
console.log("Serving:", process.env["IPC_SOCKET_PATH"]); |
|
|
|
await mkdir(dirname(<string>process.env["IPC_SOCKET_PATH"]), { recursive: true }); |
|
|
|
ipc.serve(<string>process.env["IPC_SOCKET_PATH"], () => { |
|
|
|
this.server = ipc.server; |
|
|
|
this.installEventHandlers(this.server); |
|
|
|
resolve(); |
|
|
|
}); |
|
|
|
ipc.server.start(); |
|
|
|
}); |
|
|
|
await super.boot(); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Install the the event handlers |
|
|
|
*/ |
|
|
|
protected installEventHandlers(server: Server) { |
|
|
|
this.addEventHandler(server, "add", this.addTorrent); |
|
|
|
this.addEventHandler(server, "remove", this.removeTorrent); |
|
|
|
this.addEventHandler(server, "list", this.listTorrents); |
|
|
|
this.addEventHandler(server, "details", this.torrentDetails); |
|
|
|
protected installMessageHandlers() { |
|
|
|
this.addMessageHandler("add", this.addTorrent); |
|
|
|
this.addMessageHandler("remove", this.removeTorrent); |
|
|
|
this.addMessageHandler("list", this.listTorrents); |
|
|
|
this.addMessageHandler("details", this.torrentDetails); |
|
|
|
this.torrentClient.on("torrent_finished", this.torrentFinished.bind(this)); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Handle a specific event |
|
|
|
*/ |
|
|
|
protected addEventHandler(server: Server, method: string, handle: (...args: any[]) => Promise<any>) { |
|
|
|
server.on(method, async (message: any, socket: Socket) => { |
|
|
|
try { |
|
|
|
let response = await handle.apply(this, message); |
|
|
|
this.server.emit(socket, method, { response }); |
|
|
|
} catch (error) { |
|
|
|
console.log("Error:", method, error); |
|
|
|
this.server.emit(socket, method, { |
|
|
|
response: undefined, |
|
|
|
error |
|
|
|
}); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
// Interface Methods ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
/** |
|
|
|
* Add a torrent to the client |
|
|
|
*/ |
|
|
|
protected async addTorrent(torrentInfo: IAddTorrent, downloadPath?: string) { |
|
|
|
// protected async addTorrent(torrentInfo: IAddTorrent, downloadPath?: string) {
|
|
|
|
protected async addTorrent(payload: { torrent: IAddTorrent, downloadPath?: string }) { |
|
|
|
let torrent: WebTorrent.Torrent; |
|
|
|
if (typeof torrentInfo == "string") { |
|
|
|
torrent = await this.torrentClient.add(torrentInfo, { downloadPath }); |
|
|
|
if (typeof payload.torrent == "string") { |
|
|
|
torrent = await this.torrentClient.add(payload.torrent, { downloadPath: payload.downloadPath }); |
|
|
|
} else { |
|
|
|
torrent = await this.torrentClient.add(Buffer.from(torrentInfo.data), { downloadPath }); |
|
|
|
torrent = await this.torrentClient.add(Buffer.from(payload.torrent.data), { downloadPath: payload.downloadPath }); |
|
|
|
} |
|
|
|
return torrent.infoHash; |
|
|
|
} |
|
|
@ -144,11 +99,11 @@ export default class IpcInterface extends InternalService |
|
|
|
/** |
|
|
|
* Broadcast a message to the connected clients |
|
|
|
*/ |
|
|
|
protected broadcast(method: string, ...message: any[]) { |
|
|
|
for (let socket of <Socket[]>(<any>ipc.server).sockets) { |
|
|
|
this.server.emit(socket, method, message); |
|
|
|
} |
|
|
|
} |
|
|
|
// protected broadcast(method: string, ...message: any[]) {
|
|
|
|
// for (let socket of <Socket[]>(<any>ipc.server).sockets) {
|
|
|
|
// this.server.emit(socket, method, message);
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
|
|
|
|
/** |
|
|
|
* Notify connected clients that a torrent has finished |
|
|
|