Browse Source

Add IPC interface to the torrent client

staging
David Ludwig 4 years ago
parent
commit
5131cd7109
6 changed files with 223 additions and 8 deletions
  1. +4
    -0
      services/torrent-client/.env.example
  2. +12
    -1
      services/torrent-client/src/Application.ts
  3. +29
    -0
      services/torrent-client/src/common.ts
  4. +1
    -1
      services/torrent-client/src/database/entities/Torrent.ts
  5. +121
    -0
      services/torrent-client/src/services/IpcInterface.ts
  6. +56
    -6
      services/torrent-client/src/services/TorrentClient.ts

+ 4
- 0
services/torrent-client/.env.example View File

@ -6,3 +6,7 @@ DB_PASSWORD_FILE = /run/secrets/mysql_root_password
DB_DATABASE = autoplex_torrent
DEFAULT_STORAGE_PATH = ./.data
REST_PORT = 3000
IPC_SOCKET_PATH = /tmp/torrent_client.sock

+ 12
- 1
services/torrent-client/src/Application.ts View File

@ -1,5 +1,6 @@
import { Equal } from "typeorm";
import connectToDatabase from "./database";
import IpcInterface from "./services/IpcInterface";
import TorrentClient from "./services/TorrentClient";
export default class Application
@ -9,11 +10,17 @@ export default class Application
*/
private __client: TorrentClient;
/**
* The IPC interface for the torrent client
*/
private __ipcInterface: IpcInterface;
/**
* Create the application
*/
public constructor() {
this.__client = new TorrentClient();
this.__ipcInterface = new IpcInterface(this.__client);
}
/**
@ -22,6 +29,7 @@ export default class Application
private async boot() {
await connectToDatabase();
await this.__client.boot();
await this.__ipcInterface.boot();
}
/**
@ -32,7 +40,10 @@ export default class Application
console.log("Torrent client ready");
}
public get client() {
/**
* Get the torrent client instance
*/
public get torrentClient() {
return this.__client;
}
}

+ 29
- 0
services/torrent-client/src/common.ts View File

@ -0,0 +1,29 @@
export interface ISerializedFile {
path : string;
size : number;
downloaded: number;
progress : number;
selected : boolean;
}
export interface ISerializedTorrent {
name : string;
infoHash : string;
downloaded : number;
uploaded : number;
ratio : number;
size : number;
downloadSpeed: number;
uploadSpeed : number;
numPeers : number;
progress : number;
path : string;
state : TorrentState;
files : ISerializedFile[];
}
export enum TorrentState {
Ready = 0x1,
Paused = 0x2,
Done = 0x4
}

+ 1
- 1
services/torrent-client/src/database/entities/Torrent.ts View File

@ -19,7 +19,7 @@ export default class Torrent extends BaseEntity
@Column()
infoHash!: string;
@Column("blob")
@Column("mediumblob")
torrentFile!: Buffer;
@Column({nullable: true})


+ 121
- 0
services/torrent-client/src/services/IpcInterface.ts View File

@ -0,0 +1,121 @@
import ipc from "node-ipc";
import type { Server } from "node-ipc";
import { Socket } from "net";
import assert from "assert";
import WebTorrent from "webtorrent-hybrid";
import TorrentClient from "./TorrentClient";
type IAddTorrent = string | {
type: "Buffer",
data: number[]
}
export default class IpcInterface
{
protected torrentClient: TorrentClient;
/**
* Quick reference to the IPC server
*/
protected server!: Server;
/**
* Create a new IPC interface
*/
public constructor(client: TorrentClient) {
ipc.config.id = "torrent-client";
ipc.config.retry = 1500;
ipc.config.silent = true;
this.torrentClient = client;
}
/**
* Boot the IPC interface
*/
public boot() {
return new Promise<void>((resolve, reject) => {
console.log("Serving:", process.env["IPC_SOCKET_PATH"]);
ipc.serve(<string>process.env["IPC_SOCKET_PATH"], () => {
this.server = ipc.server;
this.installEventHandlers(this.server);
resolve();
});
ipc.server.start();
});
}
/**
* Install the the event handlers
*/
protected installEventHandlers(server: Server) {
this.addEventHandler(server, "add", this.addTorrent);
this.addEventHandler(server, "remove", this.removeTorrent);
this.addEventHandler(server, "list", this.listTorrents);
this.addEventHandler(server, "details", this.torrentDetails);
}
/**
* Handle a specific event
*/
protected addEventHandler(server: Server, method: string, handle: (...args: any[]) => Promise<any>) {
server.on(method, async (message: any, socket: Socket) => {
try {
let response = await handle.apply(this, [message]);
this.server.emit(socket, method, { response });
} catch (error) {
console.log("Error:", method, error);
this.server.emit(socket, method, {
response: undefined,
error
});
}
});
}
// Interface Methods ---------------------------------------------------------------------------
/**
* Add a torrent to the client
*/
protected async addTorrent(torrentInfo: IAddTorrent) {
let torrent: WebTorrent.Torrent;
if (typeof torrentInfo == "string") {
torrent = await this.torrentClient.add(torrentInfo);
} else {
torrent = await this.torrentClient.add(Buffer.from(torrentInfo.data));
}
return torrent.infoHash;
}
/**
* Remove a torrent from the
* @param message Add
*/
protected async removeTorrent(torrentId: string) {
await this.torrentClient.remove(torrentId);
}
protected async listTorrents() {
return this.torrentClient.torrents.map(torrent => Object({
name: torrent.name,
infoHash: torrent.infoHash,
progress: torrent.progress,
state: this.torrentClient.torrentState(torrent)
}));
}
protected async torrentDetails(torrentIds: string[]) {
let torrents: WebTorrent.Torrent[];
if (torrentIds.length == 0) {
torrents = this.torrentClient.torrents;
} else {
torrents = torrentIds.map(torrentId => {
let torrent = this.torrentClient.get(torrentId);
assert(torrent != null, `Unknown torrent ID provided: ${torrentId}`);
return torrent;
});
}
return torrents.map(torrent => this.torrentClient.serializeTorrent(torrent));
}
}

