From b8b4a349d31b722b7bd438eaa10e6f186ebf17c8 Mon Sep 17 00:00:00 2001 From: Thomas Kaul <4159106+dtslvr@users.noreply.github.com> Date: Sun, 8 Feb 2026 17:51:33 +0100 Subject: [PATCH] Migrate from bull to bullmq --- .../src/app/admin/queue/queue.controller.ts | 6 +- apps/api/src/app/admin/queue/queue.service.ts | 17 ++--- apps/api/src/app/app.module.ts | 4 +- .../calculator/portfolio-calculator.ts | 4 +- .../data-gathering/data-gathering.module.ts | 7 +- .../data-gathering.processor.ts | 67 +++++++++++-------- .../data-gathering/data-gathering.service.ts | 8 +-- .../portfolio-snapshot.module.ts | 16 +---- .../portfolio-snapshot.processor.ts | 34 ++++++---- .../portfolio-snapshot.service.mock.ts | 6 +- .../portfolio-snapshot.service.ts | 18 +++-- .../admin-jobs/admin-jobs.component.ts | 3 +- libs/common/src/lib/config.ts | 17 ++--- .../lib/interfaces/admin-jobs.interface.ts | 4 +- libs/ui/src/lib/services/admin.service.ts | 5 +- 15 files changed, 115 insertions(+), 101 deletions(-) diff --git a/apps/api/src/app/admin/queue/queue.controller.ts b/apps/api/src/app/admin/queue/queue.controller.ts index 060abd247..65d388b19 100644 --- a/apps/api/src/app/admin/queue/queue.controller.ts +++ b/apps/api/src/app/admin/queue/queue.controller.ts @@ -1,5 +1,6 @@ import { HasPermission } from '@ghostfolio/api/decorators/has-permission.decorator'; import { HasPermissionGuard } from '@ghostfolio/api/guards/has-permission.guard'; +import { JobStatusType } from '@ghostfolio/common/config'; import { AdminJobs } from '@ghostfolio/common/interfaces'; import { permissions } from '@ghostfolio/common/permissions'; @@ -12,7 +13,6 @@ import { UseGuards } from '@nestjs/common'; import { AuthGuard } from '@nestjs/passport'; -import { JobStatus } from 'bull'; import { QueueService } from './queue.service'; @@ -26,7 +26,7 @@ export class QueueController { public async deleteJobs( @Query('status') filterByStatus?: string ): Promise { - const status = (filterByStatus?.split(',') as JobStatus[]) ?? undefined; + const status = (filterByStatus?.split(',') as JobStatusType[]) ?? undefined; return this.queueService.deleteJobs({ status }); } @@ -36,7 +36,7 @@ export class QueueController { public async getJobs( @Query('status') filterByStatus?: string ): Promise { - const status = (filterByStatus?.split(',') as JobStatus[]) ?? undefined; + const status = (filterByStatus?.split(',') as JobStatusType[]) ?? undefined; return this.queueService.getJobs({ status }); } diff --git a/apps/api/src/app/admin/queue/queue.service.ts b/apps/api/src/app/admin/queue/queue.service.ts index 747c4d6fb..23355ebdb 100644 --- a/apps/api/src/app/admin/queue/queue.service.ts +++ b/apps/api/src/app/admin/queue/queue.service.ts @@ -1,13 +1,14 @@ import { DATA_GATHERING_QUEUE, + JobStatusType, PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, QUEUE_JOB_STATUS_LIST } from '@ghostfolio/common/config'; import { AdminJobs } from '@ghostfolio/common/interfaces'; -import { InjectQueue } from '@nestjs/bull'; +import { InjectQueue } from '@nestjs/bullmq'; import { Injectable } from '@nestjs/common'; -import { JobStatus, Queue } from 'bull'; +import { Queue } from 'bullmq'; @Injectable() export class QueueService { @@ -29,15 +30,15 @@ export class QueueService { } public async deleteJobs({ - status = QUEUE_JOB_STATUS_LIST + status = [...QUEUE_JOB_STATUS_LIST] }: { - status?: JobStatus[]; + status?: JobStatusType[]; }) { for (const statusItem of status) { const queueStatus = statusItem === 'waiting' ? 'wait' : statusItem; - await this.dataGatheringQueue.clean(300, queueStatus); - await this.portfolioSnapshotQueue.clean(300, queueStatus); + await this.dataGatheringQueue.clean(300, 0, queueStatus); + await this.portfolioSnapshotQueue.clean(300, 0, queueStatus); } } @@ -53,10 +54,10 @@ export class QueueService { public async getJobs({ limit = 1000, - status = QUEUE_JOB_STATUS_LIST + status = [...QUEUE_JOB_STATUS_LIST] }: { limit?: number; - status?: JobStatus[]; + status?: JobStatusType[]; }): Promise { const [dataGatheringJobs, portfolioSnapshotJobs] = await Promise.all([ this.dataGatheringQueue.getJobs(status), diff --git a/apps/api/src/app/app.module.ts b/apps/api/src/app/app.module.ts index 89f52e1ea..0c6707b91 100644 --- a/apps/api/src/app/app.module.ts +++ b/apps/api/src/app/app.module.ts @@ -14,7 +14,7 @@ import { SUPPORTED_LANGUAGE_CODES } from '@ghostfolio/common/config'; -import { BullModule } from '@nestjs/bull'; +import { BullModule } from '@nestjs/bullmq'; import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { EventEmitterModule } from '@nestjs/event-emitter'; @@ -70,7 +70,7 @@ import { UserModule } from './user/user.module'; AuthModule, BenchmarksModule, BullModule.forRoot({ - redis: { + connection: { db: parseInt(process.env.REDIS_DB ?? '0', 10), host: process.env.REDIS_HOST, password: process.env.REDIS_PASSWORD, diff --git a/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts b/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts index 2e58a4ef5..778354bfd 100644 --- a/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts +++ b/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts @@ -1163,7 +1163,9 @@ export abstract class PortfolioCalculator { const job = await this.portfolioSnapshotService.getJob(jobId); if (job) { - await job.finished(); + await job.waitUntilFinished( + this.portfolioSnapshotService.getQueueEvents() + ); } await this.initialize(); diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.module.ts b/apps/api/src/services/queues/data-gathering/data-gathering.module.ts index b51823476..76454dc8e 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.module.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.module.ts @@ -9,19 +9,14 @@ import { DataGatheringService } from '@ghostfolio/api/services/queues/data-gathe import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module'; import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config'; -import { BullModule } from '@nestjs/bull'; +import { BullModule } from '@nestjs/bullmq'; import { Module } from '@nestjs/common'; -import ms from 'ms'; import { DataGatheringProcessor } from './data-gathering.processor'; @Module({ imports: [ BullModule.registerQueue({ - limiter: { - duration: ms('4 seconds'), - max: 1 - }, name: DATA_GATHERING_QUEUE }), ConfigurationModule, diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts index 1a4038652..172dd1666 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts @@ -5,18 +5,16 @@ import { MarketDataService } from '@ghostfolio/api/services/market-data/market-d import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service'; import { DATA_GATHERING_QUEUE, - DEFAULT_PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY, - DEFAULT_PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY, GATHER_ASSET_PROFILE_PROCESS_JOB_NAME, GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME } from '@ghostfolio/common/config'; import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper'; import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces'; -import { Process, Processor } from '@nestjs/bull'; +import { Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; import { Prisma } from '@prisma/client'; -import { Job } from 'bull'; +import { Job, UnrecoverableError } from 'bullmq'; import { addDays, format, @@ -26,28 +24,45 @@ import { isBefore, parseISO } from 'date-fns'; +import ms from 'ms'; import { DataGatheringService } from './data-gathering.service'; @Injectable() -@Processor(DATA_GATHERING_QUEUE) -export class DataGatheringProcessor { +@Processor(DATA_GATHERING_QUEUE, { + concurrency: parseInt( + process.env.PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY ?? + process.env.PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY ?? + '1', + 10 + ), + limiter: { + max: 1, + duration: ms('4 seconds') + } +}) +export class DataGatheringProcessor extends WorkerHost { public constructor( private readonly dataGatheringService: DataGatheringService, private readonly dataProviderService: DataProviderService, private readonly marketDataService: MarketDataService, private readonly symbolProfileService: SymbolProfileService - ) {} - - @Process({ - concurrency: parseInt( - process.env.PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY ?? - DEFAULT_PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY.toString(), - 10 - ), - name: GATHER_ASSET_PROFILE_PROCESS_JOB_NAME - }) - public async gatherAssetProfile(job: Job) { + ) { + super(); + } + + public async process(job: Job): Promise { + switch (job.name) { + case GATHER_ASSET_PROFILE_PROCESS_JOB_NAME: + return this.gatherAssetProfile(job); + case GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME: + return this.gatherHistoricalMarketData(job); + default: + throw new Error(`Unknown job name: ${job.name}`); + } + } + + private async gatherAssetProfile(job: Job) { const { dataSource, symbol } = job.data; try { @@ -79,7 +94,9 @@ export class DataGatheringProcessor { `DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS_JOB_NAME})` ); - return job.discard(); + throw new UnrecoverableError( + `Asset ${symbol} (${dataSource}) has been delisted` + ); } Logger.error( @@ -91,15 +108,7 @@ export class DataGatheringProcessor { } } - @Process({ - concurrency: parseInt( - process.env.PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY ?? - DEFAULT_PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY.toString(), - 10 - ), - name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME - }) - public async gatherHistoricalMarketData(job: Job) { + private async gatherHistoricalMarketData(job: Job) { const { dataSource, date, force, symbol } = job.data; try { @@ -191,7 +200,9 @@ export class DataGatheringProcessor { `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` ); - return job.discard(); + throw new UnrecoverableError( + `Asset ${symbol} (${dataSource}) has been delisted` + ); } Logger.error( diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts index cec63c3eb..ac3954655 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts @@ -24,10 +24,10 @@ import { BenchmarkProperty } from '@ghostfolio/common/interfaces'; -import { InjectQueue } from '@nestjs/bull'; +import { InjectQueue } from '@nestjs/bullmq'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { DataSource } from '@prisma/client'; -import { JobOptions, Queue } from 'bull'; +import { JobsOptions, Queue } from 'bullmq'; import { format, min, subDays, subMilliseconds, subYears } from 'date-fns'; import { isEmpty } from 'lodash'; import ms, { StringValue } from 'ms'; @@ -53,13 +53,13 @@ export class DataGatheringService { }: { data: any; name: string; - opts?: JobOptions; + opts?: JobsOptions; }) { return this.dataGatheringQueue.add(name, data, opts); } public async addJobsToQueue( - jobs: { data: any; name: string; opts?: JobOptions }[] + jobs: { data: any; name: string; opts?: JobsOptions }[] ) { return this.dataGatheringQueue.addBulk(jobs); } diff --git a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts index 958636334..34e75e17b 100644 --- a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts +++ b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts @@ -8,12 +8,9 @@ import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data- import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module'; import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module'; import { PortfolioSnapshotService } from '@ghostfolio/api/services/queues/portfolio-snapshot/portfolio-snapshot.service'; -import { - DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT, - PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE -} from '@ghostfolio/common/config'; +import { PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config'; -import { BullModule } from '@nestjs/bull'; +import { BullModule } from '@nestjs/bullmq'; import { Module } from '@nestjs/common'; import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor'; @@ -23,14 +20,7 @@ import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor'; imports: [ AccountBalanceModule, BullModule.registerQueue({ - name: PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, - settings: { - lockDuration: parseInt( - process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT ?? - DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT.toString(), - 10 - ) - } + name: PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE }), ConfigurationModule, DataProviderModule, diff --git a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts index 58a0a8f8a..b206a198e 100644 --- a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts +++ b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts @@ -7,37 +7,43 @@ import { ConfigurationService } from '@ghostfolio/api/services/configuration/con import { CACHE_TTL_INFINITE, DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY, + DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT, PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME, PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config'; -import { Process, Processor } from '@nestjs/bull'; +import { Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; -import { Job } from 'bull'; +import { Job } from 'bullmq'; import { addMilliseconds } from 'date-fns'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; @Injectable() -@Processor(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE) -export class PortfolioSnapshotProcessor { +@Processor(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, { + concurrency: parseInt( + process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY ?? + DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY.toString(), + 10 + ), + lockDuration: parseInt( + process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT ?? + DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT.toString(), + 10 + ) +}) +export class PortfolioSnapshotProcessor extends WorkerHost { public constructor( private readonly accountBalanceService: AccountBalanceService, private readonly calculatorFactory: PortfolioCalculatorFactory, private readonly configurationService: ConfigurationService, private readonly orderService: OrderService, private readonly redisCacheService: RedisCacheService - ) {} + ) { + super(); + } - @Process({ - concurrency: parseInt( - process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY ?? - DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY.toString(), - 10 - ), - name: PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME - }) - public async calculatePortfolioSnapshot(job: Job) { + public async process(job: Job): Promise { try { const startTime = performance.now(); diff --git a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts index 898718106..e1c1af97a 100644 --- a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts +++ b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts @@ -1,4 +1,4 @@ -import { Job, JobOptions } from 'bull'; +import { Job, JobsOptions } from 'bullmq'; import { setTimeout } from 'timers/promises'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; @@ -9,10 +9,10 @@ export const PortfolioSnapshotServiceMock = { }: { data: PortfolioSnapshotQueueJob; name: string; - opts?: JobOptions; + opts?: JobsOptions; }): Promise> { const mockJob: Partial> = { - finished: async () => { + waitUntilFinished: async () => { await setTimeout(100); return Promise.resolve(); diff --git a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts index d7449a9cc..cbd89b9ca 100644 --- a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts +++ b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts @@ -1,17 +1,23 @@ import { PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config'; -import { InjectQueue } from '@nestjs/bull'; +import { InjectQueue } from '@nestjs/bullmq'; import { Injectable } from '@nestjs/common'; -import { JobOptions, Queue } from 'bull'; +import { JobsOptions, Queue, QueueEvents } from 'bullmq'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; @Injectable() export class PortfolioSnapshotService { + private readonly queueEvents: QueueEvents; + public constructor( @InjectQueue(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE) private readonly portfolioSnapshotQueue: Queue - ) {} + ) { + this.queueEvents = new QueueEvents(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, { + connection: this.portfolioSnapshotQueue.opts?.connection as any + }); + } public async addJobToQueue({ data, @@ -20,7 +26,7 @@ export class PortfolioSnapshotService { }: { data: PortfolioSnapshotQueueJob; name: string; - opts?: JobOptions; + opts?: JobsOptions; }) { return this.portfolioSnapshotQueue.add(name, data, opts); } @@ -28,4 +34,8 @@ export class PortfolioSnapshotService { public async getJob(jobId: string) { return this.portfolioSnapshotQueue.getJob(jobId); } + + public getQueueEvents() { + return this.queueEvents; + } } diff --git a/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts b/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts index de70a7b6e..6a89af22d 100644 --- a/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts +++ b/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts @@ -31,7 +31,6 @@ import { MatSelectModule } from '@angular/material/select'; import { MatSort, MatSortModule } from '@angular/material/sort'; import { MatTableDataSource, MatTableModule } from '@angular/material/table'; import { IonIcon } from '@ionic/angular/standalone'; -import { JobStatus } from 'bull'; import { addIcons } from 'ionicons'; import { alertCircleOutline, @@ -194,7 +193,7 @@ export class GfAdminJobsComponent implements OnDestroy, OnInit { this.unsubscribeSubject.complete(); } - private fetchJobs(aStatus?: JobStatus[]) { + private fetchJobs(aStatus?: string[]) { this.isLoading = true; this.adminService diff --git a/libs/common/src/lib/config.ts b/libs/common/src/lib/config.ts index b558ccc42..4ab67991b 100644 --- a/libs/common/src/lib/config.ts +++ b/libs/common/src/lib/config.ts @@ -1,5 +1,5 @@ import { AssetClass, AssetSubClass, DataSource, Type } from '@prisma/client'; -import { JobOptions, JobStatus } from 'bull'; +import { JobsOptions } from 'bullmq'; import ms from 'ms'; export const ghostfolioPrefix = 'GF'; @@ -56,7 +56,7 @@ export const CACHE_TTL_INFINITE = 0; export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE'; export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1; -export const DATA_GATHERING_QUEUE_PRIORITY_LOW = Number.MAX_SAFE_INTEGER; +export const DATA_GATHERING_QUEUE_PRIORITY_LOW = 2_097_152; export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round( DATA_GATHERING_QUEUE_PRIORITY_LOW / 2 ); @@ -64,8 +64,7 @@ export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round( export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE = 'PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE'; export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_HIGH = 1; -export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_LOW = - Number.MAX_SAFE_INTEGER; +export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_LOW = 2_097_152; export const DEFAULT_CURRENCY = 'USD'; export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy'; @@ -150,7 +149,7 @@ export const DERIVED_CURRENCIES = [ ]; export const GATHER_ASSET_PROFILE_PROCESS_JOB_NAME = 'GATHER_ASSET_PROFILE'; -export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobOptions = { +export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobsOptions = { attempts: 12, backoff: { delay: ms('1 minute'), @@ -161,7 +160,7 @@ export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobOptions = { export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME = 'GATHER_HISTORICAL_MARKET_DATA'; -export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobOptions = { +export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobsOptions = { attempts: 12, backoff: { delay: ms('1 minute'), @@ -177,7 +176,7 @@ export const INVESTMENT_ACTIVITY_TYPES = [ ] as Type[]; export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME = 'PORTFOLIO'; -export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_OPTIONS: JobOptions = { +export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_OPTIONS: JobsOptions = { removeOnComplete: true }; @@ -219,7 +218,9 @@ export const QUEUE_JOB_STATUS_LIST = [ 'failed', 'paused', 'waiting' -] as JobStatus[]; +] as const; + +export type JobStatusType = (typeof QUEUE_JOB_STATUS_LIST)[number]; export const REPLACE_NAME_PARTS = [ 'Amundi Index Solutions -', diff --git a/libs/common/src/lib/interfaces/admin-jobs.interface.ts b/libs/common/src/lib/interfaces/admin-jobs.interface.ts index b4c91ebc0..8f0a05d1f 100644 --- a/libs/common/src/lib/interfaces/admin-jobs.interface.ts +++ b/libs/common/src/lib/interfaces/admin-jobs.interface.ts @@ -1,4 +1,4 @@ -import { Job, JobStatus } from 'bull'; +import { Job } from 'bullmq'; export interface AdminJobs { jobs: (Pick< @@ -12,6 +12,6 @@ export interface AdminJobs { | 'stacktrace' | 'timestamp' > & { - state: JobStatus | 'stuck'; + state: string; })[]; } diff --git a/libs/ui/src/lib/services/admin.service.ts b/libs/ui/src/lib/services/admin.service.ts index 145f134e3..38e288839 100644 --- a/libs/ui/src/lib/services/admin.service.ts +++ b/libs/ui/src/lib/services/admin.service.ts @@ -28,7 +28,6 @@ import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http'; import { Inject, Injectable } from '@angular/core'; import { SortDirection } from '@angular/material/sort'; import { DataSource, MarketData, Platform } from '@prisma/client'; -import { JobStatus } from 'bull'; @Injectable({ providedIn: 'root' @@ -51,7 +50,7 @@ export class AdminService { return this.http.delete(`/api/v1/admin/queue/job/${aId}`); } - public deleteJobs({ status }: { status: JobStatus[] }) { + public deleteJobs({ status }: { status: string[] }) { let params = new HttpParams(); if (status?.length > 0) { @@ -129,7 +128,7 @@ export class AdminService { ); } - public fetchJobs({ status }: { status?: JobStatus[] }) { + public fetchJobs({ status }: { status?: string[] }) { let params = new HttpParams(); if (status?.length > 0) {