Browse Source

Grouped IPC services and fixed logging issues/improved error reporting

master
David Ludwig 4 years ago
parent
commit
ee762a96de
6 changed files with 133 additions and 200 deletions
  1. +42
    -31
      src/server/services/Ipc/IpcClient.ts
  2. +23
    -0
      src/server/services/Ipc/SeekerIpcClient.ts
  3. +63
    -0
      src/server/services/Ipc/TorrentIpcClient.ts
  4. +0
    -164
      src/server/services/TorrentClientIpc.ts
  5. +1
    -1
      src/server/services/WebServer/routes/api.ts
  6. +4
    -4
      src/server/services/index.ts

src/server/services/SeekerIpc.ts → src/server/services/Ipc/IpcClient.ts View File


+ 23
- 0
src/server/services/Ipc/SeekerIpcClient.ts View File

@ -0,0 +1,23 @@
import IpcClient from "./IpcClient";
import Application from "../../Application";
export default class SeekerIpcClient extends IpcClient
{
/**
* Create a new IPC client for the Seeker
*/
constructor(app: Application) {
super("Seeker IPC", app, "request", "seeker");
}
// Methods -------------------------------------------------------------------------------------
/**
* Notify Seeker that a movie was added
*/
public notifyMovieRequested(ticketId: number) {
this.request("search_movie", ticketId).catch((e) => {
this.log("No response from seeker notifying added movie");
});
}
}

+ 63
- 0
src/server/services/Ipc/TorrentIpcClient.ts View File

@ -0,0 +1,63 @@
import { ISerializedTorrent, ITorrent } from "../../common";
import Application from "../../Application";
import IpcClient from "./IpcClient";
export default class TorrentIpcClient extends IpcClient
{
/**
* Create a new IPC client for the torrent client
*/
constructor(app: Application) {
super("Torrent IPC", app, "request", "torrent_client");
}
// Methods -------------------------------------------------------------------------------------
/**
* Add a torrent to the client
* @param torrent Magnet URI or file buffer
*/
public async add(torrent: string | Buffer) {
let response = await this.request("add", torrent);
if (response.error) {
throw new Error("Failed to add torrent");
}
return <string>response.response;
}
/**
* Remove a torrent from the client
* @param torrent Torrent info hash
*/
public async remove(torrent: string) {
let response = await this.request("remove", torrent);
if (response.error) {
throw new Error("Failed to remove torrent");
}
}
/**
* Get a list of all torrents in the client
*/
public async list() {
let response = await this.request("list");
if (response.error) {
console.error(response.error);
throw new Error("Failed to obtain torrent list");
}
return <ITorrent[]>response.response;
}
/**
* Get full details of each of the provided torrents
* @param torrentIds Array of torrent info hashes
*/
public async details(...torrentIds: string[]) {
let response = await this.request("details", torrentIds);
if (response.error) {
console.error(response.error);
throw new Error("Failed to retrieve torrent details");
}
return <ISerializedTorrent[]>response.response;
}
}

+ 0
- 164
src/server/services/TorrentClientIpc.ts View File

