From 9d65030e06713e01751bae855bd658f58d850619 Mon Sep 17 00:00:00 2001 From: Thomas Kaul <4159106+dtslvr@users.noreply.github.com> Date: Mon, 9 Sep 2024 18:52:27 +0200 Subject: [PATCH] Extend queue service --- apps/api/src/app/admin/queue/queue.module.ts | 3 +- apps/api/src/app/admin/queue/queue.service.ts | 38 ++++++++++++++----- .../app/components/admin-jobs/admin-jobs.html | 2 + 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/apps/api/src/app/admin/queue/queue.module.ts b/apps/api/src/app/admin/queue/queue.module.ts index 46ae3b8a5..33598fbb8 100644 --- a/apps/api/src/app/admin/queue/queue.module.ts +++ b/apps/api/src/app/admin/queue/queue.module.ts @@ -1,4 +1,5 @@ import { DataGatheringModule } from '@ghostfolio/api/services/data-gathering/data-gathering.module'; +import { PortfolioSnapshotQueueModule } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.module'; import { Module } from '@nestjs/common'; @@ -7,7 +8,7 @@ import { QueueService } from './queue.service'; @Module({ controllers: [QueueController], - imports: [DataGatheringModule], + imports: [DataGatheringModule, PortfolioSnapshotQueueModule], providers: [QueueService] }) export class QueueModule {} diff --git a/apps/api/src/app/admin/queue/queue.service.ts b/apps/api/src/app/admin/queue/queue.service.ts index abae3cad1..15fdf7cb0 100644 --- a/apps/api/src/app/admin/queue/queue.service.ts +++ b/apps/api/src/app/admin/queue/queue.service.ts @@ -1,5 +1,6 @@ import { DATA_GATHERING_QUEUE, + PORTFOLIO_SNAPSHOT_QUEUE, QUEUE_JOB_STATUS_LIST } from '@ghostfolio/common/config'; import { AdminJobs } from '@ghostfolio/common/interfaces'; @@ -12,11 +13,19 @@ import { JobStatus, Queue } from 'bull'; export class QueueService { public constructor( @InjectQueue(DATA_GATHERING_QUEUE) - private readonly dataGatheringQueue: Queue + private readonly dataGatheringQueue: Queue, + @InjectQueue(PORTFOLIO_SNAPSHOT_QUEUE) + private readonly portfolioSnapshotQueue: Queue ) {} public async deleteJob(aId: string) { - return (await this.dataGatheringQueue.getJob(aId))?.remove(); + let job = await this.dataGatheringQueue.getJob(aId); + + if (!job) { + job = await this.portfolioSnapshotQueue.getJob(aId); + } + + return job?.remove(); } public async deleteJobs({ @@ -25,15 +34,21 @@ export class QueueService { status?: JobStatus[]; }) { for (const statusItem of status) { - await this.dataGatheringQueue.clean( - 300, - statusItem === 'waiting' ? 'wait' : statusItem - ); + const queueStatus = statusItem === 'waiting' ? 'wait' : statusItem; + + await this.dataGatheringQueue.clean(300, queueStatus); + await this.portfolioSnapshotQueue.clean(300, queueStatus); } } public async executeJob(aId: string) { - return (await this.dataGatheringQueue.getJob(aId))?.promote(); + let job = await this.dataGatheringQueue.getJob(aId); + + if (!job) { + job = await this.portfolioSnapshotQueue.getJob(aId); + } + + return job?.promote(); } public async getJobs({ @@ -43,10 +58,15 @@ export class QueueService { limit?: number; status?: JobStatus[]; }): Promise { - const jobs = await this.dataGatheringQueue.getJobs(status); + const [dataGatheringJobs, portfolioSnapshotJobs] = await Promise.all([ + this.dataGatheringQueue.getJobs(status), + this.portfolioSnapshotQueue.getJobs(status) + ]); + + const allJobs = [...dataGatheringJobs, ...portfolioSnapshotJobs]; const jobsWithState = await Promise.all( - jobs + allJobs .filter((job) => { return job; }) diff --git a/apps/client/src/app/components/admin-jobs/admin-jobs.html b/apps/client/src/app/components/admin-jobs/admin-jobs.html index e194b2b37..ef46d766e 100644 --- a/apps/client/src/app/components/admin-jobs/admin-jobs.html +++ b/apps/client/src/app/components/admin-jobs/admin-jobs.html @@ -35,6 +35,8 @@ Asset Profile } @else if (element.name === 'GATHER_HISTORICAL_MARKET_DATA') { Historical Market Data + } @else if (element.name === 'PORTFOLIO') { + Portfolio Snapshot }