Browse Source

Basic torrent client prototype success

staging
David Ludwig 4 years ago
parent
commit
16cc735049
11 changed files with 655 additions and 0 deletions
  1. +8
    -0
      services/torrent-client/.env.example
  2. +7
    -0
      services/torrent-client/ormconfig.json
  3. +38
    -0
      services/torrent-client/src/Application.ts
  4. +73
    -0
      services/torrent-client/src/database/entities/Torrent.ts
  5. +5
    -0
      services/torrent-client/src/database/entities/index.ts
  6. +17
    -0
      services/torrent-client/src/database/index.ts
  7. +7
    -0
      services/torrent-client/src/index.ts
  8. +199
    -0
      services/torrent-client/src/services/TorrentClient.ts
  9. +31
    -0
      services/torrent-client/src/typings/magnet-uri/index.d.ts
  10. +38
    -0
      services/torrent-client/src/typings/rimraf/index.t.ts
  11. +232
    -0
      services/torrent-client/src/typings/webtorrent-hybrid/index.d.ts

+ 8
- 0
services/torrent-client/.env.example View File

@ -0,0 +1,8 @@
DB_TYPE = mysql
DB_HOST = database
DB_PORT = 3306
DB_USER = root
DB_PASSWORD_FILE = /run/secrets/mysql_root_password
DB_DATABASE = autoplex_torrent
DEFAULT_STORAGE_PATH = ./.data

+ 7
- 0
services/torrent-client/ormconfig.json View File

@ -0,0 +1,7 @@
{
"cli": {
"entitiesDir": "src/database/entities",
"migrationsDir": "src/database/migrations",
"subscribersDir": "src/database/subscribers"
}
}

+ 38
- 0
services/torrent-client/src/Application.ts View File

@ -0,0 +1,38 @@
import { Equal } from "typeorm";
import connectToDatabase from "./database";
import TorrentClient from "./services/TorrentClient";
export default class Application
{
/**
* The torrent client instance
*/
private __client: TorrentClient;
/**
* Create the application
*/
public constructor() {
this.__client = new TorrentClient();
}
/**
* Boot the application services
*/
private async boot() {
await connectToDatabase();
await this.__client.boot();
}
/**
* Start and run the application
*/
public async exec() {
await this.boot();
console.log("Torrent client ready");
}
public get client() {
return this.__client;
}
}

+ 73
- 0
services/torrent-client/src/database/entities/Torrent.ts View File

@ -0,0 +1,73 @@
import { Entity, PrimaryGeneratedColumn, Column, BaseEntity, ManyToOne } from "typeorm";
import WebTorrent from "webtorrent-hybrid";
@Entity()
export default class Torrent extends BaseEntity
{
static fromWebTorrent(torrent: WebTorrent.Torrent) {
let entity = new Torrent();
entity.infoHash = torrent.infoHash;
entity.torrentFile = torrent.torrentFile;
entity.setSelectedFiles(torrent.files);
entity.downloadPath = torrent.path;
return entity;
}
@PrimaryGeneratedColumn()
id!: number;
@Column()
infoHash!: string;
@Column("blob")
torrentFile!: Buffer;
@Column({nullable: true})
selectOnly?: string;
@Column()
downloadPath!: string;
/**
* Get the list of selected files
*/
selectedFiles() {
let result: number[] = [];
for (let range of (this.selectOnly ?? "").split(',')) {
// @TODO Check this map function... it's weird
let [index, end] = range.split('-').map(num => parseInt(num));
do {
result.push(index);
} while(index++ < end);
}
return result;
}
/**
* Update the selected files from the torrent
*/
setSelectedFiles(files: WebTorrent.TorrentFile[]) {
let ranges: string[] = [];
let offset: number | null = null;
let range: number = 0;
files.forEach((file, i) => {
if (file.isSelected) {
if (offset === null) {
offset = i;
range = 0;
} else {
range++;
}
} else {
if (offset !== null) {
ranges.push(offset + (range > 0 ? `-${offset + range}` : ""));
offset = null;
}
}
});
if (offset !== null) {
ranges.push(offset + (range > 0 ? `-${offset + range}` : ""));
}
this.selectOnly = ranges.join(',');
}
}

