From c66f4f9e921fbd738d4a0875f5fdf38a889c8bb0 Mon Sep 17 00:00:00 2001 From: David Ludwig Date: Mon, 26 Apr 2021 21:34:38 -0500 Subject: [PATCH] Adjust interface communication. Add additional broadcasting events and clean up other parameters. --- package.json | 5 ++--- src/services/IpcInterface.ts | 33 +++++++++++++++++++++++---- src/services/TorrentClient.ts | 42 ++++++++++++++++++++++++++++++----- 3 files changed, 68 insertions(+), 12 deletions(-) diff --git a/package.json b/package.json index d935063..2136aff 100644 --- a/package.json +++ b/package.json @@ -5,9 +5,8 @@ "main": "build/index.js", "scripts": { "build": "tsc", - "start": "node .", - "start:dev": "node --inspect=0.0.0.0:9229 -r ./node_modules/ts-node/register ./src/index.ts", - "start:watch": "nodemon", + "start": "NODE_ENV=production; node .", + "start:dev": "nodemon", "test": "echo \"Error: no test specified\" && exit 1", "postinstall": "patch-package" }, diff --git a/src/services/IpcInterface.ts b/src/services/IpcInterface.ts index 12a8b57..92e38fd 100644 --- a/src/services/IpcInterface.ts +++ b/src/services/IpcInterface.ts @@ -1,3 +1,5 @@ +import { mkdir } from "fs/promises"; +import { dirname } from "path"; import ipc from "node-ipc"; import type { Server } from "node-ipc"; import { Socket } from "net"; @@ -12,6 +14,9 @@ type IAddTorrent = string | { export default class IpcInterface { + /** + * The torrent client instance + */ protected torrentClient: TorrentClient; /** @@ -34,8 +39,9 @@ export default class IpcInterface * Boot the IPC interface */ public boot() { - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { console.log("Serving:", process.env["IPC_SOCKET_PATH"]); + await mkdir(dirname(process.env["IPC_SOCKET_PATH"]), { recursive: true }); ipc.serve(process.env["IPC_SOCKET_PATH"], () => { this.server = ipc.server; this.installEventHandlers(this.server); @@ -53,6 +59,7 @@ export default class IpcInterface this.addEventHandler(server, "remove", this.removeTorrent); this.addEventHandler(server, "list", this.listTorrents); this.addEventHandler(server, "details", this.torrentDetails); + this.torrentClient.on("torrent_finished", this.torrentFinished.bind(this)); } /** @@ -61,7 +68,7 @@ export default class IpcInterface protected addEventHandler(server: Server, method: string, handle: (...args: any[]) => Promise) { server.on(method, async (message: any, socket: Socket) => { try { - let response = await handle.apply(this, [message]); + let response = await handle.apply(this, message); this.server.emit(socket, method, { response }); } catch (error) { console.log("Error:", method, error); @@ -81,9 +88,9 @@ export default class IpcInterface protected async addTorrent(torrentInfo: IAddTorrent, downloadPath?: string) { let torrent: WebTorrent.Torrent; if (typeof torrentInfo == "string") { - torrent = await this.torrentClient.add(torrentInfo); + torrent = await this.torrentClient.add(torrentInfo, { downloadPath }); } else { - torrent = await this.torrentClient.add(Buffer.from(torrentInfo.data)); + torrent = await this.torrentClient.add(Buffer.from(torrentInfo.data), { downloadPath }); } return torrent.infoHash; } @@ -120,4 +127,22 @@ export default class IpcInterface } return torrents.map(torrent => this.torrentClient.serializeTorrent(torrent)); } + + // Subscription Interface Methods -------------------------------------------------------------- + + /** + * Broadcast a message to the connected clients + */ + protected broadcast(method: string, ...message: any[]) { + for (let socket of (ipc.server).sockets) { + this.server.emit(socket, method, message); + } + } + + /** + * Notify connected clients that a torrent has finished + */ + public torrentFinished(torrent: WebTorrent.Torrent) { + this.broadcast("torrent_finished", torrent.infoHash); + } } diff --git a/src/services/TorrentClient.ts b/src/services/TorrentClient.ts index 31fee8a..363f600 100644 --- a/src/services/TorrentClient.ts +++ b/src/services/TorrentClient.ts @@ -6,13 +6,33 @@ import { extname, join, sep } from "path"; import Torrent from "../database/entities/Torrent"; import rimraf from "rimraf"; import { ISerializedTorrent, TorrentState } from "../common"; +import { EventEmitter } from "events"; interface IAddOptions { downloadPath?: string; extensions?: string | string[]; } -export default class TorrentClient +/** + * Available events in the torrent client + */ +interface TorrentClientEvents { + "torrent_finished": (torrent: WebTorrent.Torrent) => void; +} + +/** + * Declare the event types in the torrent client + */ +export declare interface TorrentClient { + emit( + event: U, ...args: Parameters + ): boolean; + on( + event: U, listener: TorrentClientEvents[U] + ): this; +} + +export class TorrentClient extends EventEmitter { /** * The current WebTorrent instance (available after boot) @@ -39,17 +59,27 @@ export default class TorrentClient // Event Handling ------------------------------------------------------------------------------ + /** + * Invoked when a WebTorrent client error occurs + */ protected onError(error: string | Error) { - + console.error("A Webtorrent client error occurred:", error); } + /** + * Invoked when a torrent error occurs + */ protected onTorrentError(torrent: WebTorrent.Torrent, error: string | Error) { - console.error(torrent.name, "had an error", error); + console.error("Torrent error occurred:", torrent.name, error); Torrent.delete({ infoHash: torrent.infoHash }); } + /** + * Invoked when a torrent has finished + */ protected onTorrentFinish(torrent: WebTorrent.Torrent) { - console.log(torrent.name, "finished"); + console.log("Torrent finished:", torrent.name); + this.emit("torrent_finished", torrent); } /** @@ -171,7 +201,7 @@ export default class TorrentClient assert(this.__webtorrent.get(torrentInfo.infoHash) === null, "Torrent has already been added"); // Add the torrent to the client - let torrent = this.addTorrent(torrentInfo, options.downloadPath ?? "/storage/default"); + let torrent = this.addTorrent(torrentInfo, options.downloadPath ?? "/mnt/storage/Downloads"); // When the metadata has beened fetched, select the files to download and store the torrent torrent.once("metadata", () => { @@ -249,3 +279,5 @@ export default class TorrentClient return this.__webtorrent.torrents; } } + +export default TorrentClient;