Browse Source

Add torrent-search service to Seeker. Remove old application code from Seeker

dev
David Ludwig 4 years ago
parent
commit
368e98c6ec
10 changed files with 64 additions and 208 deletions
  1. +2
    -0
      services/seeker/package.json
  2. +0
    -23
      services/seeker/src/Application.ts
  3. +6
    -2
      services/seeker/src/index.ts
  4. +4
    -5
      services/seeker/src/services/IpcInterface.ts
  5. +0
    -136
      services/seeker/src/services/MovieSearch.ts
  6. +5
    -6
      services/seeker/src/services/PostProcessor/PostProcessor.ts
  7. +28
    -25
      services/seeker/src/services/Supervisor.ts
  8. +1
    -2
      services/seeker/src/services/TorrentIpc.ts
  9. +17
    -7
      services/seeker/src/services/TorrentManager.ts
  10. +1
    -2
      services/seeker/src/services/index.ts

+ 2
- 0
services/seeker/package.json View File

@ -21,7 +21,9 @@
"typescript": "^4.2.4"
},
"dependencies": {
"@autoplex-api/seeker": "^0.0.0",
"@autoplex-api/torrent": "^0.0.0",
"@autoplex-api/torrent-search": "^0.0.0",
"@autoplex/database": "^0.0.0",
"@autoplex/ipc": "^0.0.0",
"@autoplex/microservice": "^0.0.0",


+ 0
- 23
services/seeker/src/Application.ts View File

@ -1,23 +0,0 @@
import * as services from "./services";
import { Microservice } from "@autoplex/microservice";
/**
* The main application class
*/
export default class Application extends Microservice
{
/**
* Return the current application instance
*/
public static instance() { return super.instance() }
/**
* Create a new application instance
*/
public constructor() {
super();
for (let ServiceClass of Object.values(services)) {
this.installService(ServiceClass);
}
}
}

+ 6
- 2
services/seeker/src/index.ts View File

@ -1,7 +1,11 @@
import Application from "./Application";
import { Microservice } from "@autoplex/microservice";
import * as services from "./services";
// Create a new application instance
let app = new Application();
let app = new Microservice();
// Install the internal services
app.installServices(Object.values(services));
// Start the application
app.exec().then(process.exit);

+ 4
- 5
services/seeker/src/services/IpcInterface.ts View File

@ -1,10 +1,9 @@
import { SOCKET_PATH } from "@autoplex-api/seeker";
import { IpcServerService } from "@autoplex/ipc";
import Application from "../Application";
import Supervisor from "./Supervisor";
import { MovieTicket } from "@autoplex/database";
import { env } from "@autoplex/utils";
import Supervisor from "./Supervisor";
export default class IpcInterface extends IpcServerService<Application>
export default class IpcInterface extends IpcServerService
{
/**
* The service name
@ -14,7 +13,7 @@ export default class IpcInterface extends IpcServerService<Application>
/**
* The path to the socket file
*/
public readonly SOCKET_PATH = env("IPC_SOCKET_PATH");
public readonly SOCKET_PATH = SOCKET_PATH;
/**
* Install the the event handlers


+ 0
- 136
services/seeker/src/services/MovieSearch.ts View File

@ -1,136 +0,0 @@
import Application from "../Application";
import { MovieTicket } from "@autoplex/database";
import * as providerClasses from "../torrents";
import Provider, { MediaType } from "../torrents/providers/Provider";
import Torrent from "../torrents/Torrent";
import { rankTorrents } from "../torrents/ranking";
import { InternalService } from "@autoplex/microservice";
import Supervisor from "./Supervisor";
export default class MovieSearch extends InternalService
{
/**
* The queue of current movie tickts requiring torrents
*/
protected movieQueue!: MovieTicket[];
/**
* Available providers that support movies
*/
protected providers!: Provider[];
/**
* Indicate if the service is currently searching for movies in the queue
*/
protected isSearchingForMovies: boolean;
/**
* Store a reference to the supervisor service
*/
protected supervisor!: Supervisor;
/**
* Create a new instance of the movie search service
*/
public constructor(app: Application) {
super(app);
this.movieQueue = [];
this.isSearchingForMovies = false;
}
/**
* The service name
*/
public readonly NAME = "Movie Search";
/**
* Boot the movie search service
*/
public async boot() {
let providers = Object.values(providerClasses);
this.providers = providers.filter(provider => provider.PROVIDES & MediaType.Movies)
.map(ProviderClass => new ProviderClass());
}
/**
* Connect to the supervisor instance when everything is booted
*/
public start() {
this.supervisor = this.app.service<Supervisor>("Supervisor");
}
/**
* Shutdown the service
*/
public async shutdown() {
}
// Interface Methods ---------------------------------------------------------------------------
/**
* Enqueue a movie to the search queue
*/
public enqueueMovie(movie: MovieTicket) {
this.movieQueue.push(movie);
this.searchMovies();
}
// Movie Searching -----------------------------------------------------------------------------
/**
* Search for movies currently in the queue
*/
protected async searchMovies() {
if (this.isSearchingForMovies) {
return;
}
this.isSearchingForMovies = true;
while (this.movieQueue.length > 0) {
let movie = this.movieQueue.splice(0, 1)[0];
let link = await this.searchMovie(movie);
if (link === null) {
continue;
}
this.supervisor.onMovieTorrentFound(movie, link);
}
this.isSearchingForMovies = false;
}
/**
* Search for a movie
*/
protected async searchMovie(movie: MovieTicket) {
// Search by IMDb
let torrents = await this.searchImdb(movie);
if (torrents.length == 0) {
return null;
}
// Determine the preferred torrents
let preferredTorrents = rankTorrents(torrents);
if (preferredTorrents.length == 0) {
preferredTorrents = torrents;
}
// Return the selected torrent
this.log("Found movie torrent for", movie.title);
return await preferredTorrents[0].downloadLink();
}
/**
* Search for a movie by its IMDb ID
*/
protected async searchImdb(movie: MovieTicket): Promise<Torrent[]> {
if (movie.imdbId == null) {
return [];
}
let results = await Promise.all(this.providers.map(provider => provider.searchMovie(movie)));
return (<Torrent[]>[]).concat(...results);
}
/**
* Pick the best torrent from the list
*/
protected pickBestTorrent() {
}
}

+ 5
- 6
services/seeker/src/services/PostProcessor/PostProcessor.ts View File

@ -1,11 +1,10 @@
import { ISerializedTorrent } from "@autoplex-api/torrent";
import { MovieTicket, MovieTorrent } from "@autoplex/database";
import { link, mkdir } from "fs/promises";
import { dirname, extname } from "path";
import Application from "../../Application";
import { MovieTicket, MovieTorrent } from "@autoplex/database";
import { ISerializedTorrent } from "@autoplex-api/torrent";
import { safeTitleFileName } from "../../utils";
import Supervisor from "../Supervisor";
import { InternalService } from "@autoplex/microservice";
import { InternalService, Microservice } from "@autoplex/microservice";
/**
* Common video file extensions
@ -23,7 +22,7 @@ interface IPendingMovieTorrent {
ticket : MovieTicket
}
export default class PostProcessor extends InternalService<Application>
export default class PostProcessor extends InternalService
{
/**
* The queue of movies to process
@ -38,7 +37,7 @@ export default class PostProcessor extends InternalService<Application>
/**
* Create a new instance of the post processor
*/
public constructor(app: Application) {
public constructor(app: Microservice) {
super(app);
this.pendingMovies = [];
this.isProcessingMovies = false;


+ 28
- 25
services/seeker/src/services/Supervisor.ts View File

@ -1,18 +1,16 @@
import Application from "../Application";
import { IpcClient as TorrentSearchIpc, ITorrentLink } from "@autoplex-api/torrent-search";
import { ISerializedTorrent } from "@autoplex-api/torrent";
import { MovieTicket, MovieTorrent } from "@autoplex/database";
import MovieSearch from "./MovieSearch";
import { InternalService, Microservice } from "@autoplex/microservice";
import PostProcessor from "./PostProcessor";
import { InternalService } from "@autoplex/microservice";
import TorrentManager from "./TorrentManager";
import { ISerializedTorrent } from "@autoplex-api/torrent";
export default class Supervisor extends InternalService<Application>
export default class Supervisor extends InternalService
{
/**
* The movie search service instance
*/
protected movieSearch!: MovieSearch;
protected torrentSearch!: TorrentSearchIpc;
/**
* The torrent client IPC service instance
@ -25,24 +23,23 @@ export default class Supervisor extends InternalService<Application>
protected postProcessor!: PostProcessor;
/**
* Create a new supervisor service instance
* The service name
*/
public constructor(app: Application) {
super(app);
}
public readonly NAME = "Supervisor";
/**
* The service name
* Link to other internal services
*/
public readonly NAME = "Supervisor";
public link(app: Microservice) {
this.torrentSearch = app.service<TorrentSearchIpc>("Torrent Search");
this.torrentManager = app.service<TorrentManager>("Torrent Manager");
this.postProcessor = app.service<PostProcessor>("Post Processor");
}
/**
* All services are booted and ready
*/
public start() {
this.movieSearch = this.app.service<MovieSearch>("Movie Search");
this.torrentManager = this.app.service<TorrentManager>("Torrent Manager");
this.postProcessor = this.app.service<PostProcessor>("Post Processor");
this.searchMovies();
}
@ -52,15 +49,19 @@ export default class Supervisor extends InternalService<Application>
* 1. A movie ticket has been added/torrent has gone stale, so
* dispatch a search task for a torrent
*/
public onMovieTicketNeedsTorrent(ticket: MovieTicket) {
this.log("A movie needs a torrent:", ticket.title);
this.movieSearch.enqueueMovie(ticket);
public async onMovieTicketNeedsTorrent(ticket: MovieTicket) {
let link = await this.torrentSearch.searchMovie(ticket.title, ticket.imdbId, ticket.year);
if (link === null) {
this.log("Could not find a torrent for:", ticket.title);
return;
}
this.onMovieTorrentFound(ticket, link);
}
/**
* 2. A movie torrent has been found, so add it to the client
*/
public onMovieTorrentFound(ticket: MovieTicket, link: string) {
public onMovieTorrentFound(ticket: MovieTicket, link: ITorrentLink) {
this.log("A torrent was found for movie:", ticket.title);
this.torrentManager.enqueueMovie(ticket, link);
}
@ -84,18 +85,20 @@ export default class Supervisor extends InternalService<Application>
// Tasks ---------------------------------------------------------------------------------------
/**
* @TODO Performing a promise-all instead of waiting between each movie may be much faster
* Search available movies in the database
*/
public async searchMovies() {
let movies = await MovieTicket.find({where: {isFulfilled: false}, relations: [ "torrents" ]});
for (let movie of movies) {
let tickets = await MovieTicket.find({
where: {isFulfilled: false, isCanceled: false},
relations: [ "torrents" ]
});
for (let ticket of tickets) {
// Skip already-resolved non-stale torrents
if (movie.torrents.length > 0 && !movie.isStale) {
if (ticket.torrents.length > 0 && !ticket.isStale) {
this.log("Skipping already satisfied ticket")
continue;
}
this.onMovieTicketNeedsTorrent(movie);
this.onMovieTicketNeedsTorrent(ticket);
}
}
}

+ 1
- 2
services/seeker/src/services/TorrentIpc.ts View File

@ -1,4 +1,3 @@
import Application from "../Application";
import { IpcClient } from "@autoplex-api/torrent";
interface IResponse {
@ -24,7 +23,7 @@ declare interface TorrentIpc {
/**
* The torrent client IPC service
*/
class TorrentIpc extends IpcClient<Application>
class TorrentIpc extends IpcClient
{
/**
* Install the message event handlers


+ 17
- 7
services/seeker/src/services/TorrentManager.ts View File

@ -1,9 +1,9 @@
import { ITorrentLink } from "@autoplex-api/torrent-search";
import { MovieTicket, MovieTorrent } from "@autoplex/database";
import { InternalService, Microservice } from "@autoplex/microservice";
import diskusage from "diskusage";
import { readdir } from "fs/promises";
import { MovieTicket, MovieTorrent } from "@autoplex/database";
import Supervisor from "./Supervisor";
import { InternalService } from "@autoplex/microservice";
import Application from "../Application";
import TorrentIpc from "./TorrentIpc";
interface IPendingMovieTorrent {
@ -19,7 +19,7 @@ interface IDiskMap {
// tvshows: string[]
}
export default class TorrentManager extends InternalService<Application>
export default class TorrentManager extends InternalService
{
/**
* The service name
@ -55,11 +55,17 @@ export default class TorrentManager extends InternalService<Application>
await this.loadDisks();
}
/**
* Link to other services
*/
public link(app: Microservice) {
this.torrentIpc = app.service<TorrentIpc>("Torrent");
}
/**
* Start the service
*/
public start() {
this.torrentIpc = this.app.service<TorrentIpc>("Torrent");
this.torrentIpc.on("connected", this.onConnect.bind(this));
}
@ -68,8 +74,12 @@ export default class TorrentManager extends InternalService<Application>
/**
* Add a movie to the queue
*/
public enqueueMovie(movie: MovieTicket, link: string) {
this.pendingMovies.push({movie, link});
public enqueueMovie(movie: MovieTicket, link: ITorrentLink) {
if (link.type === "file") {
this.log("Warning: Non-magnet torrent links are not yet supported!");
return;
}
this.pendingMovies.push({movie, link: link.link});
this.addMovies();
}


+ 1
- 2
services/seeker/src/services/index.ts View File

@ -1,6 +1,6 @@
export { IpcClient } from "@autoplex-api/torrent-search";
export { DatabaseService } from "@autoplex/database";
import IpcInterface from "./IpcInterface";
import MovieSearch from "./MovieSearch";
import PostProcessor from "./PostProcessor";
import Supervisor from "./Supervisor";
import TorrentIpc from "./TorrentIpc";
@ -8,7 +8,6 @@ import TorrentManager from "./TorrentManager";
export {
IpcInterface,
MovieSearch,
PostProcessor,
Supervisor,
TorrentIpc,


Loading…
Cancel
Save