diff --git a/services/seeker/src/services/IpcInterface.ts b/services/seeker/src/services/IpcInterface.ts index ca0610c..d40fcaf 100644 --- a/services/seeker/src/services/IpcInterface.ts +++ b/services/seeker/src/services/IpcInterface.ts @@ -2,6 +2,7 @@ import { SOCKET_PATH } from "@autoplex-api/seeker"; import { IpcServerService } from "@autoplex/ipc"; import { MovieTicket } from "@autoplex/database"; import Supervisor from "./Supervisor"; +import TorrentManager from "./TorrentManager"; export default class IpcInterface extends IpcServerService { @@ -15,11 +16,24 @@ export default class IpcInterface extends IpcServerService */ public readonly SOCKET_PATH = SOCKET_PATH; + /** + * Store a reference to the torrent client IPC + */ + protected torrentManager!: TorrentManager; + /** * Install the the event handlers */ protected override installMessageHandlers() { this.addMessageHandler("movie_ticket_added", this.onMovieTicketAdded); + this.addMessageHandler("movie_ticket_states", this.getMovieTicketStates); + } + + /** + * Link the required services + */ + public override link() { + this.torrentManager = this.app.service("Torrent Manager"); } // Interface Methods --------------------------------------------------------------------------- @@ -34,4 +48,25 @@ export default class IpcInterface extends IpcServerService } this.app.service("Supervisor").onMovieTicketNeedsTorrent(movie); } + + /** + * Get the states of the provided movie tickets + */ + protected async getMovieTicketStates(ticketIds: number[]) { + let tickets = await MovieTicket.findByIds(ticketIds, { relations: ["torrents"] }); + let torrents = this.torrentManager.cachedTorrents; + let results: any = {}; + for (let ticket of tickets) { + let result: any = {}; + if (ticket.isCanceled) { + continue; + } + if (ticket.torrents.length > 0) { + let progressList = ticket.torrents.map(torrent => torrents[torrent.infoHash].progress); + result["progress"] = Math.max(...progressList); + } + results[ticket.id] = result; + } + return results; + } } diff --git a/services/seeker/src/services/TorrentManager.ts b/services/seeker/src/services/TorrentManager.ts index 79fb561..45f460a 100644 --- a/services/seeker/src/services/TorrentManager.ts +++ b/services/seeker/src/services/TorrentManager.ts @@ -1,6 +1,8 @@ +import { ITorrent } from "@autoplex-api/torrent"; import { ITorrentLink } from "@autoplex-api/torrent-search"; import { MovieTicket, MovieTorrent } from "@autoplex/database"; import { InternalService, Microservice } from "@autoplex/microservice"; +import { sleep } from "@autoplex/utils"; import diskusage from "diskusage"; import { readdir } from "fs/promises"; import Supervisor from "./Supervisor"; @@ -46,6 +48,16 @@ export default class TorrentManager extends InternalService */ protected torrentIpc!: TorrentIpc; + /** + * Cache torrents for quick reference + */ + private __cachedTorrents: { [id: string]: ITorrent} = {}; + + /** + * Indicate if the service is running + */ + private __isRunning: boolean = false; + /** * Boot the Torrent Manager service */ @@ -67,6 +79,16 @@ export default class TorrentManager extends InternalService */ public override start() { this.torrentIpc.on("connected", this.onConnect.bind(this)); + this.__isRunning = true; + this.updateCacheLoop(); + } + + /** + * Shutdown the service + */ + public override shutdown() { + this.__isRunning = false; + return super.shutdown(); } // Interface methods --------------------------------------------------------------------------- @@ -153,6 +175,36 @@ export default class TorrentManager extends InternalService return true; } + // Torrent Management -------------------------------------------------------------------------- + + /** + * Update the torrent cache as long as the service is running + */ + protected async updateCacheLoop() { + const THROTTLE = 500; + let lastUpdate = 0; + while (this.__isRunning) { + await sleep(THROTTLE - (Date.now() - lastUpdate)); + lastUpdate = Date.now(); + await this.updateTorrentCache(); + } + } + + /** + * Update the torrent cache + */ + protected async updateTorrentCache() { + try { + let torrents = await this.torrentIpc.list(); + this.__cachedTorrents = {}; + for (let torrent of torrents) { + this.__cachedTorrents[torrent.infoHash] = torrent; + } + } catch(e) { + console.log("Failed to update torrent cache"); + } + } + // Event Handling ------------------------------------------------------------------------------ /** @@ -175,4 +227,13 @@ export default class TorrentManager extends InternalService let details = (await this.torrentIpc.details([infoHash]))[0]; this.app.service("Supervisor").onMovieTorrentFinished(torrent, details); } + + // Accessors ----------------------------------------------------------------------------------- + + /** + * Get the list of cached torrents + */ + public get cachedTorrents() { + return this.__cachedTorrents; + } }