diff --git a/services/seeker/package.json b/services/seeker/package.json index 06ef41d..5470bf9 100644 --- a/services/seeker/package.json +++ b/services/seeker/package.json @@ -21,7 +21,9 @@ "typescript": "^4.2.4" }, "dependencies": { + "@autoplex-api/seeker": "^0.0.0", "@autoplex-api/torrent": "^0.0.0", + "@autoplex-api/torrent-search": "^0.0.0", "@autoplex/database": "^0.0.0", "@autoplex/ipc": "^0.0.0", "@autoplex/microservice": "^0.0.0", diff --git a/services/seeker/src/Application.ts b/services/seeker/src/Application.ts deleted file mode 100644 index fea5e9e..0000000 --- a/services/seeker/src/Application.ts +++ /dev/null @@ -1,23 +0,0 @@ -import * as services from "./services"; -import { Microservice } from "@autoplex/microservice"; - -/** - * The main application class - */ -export default class Application extends Microservice -{ - /** - * Return the current application instance - */ - public static instance() { return super.instance() } - - /** - * Create a new application instance - */ - public constructor() { - super(); - for (let ServiceClass of Object.values(services)) { - this.installService(ServiceClass); - } - } -} diff --git a/services/seeker/src/index.ts b/services/seeker/src/index.ts index 4f0f3da..550ca6d 100644 --- a/services/seeker/src/index.ts +++ b/services/seeker/src/index.ts @@ -1,7 +1,11 @@ -import Application from "./Application"; +import { Microservice } from "@autoplex/microservice"; +import * as services from "./services"; // Create a new application instance -let app = new Application(); +let app = new Microservice(); + +// Install the internal services +app.installServices(Object.values(services)); // Start the application app.exec().then(process.exit); diff --git a/services/seeker/src/services/IpcInterface.ts b/services/seeker/src/services/IpcInterface.ts index 4b0f329..1c9b7e5 100644 --- a/services/seeker/src/services/IpcInterface.ts +++ b/services/seeker/src/services/IpcInterface.ts @@ -1,10 +1,9 @@ +import { SOCKET_PATH } from "@autoplex-api/seeker"; import { IpcServerService } from "@autoplex/ipc"; -import Application from "../Application"; -import Supervisor from "./Supervisor"; import { MovieTicket } from "@autoplex/database"; -import { env } from "@autoplex/utils"; +import Supervisor from "./Supervisor"; -export default class IpcInterface extends IpcServerService +export default class IpcInterface extends IpcServerService { /** * The service name @@ -14,7 +13,7 @@ export default class IpcInterface extends IpcServerService /** * The path to the socket file */ - public readonly SOCKET_PATH = env("IPC_SOCKET_PATH"); + public readonly SOCKET_PATH = SOCKET_PATH; /** * Install the the event handlers diff --git a/services/seeker/src/services/MovieSearch.ts b/services/seeker/src/services/MovieSearch.ts deleted file mode 100644 index 2353f7e..0000000 --- a/services/seeker/src/services/MovieSearch.ts +++ /dev/null @@ -1,136 +0,0 @@ -import Application from "../Application"; -import { MovieTicket } from "@autoplex/database"; -import * as providerClasses from "../torrents"; -import Provider, { MediaType } from "../torrents/providers/Provider"; -import Torrent from "../torrents/Torrent"; -import { rankTorrents } from "../torrents/ranking"; -import { InternalService } from "@autoplex/microservice"; -import Supervisor from "./Supervisor"; - -export default class MovieSearch extends InternalService -{ - /** - * The queue of current movie tickts requiring torrents - */ - protected movieQueue!: MovieTicket[]; - - /** - * Available providers that support movies - */ - protected providers!: Provider[]; - - /** - * Indicate if the service is currently searching for movies in the queue - */ - protected isSearchingForMovies: boolean; - - /** - * Store a reference to the supervisor service - */ - protected supervisor!: Supervisor; - - /** - * Create a new instance of the movie search service - */ - public constructor(app: Application) { - super(app); - this.movieQueue = []; - this.isSearchingForMovies = false; - } - - /** - * The service name - */ - public readonly NAME = "Movie Search"; - - /** - * Boot the movie search service - */ - public async boot() { - let providers = Object.values(providerClasses); - this.providers = providers.filter(provider => provider.PROVIDES & MediaType.Movies) - .map(ProviderClass => new ProviderClass()); - } - - /** - * Connect to the supervisor instance when everything is booted - */ - public start() { - this.supervisor = this.app.service("Supervisor"); - } - - /** - * Shutdown the service - */ - public async shutdown() { - - } - - // Interface Methods --------------------------------------------------------------------------- - - /** - * Enqueue a movie to the search queue - */ - public enqueueMovie(movie: MovieTicket) { - this.movieQueue.push(movie); - this.searchMovies(); - } - - // Movie Searching ----------------------------------------------------------------------------- - - /** - * Search for movies currently in the queue - */ - protected async searchMovies() { - if (this.isSearchingForMovies) { - return; - } - this.isSearchingForMovies = true; - while (this.movieQueue.length > 0) { - let movie = this.movieQueue.splice(0, 1)[0]; - let link = await this.searchMovie(movie); - if (link === null) { - continue; - } - this.supervisor.onMovieTorrentFound(movie, link); - } - this.isSearchingForMovies = false; - } - - /** - * Search for a movie - */ - protected async searchMovie(movie: MovieTicket) { - // Search by IMDb - let torrents = await this.searchImdb(movie); - if (torrents.length == 0) { - return null; - } - // Determine the preferred torrents - let preferredTorrents = rankTorrents(torrents); - if (preferredTorrents.length == 0) { - preferredTorrents = torrents; - } - // Return the selected torrent - this.log("Found movie torrent for", movie.title); - return await preferredTorrents[0].downloadLink(); - } - - /** - * Search for a movie by its IMDb ID - */ - protected async searchImdb(movie: MovieTicket): Promise { - if (movie.imdbId == null) { - return []; - } - let results = await Promise.all(this.providers.map(provider => provider.searchMovie(movie))); - return ([]).concat(...results); - } - - /** - * Pick the best torrent from the list - */ - protected pickBestTorrent() { - - } -} diff --git a/services/seeker/src/services/PostProcessor/PostProcessor.ts b/services/seeker/src/services/PostProcessor/PostProcessor.ts index 37ccbdd..b3a14dc 100644 --- a/services/seeker/src/services/PostProcessor/PostProcessor.ts +++ b/services/seeker/src/services/PostProcessor/PostProcessor.ts @@ -1,11 +1,10 @@ +import { ISerializedTorrent } from "@autoplex-api/torrent"; +import { MovieTicket, MovieTorrent } from "@autoplex/database"; import { link, mkdir } from "fs/promises"; import { dirname, extname } from "path"; -import Application from "../../Application"; -import { MovieTicket, MovieTorrent } from "@autoplex/database"; -import { ISerializedTorrent } from "@autoplex-api/torrent"; import { safeTitleFileName } from "../../utils"; import Supervisor from "../Supervisor"; -import { InternalService } from "@autoplex/microservice"; +import { InternalService, Microservice } from "@autoplex/microservice"; /** * Common video file extensions @@ -23,7 +22,7 @@ interface IPendingMovieTorrent { ticket : MovieTicket } -export default class PostProcessor extends InternalService +export default class PostProcessor extends InternalService { /** * The queue of movies to process @@ -38,7 +37,7 @@ export default class PostProcessor extends InternalService /** * Create a new instance of the post processor */ - public constructor(app: Application) { + public constructor(app: Microservice) { super(app); this.pendingMovies = []; this.isProcessingMovies = false; diff --git a/services/seeker/src/services/Supervisor.ts b/services/seeker/src/services/Supervisor.ts index 54f1f3d..c31e91b 100644 --- a/services/seeker/src/services/Supervisor.ts +++ b/services/seeker/src/services/Supervisor.ts @@ -1,18 +1,16 @@ -import Application from "../Application"; +import { IpcClient as TorrentSearchIpc, ITorrentLink } from "@autoplex-api/torrent-search"; +import { ISerializedTorrent } from "@autoplex-api/torrent"; import { MovieTicket, MovieTorrent } from "@autoplex/database"; -import MovieSearch from "./MovieSearch"; +import { InternalService, Microservice } from "@autoplex/microservice"; import PostProcessor from "./PostProcessor"; -import { InternalService } from "@autoplex/microservice"; import TorrentManager from "./TorrentManager"; -import { ISerializedTorrent } from "@autoplex-api/torrent"; -export default class Supervisor extends InternalService +export default class Supervisor extends InternalService { - /** * The movie search service instance */ - protected movieSearch!: MovieSearch; + protected torrentSearch!: TorrentSearchIpc; /** * The torrent client IPC service instance @@ -25,24 +23,23 @@ export default class Supervisor extends InternalService protected postProcessor!: PostProcessor; /** - * Create a new supervisor service instance + * The service name */ - public constructor(app: Application) { - super(app); - } + public readonly NAME = "Supervisor"; /** - * The service name + * Link to other internal services */ - public readonly NAME = "Supervisor"; + public link(app: Microservice) { + this.torrentSearch = app.service("Torrent Search"); + this.torrentManager = app.service("Torrent Manager"); + this.postProcessor = app.service("Post Processor"); + } /** * All services are booted and ready */ public start() { - this.movieSearch = this.app.service("Movie Search"); - this.torrentManager = this.app.service("Torrent Manager"); - this.postProcessor = this.app.service("Post Processor"); this.searchMovies(); } @@ -52,15 +49,19 @@ export default class Supervisor extends InternalService * 1. A movie ticket has been added/torrent has gone stale, so * dispatch a search task for a torrent */ - public onMovieTicketNeedsTorrent(ticket: MovieTicket) { - this.log("A movie needs a torrent:", ticket.title); - this.movieSearch.enqueueMovie(ticket); + public async onMovieTicketNeedsTorrent(ticket: MovieTicket) { + let link = await this.torrentSearch.searchMovie(ticket.title, ticket.imdbId, ticket.year); + if (link === null) { + this.log("Could not find a torrent for:", ticket.title); + return; + } + this.onMovieTorrentFound(ticket, link); } /** * 2. A movie torrent has been found, so add it to the client */ - public onMovieTorrentFound(ticket: MovieTicket, link: string) { + public onMovieTorrentFound(ticket: MovieTicket, link: ITorrentLink) { this.log("A torrent was found for movie:", ticket.title); this.torrentManager.enqueueMovie(ticket, link); } @@ -84,18 +85,20 @@ export default class Supervisor extends InternalService // Tasks --------------------------------------------------------------------------------------- /** - * @TODO Performing a promise-all instead of waiting between each movie may be much faster * Search available movies in the database */ public async searchMovies() { - let movies = await MovieTicket.find({where: {isFulfilled: false}, relations: [ "torrents" ]}); - for (let movie of movies) { + let tickets = await MovieTicket.find({ + where: {isFulfilled: false, isCanceled: false}, + relations: [ "torrents" ] + }); + for (let ticket of tickets) { // Skip already-resolved non-stale torrents - if (movie.torrents.length > 0 && !movie.isStale) { + if (ticket.torrents.length > 0 && !ticket.isStale) { this.log("Skipping already satisfied ticket") continue; } - this.onMovieTicketNeedsTorrent(movie); + this.onMovieTicketNeedsTorrent(ticket); } } } diff --git a/services/seeker/src/services/TorrentIpc.ts b/services/seeker/src/services/TorrentIpc.ts index fdfd548..b2cb777 100644 --- a/services/seeker/src/services/TorrentIpc.ts +++ b/services/seeker/src/services/TorrentIpc.ts @@ -1,4 +1,3 @@ -import Application from "../Application"; import { IpcClient } from "@autoplex-api/torrent"; interface IResponse { @@ -24,7 +23,7 @@ declare interface TorrentIpc { /** * The torrent client IPC service */ -class TorrentIpc extends IpcClient +class TorrentIpc extends IpcClient { /** * Install the message event handlers diff --git a/services/seeker/src/services/TorrentManager.ts b/services/seeker/src/services/TorrentManager.ts index 09fcd5f..26d6276 100644 --- a/services/seeker/src/services/TorrentManager.ts +++ b/services/seeker/src/services/TorrentManager.ts @@ -1,9 +1,9 @@ +import { ITorrentLink } from "@autoplex-api/torrent-search"; +import { MovieTicket, MovieTorrent } from "@autoplex/database"; +import { InternalService, Microservice } from "@autoplex/microservice"; import diskusage from "diskusage"; import { readdir } from "fs/promises"; -import { MovieTicket, MovieTorrent } from "@autoplex/database"; import Supervisor from "./Supervisor"; -import { InternalService } from "@autoplex/microservice"; -import Application from "../Application"; import TorrentIpc from "./TorrentIpc"; interface IPendingMovieTorrent { @@ -19,7 +19,7 @@ interface IDiskMap { // tvshows: string[] } -export default class TorrentManager extends InternalService +export default class TorrentManager extends InternalService { /** * The service name @@ -55,11 +55,17 @@ export default class TorrentManager extends InternalService await this.loadDisks(); } + /** + * Link to other services + */ + public link(app: Microservice) { + this.torrentIpc = app.service("Torrent"); + } + /** * Start the service */ public start() { - this.torrentIpc = this.app.service("Torrent"); this.torrentIpc.on("connected", this.onConnect.bind(this)); } @@ -68,8 +74,12 @@ export default class TorrentManager extends InternalService /** * Add a movie to the queue */ - public enqueueMovie(movie: MovieTicket, link: string) { - this.pendingMovies.push({movie, link}); + public enqueueMovie(movie: MovieTicket, link: ITorrentLink) { + if (link.type === "file") { + this.log("Warning: Non-magnet torrent links are not yet supported!"); + return; + } + this.pendingMovies.push({movie, link: link.link}); this.addMovies(); } diff --git a/services/seeker/src/services/index.ts b/services/seeker/src/services/index.ts index 3882b68..4811c1e 100644 --- a/services/seeker/src/services/index.ts +++ b/services/seeker/src/services/index.ts @@ -1,6 +1,6 @@ +export { IpcClient } from "@autoplex-api/torrent-search"; export { DatabaseService } from "@autoplex/database"; import IpcInterface from "./IpcInterface"; -import MovieSearch from "./MovieSearch"; import PostProcessor from "./PostProcessor"; import Supervisor from "./Supervisor"; import TorrentIpc from "./TorrentIpc"; @@ -8,7 +8,6 @@ import TorrentManager from "./TorrentManager"; export { IpcInterface, - MovieSearch, PostProcessor, Supervisor, TorrentIpc,