+ 56
- 6
services/torrent-client/src/services/TorrentClient.ts View File

@ -5,8 +5,9 @@ import parseTorrent from "parse-torrent";
import { extname, join, sep } from "path";
import Torrent from "../database/entities/Torrent";
import rimraf from "rimraf";
import { ISerializedTorrent, TorrentState } from "../common";
interface AddOptions {
interface IAddOptions {
downloadPath?: string;
extensions?: string | string[];
}
@ -73,8 +74,13 @@ export default class TorrentClient
protected async loadTorrents() {
let torrents = await Torrent.find();
for (let torrent of torrents) {
let torrentInfo = parseTorrent(torrent.torrentFile);
this.addTorrent(torrentInfo, torrent.downloadPath, torrent.selectedFiles());
try {
let torrentInfo = parseTorrent(torrent.torrentFile);
this.addTorrent(torrentInfo, torrent.downloadPath, torrent.selectedFiles());
} catch(e) {
torrent.remove();
continue;
}
}
}
@ -154,10 +160,10 @@ export default class TorrentClient
/**
* Add a torrent to the client
*/
public async add(info: string | Buffer, options: AddOptions = {}) {
public async add(info: string | Buffer, options: IAddOptions = {}) {
// Parse the torrent
let torrentInfo = parseTorrent(info);
assert(typeof torrentInfo.infoHash === "string");
assert(typeof torrentInfo.infoHash === "string" && torrentInfo.infoHash.length > 0, "Invalid magnet link provided");
// If the torrent already exists, skip it
assert(this.__webtorrent.get(torrentInfo.infoHash) === null, "Torrent has already been added");
@ -179,7 +185,7 @@ export default class TorrentClient
/**
* Remove a torrent from the client
*/
public async remove(info: string | Buffer, withData: false) {
public async remove(info: string | Buffer, withData: boolean = false) {
// Parse the torrent
let torrentInfo = parseTorrent(info);
assert(typeof torrentInfo.infoHash === "string");
@ -196,4 +202,48 @@ export default class TorrentClient
this.removeTorrentFiles(torrent);
}
}
public has(infoHash: string) {
return this.__webtorrent.get(infoHash) != null;
}
public get(infoHash: string) {
return this.__webtorrent.get(infoHash);
}
public torrentState(torrent: WebTorrent.Torrent) {
let state = 0;
state |= <number>(torrent.ready && TorrentState.Ready);
state |= <number>(torrent.paused && TorrentState.Paused);
state |= <number>(torrent.done && TorrentState.Done);
return <TorrentState>state;
}
public serializeTorrent(torrent: WebTorrent.Torrent) {
return <ISerializedTorrent>{
name: torrent.name,
infoHash: torrent.infoHash,
downloaded: torrent.downloaded,
uploaded: torrent.uploaded,
ratio: torrent.ratio,
size: torrent.length,
downloadSpeed: torrent.downloadSpeed,
uploadSpeed: torrent.uploadSpeed,
numPeers: torrent.numPeers,
progress: torrent.progress,
path: torrent.path,
state: this.torrentState(torrent),
files: torrent.files.map(file => Object({
path: file.path,
size: file.length,
downloaded: file.downloaded,
progress: file.progress,
selected: file.isSelected
}))
};
}
get torrents() {
return this.__webtorrent.torrents;
}
}

Loading…
Cancel
Save