Browse Source

Adjust interface communication. Add additional broadcasting events and clean up other parameters.

master
David Ludwig 4 years ago
parent
commit
c66f4f9e92
3 changed files with 68 additions and 12 deletions
  1. +2
    -3
      package.json
  2. +29
    -4
      src/services/IpcInterface.ts
  3. +37
    -5
      src/services/TorrentClient.ts

+ 2
- 3
package.json View File

@ -5,9 +5,8 @@
"main": "build/index.js",
"scripts": {
"build": "tsc",
"start": "node .",
"start:dev": "node --inspect=0.0.0.0:9229 -r ./node_modules/ts-node/register ./src/index.ts",
"start:watch": "nodemon",
"start": "NODE_ENV=production; node .",
"start:dev": "nodemon",
"test": "echo \"Error: no test specified\" && exit 1",
"postinstall": "patch-package"
},


+ 29
- 4
src/services/IpcInterface.ts View File

@ -1,3 +1,5 @@
import { mkdir } from "fs/promises";
import { dirname } from "path";
import ipc from "node-ipc";
import type { Server } from "node-ipc";
import { Socket } from "net";
@ -12,6 +14,9 @@ type IAddTorrent = string | {
export default class IpcInterface
{
/**
* The torrent client instance
*/
protected torrentClient: TorrentClient;
/**
@ -34,8 +39,9 @@ export default class IpcInterface
* Boot the IPC interface
*/
public boot() {
return new Promise<void>((resolve, reject) => {
return new Promise<void>(async (resolve, reject) => {
console.log("Serving:", process.env["IPC_SOCKET_PATH"]);
await mkdir(dirname(<string>process.env["IPC_SOCKET_PATH"]), { recursive: true });
ipc.serve(<string>process.env["IPC_SOCKET_PATH"], () => {
this.server = ipc.server;
this.installEventHandlers(this.server);
@ -53,6 +59,7 @@ export default class IpcInterface
this.addEventHandler(server, "remove", this.removeTorrent);
this.addEventHandler(server, "list", this.listTorrents);
this.addEventHandler(server, "details", this.torrentDetails);
this.torrentClient.on("torrent_finished", this.torrentFinished.bind(this));
}
/**
@ -61,7 +68,7 @@ export default class IpcInterface
protected addEventHandler(server: Server, method: string, handle: (...args: any[]) => Promise<any>) {
server.on(method, async (message: any, socket: Socket) => {
try {
let response = await handle.apply(this, [message]);
let response = await handle.apply(this, message);
this.server.emit(socket, method, { response });
} catch (error) {
console.log("Error:", method, error);
@ -81,9 +88,9 @@ export default class IpcInterface
protected async addTorrent(torrentInfo: IAddTorrent, downloadPath?: string) {
let torrent: WebTorrent.Torrent;
if (typeof torrentInfo == "string") {
torrent = await this.torrentClient.add(torrentInfo);
torrent = await this.torrentClient.add(torrentInfo, { downloadPath });
} else {
torrent = await this.torrentClient.add(Buffer.from(torrentInfo.data));
torrent = await this.torrentClient.add(Buffer.from(torrentInfo.data), { downloadPath });
}
return torrent.infoHash;
}
@ -120,4 +127,22 @@ export default class IpcInterface
}
return torrents.map(torrent => this.torrentClient.serializeTorrent(torrent));
}
// Subscription Interface Methods --------------------------------------------------------------
/**
* Broadcast a message to the connected clients
*/
protected broadcast(method: string, ...message: any[]) {
for (let socket of <Socket[]>(<any>ipc.server).sockets) {
this.server.emit(socket, method, message);
}
}
/**
* Notify connected clients that a torrent has finished
*/
public torrentFinished(torrent: WebTorrent.Torrent) {
this.broadcast("torrent_finished", torrent.infoHash);
}
}

+ 37
- 5
src/services/TorrentClient.ts View File

@ -6,13 +6,33 @@ import { extname, join, sep } from "path";
import Torrent from "../database/entities/Torrent";
import rimraf from "rimraf";
import { ISerializedTorrent, TorrentState } from "../common";
import { EventEmitter } from "events";
interface IAddOptions {
downloadPath?: string;
extensions?: string | string[];
}
export default class TorrentClient
/**
* Available events in the torrent client
*/
interface TorrentClientEvents {
"torrent_finished": (torrent: WebTorrent.Torrent) => void;
}
/**
* Declare the event types in the torrent client
*/
export declare interface TorrentClient {
emit<U extends keyof TorrentClientEvents>(
event: U, ...args: Parameters<TorrentClientEvents[U]>
): boolean;
on<U extends keyof TorrentClientEvents>(
event: U, listener: TorrentClientEvents[U]
): this;
}
export class TorrentClient extends EventEmitter
{
/**
* The current WebTorrent instance (available after boot)
@ -39,17 +59,27 @@ export default class TorrentClient
// Event Handling ------------------------------------------------------------------------------
/**
* Invoked when a WebTorrent client error occurs
*/
protected onError(error: string | Error) {
console.error("A Webtorrent client error occurred:", error);
}
/**
* Invoked when a torrent error occurs
*/
protected onTorrentError(torrent: WebTorrent.Torrent, error: string | Error) {
console.error(torrent.name, "had an error", error);
console.error("Torrent error occurred:", torrent.name, error);
Torrent.delete({ infoHash: torrent.infoHash });
}
/**
* Invoked when a torrent has finished
*/
protected onTorrentFinish(torrent: WebTorrent.Torrent) {
console.log(torrent.name, "finished");
console.log("Torrent finished:", torrent.name);
this.emit("torrent_finished", torrent);
}
/**
@ -171,7 +201,7 @@ export default class TorrentClient
assert(this.__webtorrent.get(torrentInfo.infoHash) === null, "Torrent has already been added");
// Add the torrent to the client
let torrent = this.addTorrent(torrentInfo, options.downloadPath ?? "/storage/default");
let torrent = this.addTorrent(torrentInfo, options.downloadPath ?? "/mnt/storage/Downloads");
// When the metadata has beened fetched, select the files to download and store the torrent
torrent.once("metadata", () => {
@ -249,3 +279,5 @@ export default class TorrentClient
return this.__webtorrent.torrents;
}
}
export default TorrentClient;

Loading…
Cancel
Save