+ 5
- 0
services/torrent-client/src/database/entities/index.ts View File

@ -0,0 +1,5 @@
import Torrent from "./Torrent";
export default [
Torrent
];

+ 17
- 0
services/torrent-client/src/database/index.ts View File

@ -0,0 +1,17 @@
import { readFileSync } from "fs";
import { createConnection } from "typeorm";
import entities from "./entities";
export default async function connectToDatabase() {
return createConnection({
type : <"mysql" | "mariadb">process.env["DB_TYPE"],
host : process.env["DB_HOST"],
port : parseInt(<string>process.env["DB_PORT"]),
username : process.env["DB_USER"],
password : readFileSync(<string>process.env["DB_PASSWORD_FILE"]).toString().trim(),
database : "autoplex_torrent",
synchronize: process.env["NODE_ENV"] != "production",
entities,
migrations: ["src/migrations/*.ts"]
});
}

+ 7
- 0
services/torrent-client/src/index.ts View File

@ -0,0 +1,7 @@
import Application from "./Application";
// Create the application
let app = new Application();
// Execute the app
app.exec();

+ 199
- 0
services/torrent-client/src/services/TorrentClient.ts View File

@ -0,0 +1,199 @@
import assert from "assert";
import MagnetUri from "magnet-uri";
import WebTorrent from "webtorrent-hybrid";
import parseTorrent from "parse-torrent";
import { extname, join, sep } from "path";
import Torrent from "../database/entities/Torrent";
import rimraf from "rimraf";
interface AddOptions {
downloadPath?: string;
extensions?: string | string[];
}
export default class TorrentClient
{
/**
* The current WebTorrent instance (available after boot)
*/
private __webtorrent!: WebTorrent.Instance;
// ---------------------------------------------------------------------------------------------
public async boot() {
this.__webtorrent = new WebTorrent();
this.__webtorrent.on("error", error => this.onError(error));
this.__webtorrent.on("torrent", torrent => {
torrent.on("done", () => this.onTorrentFinish(torrent));
// torrent.on("download", (...args) => this.onTorrentDownload(torrent, ...args));
// torrent.on("upload", (...args) => this.onTorrentUpload(torrent, ...args));
torrent.on("error", (...args) => this.onTorrentError(torrent, ...args));
torrent.on("noPeers", (...args) => this.onTorrentNoPeers(torrent, ...args));
torrent.on("wire", (...args) => this.onTorrentWire(torrent, ...args));
});
await this.loadTorrents();
}
// Event Handling ------------------------------------------------------------------------------
protected onError(error: string | Error) {
}
protected onTorrentError(torrent: WebTorrent.Torrent, error: string | Error) {
console.error(torrent.name, "had an error", error);
Torrent.delete({ infoHash: torrent.infoHash });
}
protected onTorrentFinish(torrent: WebTorrent.Torrent) {
console.log(torrent.name, "finished");
}
/**
* @NOTE: Ignoring these two events for performance purposes
*/
// protected onTorrentDownload(torrent: WebTorrent.Torrent, bytes: number) {}
// protected onTorrentUpload(torrent: WebTorrent.Torrent, bytes: number) {}
protected onTorrentNoPeers(torrent: WebTorrent.Torrent, announceTypes: "tracker" | "dht") {
}
protected onTorrentWire(torrent: WebTorrent.Torrent, wire: any, address: string | undefined) {
}
// Torrent Storage -----------------------------------------------------------------------------
/**
* Load the torrents from the database
*/
protected async loadTorrents() {
let torrents = await Torrent.find();
for (let torrent of torrents) {
let torrentInfo = parseTorrent(torrent.torrentFile);
this.addTorrent(torrentInfo, torrent.downloadPath, torrent.selectedFiles());
}
}
/**
* Delete a torrent from storage
*/
protected async deleteTorrent(torrent: WebTorrent.Torrent) {
return await Torrent.delete({ infoHash: torrent.infoHash });
}
/**
* Store a torrent in the database
*/
protected async storeTorrent(torrent: WebTorrent.Torrent) {
return await Torrent.fromWebTorrent(torrent).save();
}
/**
* Delete the download files from a torrent
*/
protected async removeTorrentFiles(torrent: WebTorrent.Torrent) {
return new Promise<void>((resolve, reject) => {
let toRemove = new Set<string>();
torrent.files.forEach(file => {
toRemove.add(file.path.split(sep)[0]);
});
toRemove.forEach(path => {
rimraf(join(torrent.path, path), (err) => {
if (err) {
return reject(err);
}
resolve();
});
});
});
}
// Torrent Handling ----------------------------------------------------------------------------
/**
* Add a torrent to the client
*/
protected addTorrent(torrent: MagnetUri.Instance, downloadPath: string, selectOnly?: number[]) {
// Select only: Select no files by default due to broken selection mechanism in Webtorrent
torrent.so = selectOnly ?? [];
return this.__webtorrent.add(torrent, { path: downloadPath });
}
/**
* Remove a torrent from the client
*/
protected removeTorrent(torrent: WebTorrent.Torrent) {
this.deleteTorrent(torrent);
this.__webtorrent.remove(torrent);
}
/**
* Select the initial list of files to download on a newly added torrent
*/
protected filterSelectFiles(torrent: WebTorrent.Torrent, extensions?: string | string[]) {
if (extensions === undefined) {
torrent.files.forEach(file => file.select());
return;
}
let exts = new Set(typeof extensions === "string" ? extensions.split(',') : extensions);
torrent.files.forEach(file => {
if (exts.has(extname(file.path).slice(1))) {
file.select();
} else {
file.deselect();
}
});
}
// Torrent Client Interface --------------------------------------------------------------------
/**
* Add a torrent to the client
*/
public async add(info: string | Buffer, options: AddOptions = {}) {
// Parse the torrent
let torrentInfo = parseTorrent(info);
assert(typeof torrentInfo.infoHash === "string");
// If the torrent already exists, skip it
assert(this.__webtorrent.get(torrentInfo.infoHash) === null, "Torrent has already been added");
// Add the torrent to the client
let torrent = this.addTorrent(torrentInfo, options.downloadPath ?? <string>process.env["DEFAULT_STORAGE_PATH"]);
// When the metadata has beened fetched, select the files to download and store the torrent
torrent.once("metadata", () => {
this.filterSelectFiles(torrent, options.extensions);
this.storeTorrent(torrent);
console.log(torrent.path);
});
return torrent;
}
/**
* Remove a torrent from the client
*/
public async remove(info: string | Buffer, withData: false) {
// Parse the torrent
let torrentInfo = parseTorrent(info);
assert(typeof torrentInfo.infoHash === "string");
// Get the torrent and ensure it exists it exists
let torrent = this.__webtorrent.get(torrentInfo.infoHash);
assert(torrent !== undefined, "Torrent has not been added");
// Remove the torrent
this.removeTorrent(torrent);
// Delete data if necessary
if (withData) {
this.removeTorrentFiles(torrent);
}
}
}

