Browse Source

Use new IPC package and add torrent client IPC

dev
David Ludwig 4 years ago
parent
commit
9661d9a261
7 changed files with 7 additions and 126 deletions
  1. +0
    -3
      services/seeker/.env.example
  2. +1
    -0
      services/seeker/package.json
  3. +1
    -1
      services/seeker/src/services/IpcInterface.ts
  4. +1
    -1
      services/seeker/src/services/PostProcessor/PostProcessor.ts
  5. +1
    -1
      services/seeker/src/services/Supervisor.ts
  6. +2
    -116
      services/seeker/src/services/TorrentManager/TorrentClientIpc.ts
  7. +1
    -4
      services/seeker/src/services/TorrentManager/TorrentManager.ts

+ 0
- 3
services/seeker/.env.example View File

@ -9,6 +9,3 @@ DB_PORT = 3306
DB_USER = root DB_USER = root
DB_PASSWORD_FILE = /run/secrets/mysql_root_password DB_PASSWORD_FILE = /run/secrets/mysql_root_password
DB_DATABASE = autoplex_request DB_DATABASE = autoplex_request
# Torrent client IPC socket path
TORRENT_CLIENT_IPC_SOCKET = /var/autoplex/ipc/torrent_client.sock

+ 1
- 0
services/seeker/package.json View File