@ -1,164 +0,0 @@
import ipc from "node-ipc";
import { Socket } from "net";
import { ISerializedTorrent, ITorrent } from "../common";
import Service from "./Service";
import Application from "../Application";
interface IResponse {
response?: any,
error?: string | Error
}
export default class TorrentClientIpc extends Service
{
/**
* Indicate if there is an active connection to the IPC
*/
private __isConnected: boolean;
/**
* The active IPC socket
*/
protected socket!: Socket;
/**
* Create a new IPC client for the torrent client
*/
constructor(app: Application) {
super("Torrent Client IPC", app);
ipc.config.id = "request";
ipc.config.retry = 1500;
ipc.config.silent = true;
this.__isConnected = false;
}
/**
* Boot the torrent client IPC service
*/
public boot() {
return new Promise<void>((resolve, reject) => {
ipc.connectTo("torrent_client", process.env["TORRENT_CLIENT_IPC_SOCKET"], () => {
this.socket = ipc.of["torrent_client"];
this.installSocketEventHandlers(this.socket);
this.installSocketMessageHandlers(this.socket);
resolve();
});
});
}
public async shutdown() {
ipc.disconnect("torrent_client");
}
/**
* Install the event handlers for the IPC socket
*/
protected installSocketEventHandlers(socket: Socket) {
socket.on("connect", () => this.onConnect());
socket.on("error", (error: any) => this.onError(error));
socket.on("disconnect", () => this.onDisconnect());
socket.on("destroy", () => this.onDestroy());
}
protected installSocketMessageHandlers(socket: Socket) {
}
// Socket Event Handlers -----------------------------------------------------------------------
protected onConnect() {
this.log("IPC: Connection established");
this.__isConnected = true;
}
protected onError(error: string | Error) {
this.log("IPC: Error occurred:", error);
}
protected onDisconnect() {
this.log("IPC: Disconnected");
this.__isConnected = false;
}
protected onDestroy() {
this.log("IPC: Destroyed");
}
// Methods -------------------------------------------------------------------------------------
/**
* Perform a general request to the torrent client
*/
protected async request(method: string, message?: any) {
return new Promise<IResponse>((resolve, reject) => {
if (!this.isConnected) {
throw new Error("Not connected to torrent client");
}
let respond = (response: any) => {
clearTimeout(timeout);
resolve(response);
}
// Include timeout mechanism in the off chance something breaks
let timeout = setTimeout(() => {
this.socket.off(method, respond);
reject("Torrent client IPC request timeout")
}, 1000);
this.socket.once(method, respond);
this.socket.emit(method, message);
});
}
/**
* Add a torrent to the client
* @param torrent Magnet URI or file buffer
*/
public async add(torrent: string | Buffer) {
let response = await this.request("add", torrent);
if (response.error) {
throw new Error("Failed to add torrent");
}
return <string>response.response;
}
/**
* Remove a torrent from the client
* @param torrent Torrent info hash
*/
public async remove(torrent: string) {
let response = await this.request("remove", torrent);
if (response.error) {
throw new Error("Failed to remove torrent");
}
}
/**
* Get a list of all torrents in the client
*/
public async list() {
let response = await this.request("list");
if (response.error) {
console.error(response.error);
throw new Error("Failed to obtain torrent list");
}
return <ITorrent[]>response.response;
}
/**
* Get full details of each of the provided torrents
* @param torrentIds Array of torrent info hashes
*/
public async details(...torrentIds: string[]) {
let response = await this.request("details", torrentIds);
if (response.error) {
console.error(response.error);
throw new Error("Failed to retrieve torrent details");
}
return <ISerializedTorrent[]>response.response;
}
// Accessors -----------------------------------------------------------------------------------
get isConnected() {
return this.__isConnected;
}
}

+ 1
- 1
src/server/services/WebServer/routes/api.ts View File

@ -1,5 +1,5 @@
import Application from "@server/Application";
import SeekerIpc from "@server/services/SeekerIpc";
import SeekerIpc from "@server/services/Ipc/SeekerIpcClient";
import MovieSearch from "@server/services/MovieSearch";
import { auth } from "../middleware/auth";
import RouteRegisterFactory from "./RouteRegisterFactory";


+ 4
- 4
src/server/services/index.ts View File

@ -1,8 +1,8 @@
import Database from "./Database";
import DiscordBot from "./DiscordBot";
import MovieSearch from "./MovieSearch";
import SeekerIpc from "./SeekerIpc";
import TorrentClientIpc from "./TorrentClientIpc";
import SeekerIpcClient from "./Ipc/SeekerIpcClient";
import TorrentIpcClient from "./Ipc/TorrentIpcClient";
import TvDb from "./TvDb";
import WebServer from "./WebServer";
@ -10,8 +10,8 @@ export default {
Database,
// DiscordBot,
MovieSearch,
// TorrentClientIpc,
SeekerIpc,
// TorrentIpcClient,
SeekerIpcClient,
TvDb,
WebServer
}

Loading…
Cancel
Save