+ 31
- 0
services/torrent-client/src/typings/magnet-uri/index.d.ts View File

@ -0,0 +1,31 @@
declare const MagnetUri: MagnetUri.MagnetUri;
declare module "magnet-uri" {
declare namespace MagnetUri {
interface MagnetUri {
(uri: string): Instance;
decode(uri: string): Instance;
encode(parsed: Instance): string;
}
interface Instance extends Object {
dn?: string | string[];
tr?: string | string[];
xs?: string | string[];
as?: string | string[];
ws?: string | string[];
kt?: string[];
ix?: number | number[];
xt?: string | string[];
so?: number[];
infoHash?: string;
infoHashBuffer?: Buffer;
name?: string | string[];
keywords?: string | string[];
announce?: string[];
urlList?: string[];
}
}
export = MagnetUri;
}

+ 38
- 0
services/torrent-client/src/typings/rimraf/index.t.ts View File

@ -0,0 +1,38 @@
import * as glob from "glob";
import * as fs from "fs";
declare function rimraf(path: string, options: rimraf.Options, callback: (error?: Error) => void): void;
declare function rimraf(path: string, callback: (error?: Error) => void): void;
declare namespace rimraf {
/**
* It can remove stuff synchronously, too.
* But that's not so good. Use the async API.
* It's better.
*/
function sync(path: string, options?: Options): void;
/**
* see {@link https://github.com/isaacs/rimraf/blob/79b933fb362b2c51bedfa448be848e1d7ed32d7e/README.md#options}
*/
interface Options {
maxBusyTries?: number;
emfileWait?: number;
/** @default false */
disableGlob?: boolean;
glob?: glob.IOptions | false;
unlink?: typeof fs.unlink;
chmod?: typeof fs.chmod;
stat?: typeof fs.stat;
lstat?: typeof fs.lstat;
rmdir?: typeof fs.rmdir;
readdir?: typeof fs.readdir;
unlinkSync?: typeof fs.unlinkSync;
chmodSync?: typeof fs.chmodSync;
statSync?: typeof fs.statSync;
lstatSync?: typeof fs.lstatSync;
rmdirSync?: typeof fs.rmdirSync;
readdirSync?: typeof fs.readdirSync;
}
}
export = rimraf;

