From 5131cd7109175c8f28d4984d6059f5b7ad071d94 Mon Sep 17 00:00:00 2001 From: David Ludwig Date: Tue, 6 Apr 2021 12:42:05 -0500 Subject: [PATCH] Add IPC interface to the torrent client --- services/torrent-client/.env.example | 4 + services/torrent-client/src/Application.ts | 13 +- services/torrent-client/src/common.ts | 29 +++++ .../src/database/entities/Torrent.ts | 2 +- .../src/services/IpcInterface.ts | 121 ++++++++++++++++++ .../src/services/TorrentClient.ts | 62 ++++++++- 6 files changed, 223 insertions(+), 8 deletions(-) create mode 100644 services/torrent-client/src/common.ts create mode 100644 services/torrent-client/src/services/IpcInterface.ts diff --git a/services/torrent-client/.env.example b/services/torrent-client/.env.example index f9aa925..d97facc 100644 --- a/services/torrent-client/.env.example +++ b/services/torrent-client/.env.example @@ -6,3 +6,7 @@ DB_PASSWORD_FILE = /run/secrets/mysql_root_password DB_DATABASE = autoplex_torrent DEFAULT_STORAGE_PATH = ./.data + +REST_PORT = 3000 + +IPC_SOCKET_PATH = /tmp/torrent_client.sock diff --git a/services/torrent-client/src/Application.ts b/services/torrent-client/src/Application.ts index cec19c5..cba7e5e 100644 --- a/services/torrent-client/src/Application.ts +++ b/services/torrent-client/src/Application.ts @@ -1,5 +1,6 @@ import { Equal } from "typeorm"; import connectToDatabase from "./database"; +import IpcInterface from "./services/IpcInterface"; import TorrentClient from "./services/TorrentClient"; export default class Application @@ -9,11 +10,17 @@ export default class Application */ private __client: TorrentClient; + /** + * The IPC interface for the torrent client + */ + private __ipcInterface: IpcInterface; + /** * Create the application */ public constructor() { this.__client = new TorrentClient(); + this.__ipcInterface = new IpcInterface(this.__client); } /** @@ -22,6 +29,7 @@ export default class Application private async boot() { await connectToDatabase(); await this.__client.boot(); + await this.__ipcInterface.boot(); } /** @@ -32,7 +40,10 @@ export default class Application console.log("Torrent client ready"); } - public get client() { + /** + * Get the torrent client instance + */ + public get torrentClient() { return this.__client; } } diff --git a/services/torrent-client/src/common.ts b/services/torrent-client/src/common.ts new file mode 100644 index 0000000..1205adf --- /dev/null +++ b/services/torrent-client/src/common.ts @@ -0,0 +1,29 @@ +export interface ISerializedFile { + path : string; + size : number; + downloaded: number; + progress : number; + selected : boolean; +} + +export interface ISerializedTorrent { + name : string; + infoHash : string; + downloaded : number; + uploaded : number; + ratio : number; + size : number; + downloadSpeed: number; + uploadSpeed : number; + numPeers : number; + progress : number; + path : string; + state : TorrentState; + files : ISerializedFile[]; +} + +export enum TorrentState { + Ready = 0x1, + Paused = 0x2, + Done = 0x4 +} diff --git a/services/torrent-client/src/database/entities/Torrent.ts b/services/torrent-client/src/database/entities/Torrent.ts index 39c31c1..799d815 100644 --- a/services/torrent-client/src/database/entities/Torrent.ts +++ b/services/torrent-client/src/database/entities/Torrent.ts @@ -19,7 +19,7 @@ export default class Torrent extends BaseEntity @Column() infoHash!: string; - @Column("blob") + @Column("mediumblob") torrentFile!: Buffer; @Column({nullable: true}) diff --git a/services/torrent-client/src/services/IpcInterface.ts b/services/torrent-client/src/services/IpcInterface.ts new file mode 100644 index 0000000..909dc28 --- /dev/null +++ b/services/torrent-client/src/services/IpcInterface.ts @@ -0,0 +1,121 @@ +import ipc from "node-ipc"; +import type { Server } from "node-ipc"; +import { Socket } from "net"; +import assert from "assert"; +import WebTorrent from "webtorrent-hybrid"; +import TorrentClient from "./TorrentClient"; + +type IAddTorrent = string | { + type: "Buffer", + data: number[] +} + +export default class IpcInterface +{ + protected torrentClient: TorrentClient; + + /** + * Quick reference to the IPC server + */ + protected server!: Server; + + /** + * Create a new IPC interface + */ + public constructor(client: TorrentClient) { + ipc.config.id = "torrent-client"; + ipc.config.retry = 1500; + ipc.config.silent = true; + + this.torrentClient = client; + } + + /** + * Boot the IPC interface + */ + public boot() { + return new Promise((resolve, reject) => { + console.log("Serving:", process.env["IPC_SOCKET_PATH"]); + ipc.serve(process.env["IPC_SOCKET_PATH"], () => { + this.server = ipc.server; + this.installEventHandlers(this.server); + resolve(); + }); + ipc.server.start(); + }); + } + + /** + * Install the the event handlers + */ + protected installEventHandlers(server: Server) { + this.addEventHandler(server, "add", this.addTorrent); + this.addEventHandler(server, "remove", this.removeTorrent); + this.addEventHandler(server, "list", this.listTorrents); + this.addEventHandler(server, "details", this.torrentDetails); + } + + /** + * Handle a specific event + */ + protected addEventHandler(server: Server, method: string, handle: (...args: any[]) => Promise) { + server.on(method, async (message: any, socket: Socket) => { + try { + let response = await handle.apply(this, [message]); + this.server.emit(socket, method, { response }); + } catch (error) { + console.log("Error:", method, error); + this.server.emit(socket, method, { + response: undefined, + error + }); + } + }); + } + + // Interface Methods --------------------------------------------------------------------------- + + /** + * Add a torrent to the client + */ + protected async addTorrent(torrentInfo: IAddTorrent) { + let torrent: WebTorrent.Torrent; + if (typeof torrentInfo == "string") { + torrent = await this.torrentClient.add(torrentInfo); + } else { + torrent = await this.torrentClient.add(Buffer.from(torrentInfo.data)); + } + return torrent.infoHash; + } + + /** + * Remove a torrent from the + * @param message Add + */ + protected async removeTorrent(torrentId: string) { + await this.torrentClient.remove(torrentId); + } + + protected async listTorrents() { + return this.torrentClient.torrents.map(torrent => Object({ + name: torrent.name, + infoHash: torrent.infoHash, + progress: torrent.progress, + state: this.torrentClient.torrentState(torrent) + })); + } + + protected async torrentDetails(torrentIds: string[]) { + let torrents: WebTorrent.Torrent[]; + if (torrentIds.length == 0) { + torrents = this.torrentClient.torrents; + } else { + torrents = torrentIds.map(torrentId => { + let torrent = this.torrentClient.get(torrentId); + assert(torrent != null, `Unknown torrent ID provided: ${torrentId}`); + return torrent; + }); + } + return torrents.map(torrent => this.torrentClient.serializeTorrent(torrent)); + } +} diff --git a/services/torrent-client/src/services/TorrentClient.ts b/services/torrent-client/src/services/TorrentClient.ts index fe1a38f..5aea1a4 100644 --- a/services/torrent-client/src/services/TorrentClient.ts +++ b/services/torrent-client/src/services/TorrentClient.ts @@ -5,8 +5,9 @@ import parseTorrent from "parse-torrent"; import { extname, join, sep } from "path"; import Torrent from "../database/entities/Torrent"; import rimraf from "rimraf"; +import { ISerializedTorrent, TorrentState } from "../common"; -interface AddOptions { +interface IAddOptions { downloadPath?: string; extensions?: string | string[]; } @@ -73,8 +74,13 @@ export default class TorrentClient protected async loadTorrents() { let torrents = await Torrent.find(); for (let torrent of torrents) { - let torrentInfo = parseTorrent(torrent.torrentFile); - this.addTorrent(torrentInfo, torrent.downloadPath, torrent.selectedFiles()); + try { + let torrentInfo = parseTorrent(torrent.torrentFile); + this.addTorrent(torrentInfo, torrent.downloadPath, torrent.selectedFiles()); + } catch(e) { + torrent.remove(); + continue; + } } } @@ -154,10 +160,10 @@ export default class TorrentClient /** * Add a torrent to the client */ - public async add(info: string | Buffer, options: AddOptions = {}) { + public async add(info: string | Buffer, options: IAddOptions = {}) { // Parse the torrent let torrentInfo = parseTorrent(info); - assert(typeof torrentInfo.infoHash === "string"); + assert(typeof torrentInfo.infoHash === "string" && torrentInfo.infoHash.length > 0, "Invalid magnet link provided"); // If the torrent already exists, skip it assert(this.__webtorrent.get(torrentInfo.infoHash) === null, "Torrent has already been added"); @@ -179,7 +185,7 @@ export default class TorrentClient /** * Remove a torrent from the client */ - public async remove(info: string | Buffer, withData: false) { + public async remove(info: string | Buffer, withData: boolean = false) { // Parse the torrent let torrentInfo = parseTorrent(info); assert(typeof torrentInfo.infoHash === "string"); @@ -196,4 +202,48 @@ export default class TorrentClient this.removeTorrentFiles(torrent); } } + + public has(infoHash: string) { + return this.__webtorrent.get(infoHash) != null; + } + + public get(infoHash: string) { + return this.__webtorrent.get(infoHash); + } + + public torrentState(torrent: WebTorrent.Torrent) { + let state = 0; + state |= (torrent.ready && TorrentState.Ready); + state |= (torrent.paused && TorrentState.Paused); + state |= (torrent.done && TorrentState.Done); + return state; + } + + public serializeTorrent(torrent: WebTorrent.Torrent) { + return { + name: torrent.name, + infoHash: torrent.infoHash, + downloaded: torrent.downloaded, + uploaded: torrent.uploaded, + ratio: torrent.ratio, + size: torrent.length, + downloadSpeed: torrent.downloadSpeed, + uploadSpeed: torrent.uploadSpeed, + numPeers: torrent.numPeers, + progress: torrent.progress, + path: torrent.path, + state: this.torrentState(torrent), + files: torrent.files.map(file => Object({ + path: file.path, + size: file.length, + downloaded: file.downloaded, + progress: file.progress, + selected: file.isSelected + })) + }; + } + + get torrents() { + return this.__webtorrent.torrents; + } }