@ -21,6 +21,7 @@
"typescript": "^4.2.4" "typescript": "^4.2.4"
}, },
"dependencies": { "dependencies": {
"@autoplex-api/torrent": "^0.0.0",
"@autoplex/database": "^0.0.0", "@autoplex/database": "^0.0.0",
"@autoplex/microservice": "^0.0.0", "@autoplex/microservice": "^0.0.0",
"@autoplex/utils": "^0.0.0", "@autoplex/utils": "^0.0.0",


+ 1
- 1
services/seeker/src/services/IpcInterface.ts View File

@ -1,4 +1,4 @@
import { IpcServerService } from "@autoplex/microservice";
import { IpcServerService } from "@autoplex/ipc";
import Application from "../Application"; import Application from "../Application";
import Supervisor from "./Supervisor"; import Supervisor from "./Supervisor";
import { MovieTicket } from "@autoplex/database"; import { MovieTicket } from "@autoplex/database";


+ 1
- 1
services/seeker/src/services/PostProcessor/PostProcessor.ts View File

@ -2,7 +2,7 @@ import { link, mkdir } from "fs/promises";
import { dirname, extname } from "path"; import { dirname, extname } from "path";
import Application from "../../Application"; import Application from "../../Application";
import { MovieTicket, MovieTorrent } from "@autoplex/database"; import { MovieTicket, MovieTorrent } from "@autoplex/database";
import { ISerializedTorrent } from "../TorrentManager/TorrentClientIpc";
import { ISerializedTorrent } from "@autoplex-api/torrent";
import { safeTitleFileName } from "../../utils"; import { safeTitleFileName } from "../../utils";
import Supervisor from "../Supervisor"; import Supervisor from "../Supervisor";
import { InternalService } from "@autoplex/microservice"; import { InternalService } from "@autoplex/microservice";


+ 1
- 1
services/seeker/src/services/Supervisor.ts View File

@ -4,7 +4,7 @@ import MovieSearch from "./MovieSearch";
import PostProcessor from "./PostProcessor"; import PostProcessor from "./PostProcessor";
import { InternalService } from "@autoplex/microservice"; import { InternalService } from "@autoplex/microservice";
import TorrentManager from "./TorrentManager"; import TorrentManager from "./TorrentManager";
import { ISerializedTorrent } from "./TorrentManager/TorrentClientIpc";
import { ISerializedTorrent } from "@autoplex-api/torrent";
export default class Supervisor extends InternalService<Application> export default class Supervisor extends InternalService<Application>
{ {


+ 2
- 116
services/seeker/src/services/TorrentManager/TorrentClientIpc.ts View File

@ -1,80 +1,16 @@
import { Socket } from "net";
import Application from "../../Application"; import Application from "../../Application";
import { IpcClientService } from "@autoplex/microservice";
import { env } from "@autoplex/utils";
import { IpcClient } from "@autoplex-api/torrent";
interface IResponse { interface IResponse {
response?: any, response?: any,
error?: string | Error error?: string | Error
} }
export interface ITorrent {
name: string,
infoHash: string,
progress: number,
state: TorrentState
}
export interface ISerializedFile {
path : string;
size : number;
downloaded: number;
progress : number;
selected : boolean;
}
export interface ISerializedTorrent {
name : string;
infoHash : string;
downloaded : number;
uploaded : number;
ratio : number;
size : number;
downloadSpeed: number;
uploadSpeed : number;
numPeers : number;
progress : number;
path : string;
state : TorrentState;
files : ISerializedFile[];
}
export enum TorrentState {
Ready = 0x1,
Paused = 0x2,
Done = 0x4
}
/**
* A custom error type for torrent client connection errors
*/
export class TorrentClientConnectionError extends Error {
constructor(...args: any[]) {
super(...args);
Object.setPrototypeOf(this, TorrentClientConnectionError.prototype);
}
}
/** /**
* The torrent client IPC service * The torrent client IPC service
*/ */
export default abstract class TorrentClientIpc extends IpcClientService<Application>
export default abstract class TorrentClientIpc extends IpcClient<Application>
{ {
/**
* The path to the socket file
*/
public readonly SOCKET_PATH = env("TORRENT_CLIENT_IPC_SOCKET");
/**
* Install the event handlers for the IPC socket
*/
protected installSocketEventHandlers(socket: Socket) {
socket.on("connect", () => this.onConnect());
socket.on("error", (error: any) => this.onError(error));
socket.on("disconnect", () => this.onDisconnect());
socket.on("destroy", () => this.onDestroy());
}
/** /**
* Install the message event handlers * Install the message event handlers
*/ */
@ -86,54 +22,4 @@ export default abstract class TorrentClientIpc extends IpcClientService<Applicat
* Invoked when a torrent has finished downloading * Invoked when a torrent has finished downloading
*/ */
protected async onTorrentFinished(infoHash: string) {} protected async onTorrentFinished(infoHash: string) {}
// Methods -------------------------------------------------------------------------------------
/**
* Add a torrent to the client
* @param torrent Magnet URI or file buffer
*/
protected async add(torrent: string | Buffer, downloadPath?: string) {
let response = await this.request("add", { torrent, downloadPath });
if (response.error) {
throw new Error("Failed to add torrent");
}
return <string>response.data;
}
/**
* Remove a torrent from the client
* @param torrent Torrent info hash
*/
protected async remove(torrent: string) {
let response = await this.request("remove", torrent);
if (response.error) {
throw new Error("Failed to remove torrent");
}
}
/**
* Get a list of all torrents in the client
*/
protected async list() {
let response = await this.request("list");
if (response.error) {
console.error(response.error);
throw new Error("Failed to obtain torrent list");
}
return <ITorrent[]>response.data;
}
/**
* Get full details of each of the provided torrents
* @param torrentIds Array of torrent info hashes
*/
protected async details(...torrentIds: string[]) {
let response = await this.request("details", torrentIds);
if (response.error) {
console.error(response.error);
throw new Error("Failed to retrieve torrent details");
}
return <ISerializedTorrent[]>response.data;
}
} }

+ 1
- 4
services/seeker/src/services/TorrentManager/TorrentManager.ts View File

@ -2,7 +2,7 @@ import diskusage from "diskusage";
import { readdir } from "fs/promises"; import { readdir } from "fs/promises";
import { MovieTicket, MovieTorrent } from "@autoplex/database"; import { MovieTicket, MovieTorrent } from "@autoplex/database";
import Supervisor from "../Supervisor"; import Supervisor from "../Supervisor";
import TorrentClientIpc, { TorrentClientConnectionError } from "./TorrentClientIpc"
import TorrentClientIpc from "./TorrentClientIpc"
interface IPendingMovieTorrent { interface IPendingMovieTorrent {
link: string, link: string,
@ -122,9 +122,6 @@ export default class TorrentManager extends TorrentClientIpc
torrent.movieTicket = movie; torrent.movieTicket = movie;
await torrent.save(); await torrent.save();
} catch(e) { } catch(e) {
if (e instanceof TorrentClientConnectionError) {
throw e;
}
console.log("Failed download the torrent"); console.log("Failed download the torrent");
return false; return false;
} }


Loading…
Cancel
Save