+ 232
- 0
services/torrent-client/src/typings/webtorrent-hybrid/index.d.ts View File

@ -0,0 +1,232 @@
// declare module "webtorrent-hybrid" {
// declare class WebTorrent {
// }
// declare const WebTorrent: WebTorrent.WebTorrent;
// declare namespace WebTorrent {
// interface WebTorrent {
// new (config?: Options): Instance;
// (config?: Options): Instance;
// WEBRTC_SUPPORT: boolean;
// }
// }
// export = WebTorrent;
// }
// Type definitions for WebTorrent 0.109
// Project: https://github.com/feross/webtorrent, https://webtorrent.io
// Definitions by: Bazyli Brzóska <https://github.com/niieani>
// Tomasz Łaziuk <https://github.com/tlaziuk>
// Gabriel Juchault <https://github.com/gjuchault>
// Adam Crowder <https://github.com/cheeseandcereal>
// Definitions: https://github.com/DefinitelyTyped/DefinitelyTyped
/// <reference types="node" />
declare module "webtorrent-hybrid" {
import { Instance as MagnetUri } from 'magnet-uri';
import { Instance as ParseTorrent } from 'parse-torrent';
import { Instance as SimplePeer } from 'simple-peer';
import { RequestOptions, Server } from 'http';
import { Wire } from 'bittorrent-protocol';
declare const WebTorrent: WebTorrent.WebTorrent;
declare namespace WebTorrent {
interface WebTorrent {
new (config?: Options): Instance;
(config?: Options): Instance;
WEBRTC_SUPPORT: boolean;
}
interface Options {
maxConns?: number;
nodeId?: string | Buffer;
peerId?: string | Buffer;
tracker?: boolean | {};
dht?: boolean | {};
webSeeds?: boolean;
utp?: boolean;
}
interface TorrentOptions {
announce?: any[];
getAnnounceOpts?(): void;
maxWebConns?: number;
path?: string;
store?(chunkLength: number, storeOpts: { length: number, files: File[], torrent: Torrent, }): any;
private?: boolean;
}
interface TorrentDestroyOptions {
destroyStore?: boolean;
}
interface Instance extends NodeJS.EventEmitter {
on(event: 'torrent', callback: (torrent: Torrent) => void): this;
on(event: 'error', callback: (err: Error | string) => void): this;
add(torrent: string | Buffer | File | ParseTorrent | MagnetUri, opts?: TorrentOptions, cb?: (torrent: Torrent) => any): Torrent;
add(torrent: string | Buffer | File | ParseTorrent | MagnetUri, cb?: (torrent: Torrent) => any): Torrent;
seed(input: string | string[] | File | File[] | FileList | Buffer | Buffer[] | NodeJS.ReadableStream | NodeJS.ReadableStream[], opts?: TorrentOptions, cb?: (torrent: Torrent) => any): Torrent;
seed(input: string | string[] | File | File[] | FileList | Buffer | Buffer[] | NodeJS.ReadableStream | NodeJS.ReadableStream[], cb?: (torrent: Torrent) => any): Torrent;
remove(torrentId: Torrent | string | Buffer, opts?: TorrentDestroyOptions, callback?: (err: Error | string) => void): void;
destroy(callback?: (err: Error | string) => void): void;
readonly torrents: Torrent[];
get(torrentId: Torrent | string | Buffer): Torrent | void;
readonly downloadSpeed: number;
readonly uploadSpeed: number;
readonly progress: number;
readonly ratio: number;
}
interface Torrent extends NodeJS.EventEmitter {
readonly infoHash: string;
readonly magnetURI: string;
readonly torrentFile: Buffer;
readonly torrentFileBlobURL: string;
readonly files: TorrentFile[];
readonly announce: string[];
readonly pieces: Array<TorrentPiece | null>;
readonly timeRemaining: number;
readonly received: number;
readonly downloaded: number;
readonly uploaded: number;
readonly downloadSpeed: number;
readonly uploadSpeed: number;
readonly progress: number;
readonly ratio: number;
readonly length: number;
readonly pieceLength: number;
readonly lastPieceLength: number;
readonly numPeers: number;
readonly path: string;
readonly ready: boolean;
readonly paused: boolean;
readonly done: boolean;
readonly name: string;
readonly created: Date;
readonly createdBy: string;
readonly comment: string;
readonly maxWebConns: number;
destroy(opts?: TorrentDestroyOptions, cb?: (err: Error | string) => void): void;
addPeer(peer: string | SimplePeer): boolean;
addWebSeed(url: string): void;
removePeer(peer: string | SimplePeer): void;
select(start: number, end: number, priority?: number, notify?: () => void): void;
deselect(start: number, end: number, priority: number): void;
createServer(opts?: RequestOptions): Server;
pause(): void;
resume(): void;
rescanFiles(callback?: (err: Error | string | null) => void): void;
on(event: 'infoHash' | 'metadata' | 'ready' | 'done', callback: () => void): this;
on(event: 'warning' | 'error', callback: (err: Error | string) => void): this;
on(event: 'download' | 'upload', callback: (bytes: number) => void): this;
on(event: 'wire', callback: (wire: Wire, addr?: string) => void): this;
on(event: 'noPeers', callback: (announceType: 'tracker' | 'dht') => void): this;
}
interface TorrentFile extends NodeJS.EventEmitter {
// Custom property to check if a file is selected
readonly isSelected: boolean;
readonly name: string;
readonly path: string;
readonly length: number;
readonly downloaded: number;
readonly progress: number;
select(): void;
deselect(): void;
createReadStream(opts?: { start: number, end: number, }): NodeJS.ReadableStream;
getBuffer(callback: (err: string | Error | undefined, buffer?: Buffer) => void): void;
appendTo(
rootElement: HTMLElement | string,
opts?: { autoplay?: boolean, controls?: boolean, maxBlobLength?: number },
callback?: (err: Error | undefined, element: HTMLMediaElement) => void): void;
appendTo(rootElement: HTMLElement | string, callback?: (err: Error | undefined, element: HTMLMediaElement) => void): void;
renderTo(
rootElement: HTMLMediaElement | string,
opts?: { autoplay?: boolean, controls?: boolean, maxBlobLength?: number },
callback?: (err: Error | undefined, element: HTMLMediaElement) => void): void;
renderTo(rootElement: HTMLMediaElement | string, callback?: (err: Error | undefined, element: HTMLMediaElement) => void): void;
getBlob(callback: (err: string | Error | undefined, blob?: Blob) => void): void;
getBlobURL(callback: (err: string | Error | undefined, blobURL?: string) => void): void;
}
interface TorrentPiece {
readonly length: number;
readonly missing: number;
}
}
export = WebTorrent;
}

Loading…
Cancel
Save