Browse Source

Add progress handling to Seeker

dev
David Ludwig 4 years ago
parent
commit
62d1b80376
2 changed files with 96 additions and 0 deletions
  1. +35
    -0
      services/seeker/src/services/IpcInterface.ts
  2. +61
    -0
      services/seeker/src/services/TorrentManager.ts

+ 35
- 0
services/seeker/src/services/IpcInterface.ts View File

@ -2,6 +2,7 @@ import { SOCKET_PATH } from "@autoplex-api/seeker";
import { IpcServerService } from "@autoplex/ipc"; import { IpcServerService } from "@autoplex/ipc";
import { MovieTicket } from "@autoplex/database"; import { MovieTicket } from "@autoplex/database";
import Supervisor from "./Supervisor"; import Supervisor from "./Supervisor";
import TorrentManager from "./TorrentManager";
export default class IpcInterface extends IpcServerService export default class IpcInterface extends IpcServerService
{ {
@ -15,11 +16,24 @@ export default class IpcInterface extends IpcServerService
*/ */
public readonly SOCKET_PATH = SOCKET_PATH; public readonly SOCKET_PATH = SOCKET_PATH;
/**
* Store a reference to the torrent client IPC
*/
protected torrentManager!: TorrentManager;
/** /**
* Install the the event handlers * Install the the event handlers
*/ */
protected override installMessageHandlers() { protected override installMessageHandlers() {
this.addMessageHandler("movie_ticket_added", this.onMovieTicketAdded); this.addMessageHandler("movie_ticket_added", this.onMovieTicketAdded);
this.addMessageHandler("movie_ticket_states", this.getMovieTicketStates);
}
/**
* Link the required services
*/
public override link() {
this.torrentManager = this.app.service<TorrentManager>("Torrent Manager");
} }
// Interface Methods --------------------------------------------------------------------------- // Interface Methods ---------------------------------------------------------------------------
@ -34,4 +48,25 @@ export default class IpcInterface extends IpcServerService
} }
this.app.service<Supervisor>("Supervisor").onMovieTicketNeedsTorrent(movie); this.app.service<Supervisor>("Supervisor").onMovieTicketNeedsTorrent(movie);
} }
/**
* Get the states of the provided movie tickets
*/
protected async getMovieTicketStates(ticketIds: number[]) {
let tickets = await MovieTicket.findByIds(ticketIds, { relations: ["torrents"] });
let torrents = this.torrentManager.cachedTorrents;
let results: any = {};
for (let ticket of tickets) {
let result: any = {};
if (ticket.isCanceled) {
continue;
}
if (ticket.torrents.length > 0) {
let progressList = ticket.torrents.map(torrent => torrents[torrent.infoHash].progress);
result["progress"] = Math.max(...progressList);
}
results[ticket.id] = result;
}
return results;
}
} }

+ 61
- 0
services/seeker/src/services/TorrentManager.ts View File

@ -1,6 +1,8 @@
import { ITorrent } from "@autoplex-api/torrent";
import { ITorrentLink } from "@autoplex-api/torrent-search"; import { ITorrentLink } from "@autoplex-api/torrent-search";
import { MovieTicket, MovieTorrent } from "@autoplex/database"; import { MovieTicket, MovieTorrent } from "@autoplex/database";
import { InternalService, Microservice } from "@autoplex/microservice"; import { InternalService, Microservice } from "@autoplex/microservice";
import { sleep } from "@autoplex/utils";
import diskusage from "diskusage"; import diskusage from "diskusage";
import { readdir } from "fs/promises"; import { readdir } from "fs/promises";
import Supervisor from "./Supervisor"; import Supervisor from "./Supervisor";
@ -46,6 +48,16 @@ export default class TorrentManager extends InternalService
*/ */
protected torrentIpc!: TorrentIpc; protected torrentIpc!: TorrentIpc;
/**
* Cache torrents for quick reference
*/
private __cachedTorrents: { [id: string]: ITorrent} = {};
/**
* Indicate if the service is running
*/
private __isRunning: boolean = false;
/** /**
* Boot the Torrent Manager service * Boot the Torrent Manager service
*/ */
@ -67,6 +79,16 @@ export default class TorrentManager extends InternalService
*/ */
public override start() { public override start() {
this.torrentIpc.on("connected", this.onConnect.bind(this)); this.torrentIpc.on("connected", this.onConnect.bind(this));
this.__isRunning = true;
this.updateCacheLoop();
}
/**
* Shutdown the service
*/
public override shutdown() {
this.__isRunning = false;
return super.shutdown();
} }
// Interface methods --------------------------------------------------------------------------- // Interface methods ---------------------------------------------------------------------------
@ -153,6 +175,36 @@ export default class TorrentManager extends InternalService
return true; return true;
} }
// Torrent Management --------------------------------------------------------------------------
/**
* Update the torrent cache as long as the service is running
*/
protected async updateCacheLoop() {
const THROTTLE = 500;
let lastUpdate = 0;
while (this.__isRunning) {
await sleep(THROTTLE - (Date.now() - lastUpdate));
lastUpdate = Date.now();
await this.updateTorrentCache();
}
}
/**
* Update the torrent cache
*/
protected async updateTorrentCache() {
try {
let torrents = await this.torrentIpc.list();
this.__cachedTorrents = {};
for (let torrent of torrents) {
this.__cachedTorrents[torrent.infoHash] = torrent;
}
} catch(e) {
console.log("Failed to update torrent cache");
}
}
// Event Handling ------------------------------------------------------------------------------ // Event Handling ------------------------------------------------------------------------------
/** /**
@ -175,4 +227,13 @@ export default class TorrentManager extends InternalService
let details = (await this.torrentIpc.details([infoHash]))[0]; let details = (await this.torrentIpc.details([infoHash]))[0];
this.app.service<Supervisor>("Supervisor").onMovieTorrentFinished(torrent, details); this.app.service<Supervisor>("Supervisor").onMovieTorrentFinished(torrent, details);
} }
// Accessors -----------------------------------------------------------------------------------
/**
* Get the list of cached torrents
*/
public get cachedTorrents() {
return this.__cachedTorrents;
}
} }

Loading…
Cancel
Save