From ff7eac85733e8ba0ead25ff1ed6441e3cc48e089 Mon Sep 17 00:00:00 2001 From: Thomas Kaul <4159106+dtslvr@users.noreply.github.com> Date: Sun, 8 Feb 2026 17:50:44 +0100 Subject: [PATCH 1/2] Migrate from bull to bullmq --- package-lock.json | 88 +++++++++++++++++------------------------------ package.json | 4 +-- 2 files changed, 33 insertions(+), 59 deletions(-) diff --git a/package-lock.json b/package-lock.json index da8bf26f4..05a14372a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26,7 +26,7 @@ "@internationalized/number": "3.6.5", "@ionic/angular": "8.7.8", "@keyv/redis": "4.4.0", - "@nestjs/bull": "11.0.4", + "@nestjs/bullmq": "11.0.4", "@nestjs/cache-manager": "3.0.1", "@nestjs/common": "11.1.8", "@nestjs/config": "4.0.2", @@ -45,7 +45,7 @@ "alphavantage": "2.2.0", "big.js": "7.0.1", "bootstrap": "4.6.2", - "bull": "4.16.5", + "bullmq": "5.67.3", "chart.js": "4.5.1", "chartjs-adapter-date-fns": "3.0.0", "chartjs-chart-treemap": "3.1.0", @@ -5038,9 +5038,9 @@ } }, "node_modules/@ioredis/commands": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.4.0.tgz", - "integrity": "sha512-aFT2yemJJo+TZCmieA7qnYGQooOS7QfNmYrzGtsYd3g9j5iDP8AimYYAesf79ohjbLG12XxC4nG5DyEnC88AsQ==", + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.5.0.tgz", + "integrity": "sha512-eUgLqrMf8nJkZxT24JvVRrQya1vZkQh8BBeYNwGDqa5I0VUi8ACx7uFvAaLxintokpTenkK6DASvo/bvNbBGow==", "license": "MIT" }, "node_modules/@isaacs/balanced-match": { @@ -7502,32 +7502,32 @@ "@tybys/wasm-util": "^0.10.1" } }, - "node_modules/@nestjs/bull": { + "node_modules/@nestjs/bull-shared": { "version": "11.0.4", - "resolved": "https://registry.npmjs.org/@nestjs/bull/-/bull-11.0.4.tgz", - "integrity": "sha512-QVz2PR/rJF/isy7otVnMTSqLf/O71p9Ka7lBZt9Gm+NQFv8fcH2L11GL7TA0whyCcw/kAX5iRepUXz/wed4JoA==", + "resolved": "https://registry.npmjs.org/@nestjs/bull-shared/-/bull-shared-11.0.4.tgz", + "integrity": "sha512-VBJcDHSAzxQnpcDfA0kt9MTGUD1XZzfByV70su0W0eDCQ9aqIEBlzWRW21tv9FG9dIut22ysgDidshdjlnczLw==", "license": "MIT", "dependencies": { - "@nestjs/bull-shared": "^11.0.4", "tslib": "2.8.1" }, "peerDependencies": { - "@nestjs/common": "^8.0.0 || ^9.0.0 || ^10.0.0 || ^11.0.0", - "@nestjs/core": "^8.0.0 || ^9.0.0 || ^10.0.0 || ^11.0.0", - "bull": "^3.3 || ^4.0.0" + "@nestjs/common": "^10.0.0 || ^11.0.0", + "@nestjs/core": "^10.0.0 || ^11.0.0" } }, - "node_modules/@nestjs/bull-shared": { + "node_modules/@nestjs/bullmq": { "version": "11.0.4", - "resolved": "https://registry.npmjs.org/@nestjs/bull-shared/-/bull-shared-11.0.4.tgz", - "integrity": "sha512-VBJcDHSAzxQnpcDfA0kt9MTGUD1XZzfByV70su0W0eDCQ9aqIEBlzWRW21tv9FG9dIut22ysgDidshdjlnczLw==", + "resolved": "https://registry.npmjs.org/@nestjs/bullmq/-/bullmq-11.0.4.tgz", + "integrity": "sha512-wBzK9raAVG0/6NTMdvLGM4/FQ1lsB35/pYS8L6a0SDgkTiLpd7mAjQ8R692oMx5s7IjvgntaZOuTUrKYLNfIkA==", "license": "MIT", "dependencies": { + "@nestjs/bull-shared": "^11.0.4", "tslib": "2.8.1" }, "peerDependencies": { "@nestjs/common": "^10.0.0 || ^11.0.0", - "@nestjs/core": "^10.0.0 || ^11.0.0" + "@nestjs/core": "^10.0.0 || ^11.0.0", + "bullmq": "^3.0.0 || ^4.0.0 || ^5.0.0" } }, "node_modules/@nestjs/cache-manager": { @@ -15636,31 +15636,19 @@ "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==", "license": "MIT" }, - "node_modules/bull": { - "version": "4.16.5", - "resolved": "https://registry.npmjs.org/bull/-/bull-4.16.5.tgz", - "integrity": "sha512-lDsx2BzkKe7gkCYiT5Acj02DpTwDznl/VNN7Psn7M3USPG7Vs/BaClZJJTAG+ufAR9++N1/NiUTdaFBWDIl5TQ==", + "node_modules/bullmq": { + "version": "5.67.3", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.67.3.tgz", + "integrity": "sha512-eeQobOJn8M0Rj8tcZCVFLrimZgJQallJH1JpclOoyut2nDNkDwTEPMVcZzLeSR2fGeIVbfJTjU96F563Qkge5A==", "license": "MIT", "dependencies": { - "cron-parser": "^4.9.0", - "get-port": "^5.1.1", - "ioredis": "^5.3.2", - "lodash": "^4.17.21", - "msgpackr": "^1.11.2", - "semver": "^7.5.2", - "uuid": "^8.3.0" - }, - "engines": { - "node": ">=12" - } - }, - "node_modules/bull/node_modules/uuid": { - "version": "8.3.2", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", - "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", - "license": "MIT", - "bin": { - "uuid": "dist/bin/uuid" + "cron-parser": "4.9.0", + "ioredis": "5.9.2", + "msgpackr": "1.11.5", + "node-abort-controller": "3.1.1", + "semver": "7.7.3", + "tslib": "2.8.1", + "uuid": "11.1.0" } }, "node_modules/bundle-name": { @@ -20891,18 +20879,6 @@ "node": ">=8.0.0" } }, - "node_modules/get-port": { - "version": "5.1.1", - "resolved": "https://registry.npmjs.org/get-port/-/get-port-5.1.1.tgz", - "integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==", - "license": "MIT", - "engines": { - "node": ">=8" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/get-proto": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/get-proto/-/get-proto-1.0.1.tgz", @@ -22188,12 +22164,12 @@ } }, "node_modules/ioredis": { - "version": "5.8.2", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.8.2.tgz", - "integrity": "sha512-C6uC+kleiIMmjViJINWk80sOQw5lEzse1ZmvD+S/s8p8CWapftSaC+kocGTx6xrbrJ4WmYQGC08ffHLr6ToR6Q==", + "version": "5.9.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.9.2.tgz", + "integrity": "sha512-tAAg/72/VxOUW7RQSX1pIxJVucYKcjFjfvj60L57jrZpYCHC3XN0WCQ3sNYL4Gmvv+7GPvTAjc+KSdeNuE8oWQ==", "license": "MIT", "dependencies": { - "@ioredis/commands": "1.4.0", + "@ioredis/commands": "1.5.0", "cluster-key-slot": "^1.1.0", "debug": "^4.3.4", "denque": "^2.1.0", @@ -26283,7 +26259,6 @@ "version": "3.1.1", "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz", "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==", - "dev": true, "license": "MIT" }, "node_modules/node-addon-api": { @@ -33913,7 +33888,6 @@ "https://github.com/sponsors/ctavan" ], "license": "MIT", - "optional": true, "bin": { "uuid": "dist/esm/bin/uuid" } diff --git a/package.json b/package.json index 70922d840..c9d2d5c25 100644 --- a/package.json +++ b/package.json @@ -70,7 +70,7 @@ "@internationalized/number": "3.6.5", "@ionic/angular": "8.7.8", "@keyv/redis": "4.4.0", - "@nestjs/bull": "11.0.4", + "@nestjs/bullmq": "11.0.4", "@nestjs/cache-manager": "3.0.1", "@nestjs/common": "11.1.8", "@nestjs/config": "4.0.2", @@ -89,7 +89,7 @@ "alphavantage": "2.2.0", "big.js": "7.0.1", "bootstrap": "4.6.2", - "bull": "4.16.5", + "bullmq": "5.67.3", "chart.js": "4.5.1", "chartjs-adapter-date-fns": "3.0.0", "chartjs-chart-treemap": "3.1.0", From b8b4a349d31b722b7bd438eaa10e6f186ebf17c8 Mon Sep 17 00:00:00 2001 From: Thomas Kaul <4159106+dtslvr@users.noreply.github.com> Date: Sun, 8 Feb 2026 17:51:33 +0100 Subject: [PATCH 2/2] Migrate from bull to bullmq --- .../src/app/admin/queue/queue.controller.ts | 6 +- apps/api/src/app/admin/queue/queue.service.ts | 17 ++--- apps/api/src/app/app.module.ts | 4 +- .../calculator/portfolio-calculator.ts | 4 +- .../data-gathering/data-gathering.module.ts | 7 +- .../data-gathering.processor.ts | 67 +++++++++++-------- .../data-gathering/data-gathering.service.ts | 8 +-- .../portfolio-snapshot.module.ts | 16 +---- .../portfolio-snapshot.processor.ts | 34 ++++++---- .../portfolio-snapshot.service.mock.ts | 6 +- .../portfolio-snapshot.service.ts | 18 +++-- .../admin-jobs/admin-jobs.component.ts | 3 +- libs/common/src/lib/config.ts | 17 ++--- .../lib/interfaces/admin-jobs.interface.ts | 4 +- libs/ui/src/lib/services/admin.service.ts | 5 +- 15 files changed, 115 insertions(+), 101 deletions(-) diff --git a/apps/api/src/app/admin/queue/queue.controller.ts b/apps/api/src/app/admin/queue/queue.controller.ts index 060abd247..65d388b19 100644 --- a/apps/api/src/app/admin/queue/queue.controller.ts +++ b/apps/api/src/app/admin/queue/queue.controller.ts @@ -1,5 +1,6 @@ import { HasPermission } from '@ghostfolio/api/decorators/has-permission.decorator'; import { HasPermissionGuard } from '@ghostfolio/api/guards/has-permission.guard'; +import { JobStatusType } from '@ghostfolio/common/config'; import { AdminJobs } from '@ghostfolio/common/interfaces'; import { permissions } from '@ghostfolio/common/permissions'; @@ -12,7 +13,6 @@ import { UseGuards } from '@nestjs/common'; import { AuthGuard } from '@nestjs/passport'; -import { JobStatus } from 'bull'; import { QueueService } from './queue.service'; @@ -26,7 +26,7 @@ export class QueueController { public async deleteJobs( @Query('status') filterByStatus?: string ): Promise { - const status = (filterByStatus?.split(',') as JobStatus[]) ?? undefined; + const status = (filterByStatus?.split(',') as JobStatusType[]) ?? undefined; return this.queueService.deleteJobs({ status }); } @@ -36,7 +36,7 @@ export class QueueController { public async getJobs( @Query('status') filterByStatus?: string ): Promise { - const status = (filterByStatus?.split(',') as JobStatus[]) ?? undefined; + const status = (filterByStatus?.split(',') as JobStatusType[]) ?? undefined; return this.queueService.getJobs({ status }); } diff --git a/apps/api/src/app/admin/queue/queue.service.ts b/apps/api/src/app/admin/queue/queue.service.ts index 747c4d6fb..23355ebdb 100644 --- a/apps/api/src/app/admin/queue/queue.service.ts +++ b/apps/api/src/app/admin/queue/queue.service.ts @@ -1,13 +1,14 @@ import { DATA_GATHERING_QUEUE, + JobStatusType, PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, QUEUE_JOB_STATUS_LIST } from '@ghostfolio/common/config'; import { AdminJobs } from '@ghostfolio/common/interfaces'; -import { InjectQueue } from '@nestjs/bull'; +import { InjectQueue } from '@nestjs/bullmq'; import { Injectable } from '@nestjs/common'; -import { JobStatus, Queue } from 'bull'; +import { Queue } from 'bullmq'; @Injectable() export class QueueService { @@ -29,15 +30,15 @@ export class QueueService { } public async deleteJobs({ - status = QUEUE_JOB_STATUS_LIST + status = [...QUEUE_JOB_STATUS_LIST] }: { - status?: JobStatus[]; + status?: JobStatusType[]; }) { for (const statusItem of status) { const queueStatus = statusItem === 'waiting' ? 'wait' : statusItem; - await this.dataGatheringQueue.clean(300, queueStatus); - await this.portfolioSnapshotQueue.clean(300, queueStatus); + await this.dataGatheringQueue.clean(300, 0, queueStatus); + await this.portfolioSnapshotQueue.clean(300, 0, queueStatus); } } @@ -53,10 +54,10 @@ export class QueueService { public async getJobs({ limit = 1000, - status = QUEUE_JOB_STATUS_LIST + status = [...QUEUE_JOB_STATUS_LIST] }: { limit?: number; - status?: JobStatus[]; + status?: JobStatusType[]; }): Promise { const [dataGatheringJobs, portfolioSnapshotJobs] = await Promise.all([ this.dataGatheringQueue.getJobs(status), diff --git a/apps/api/src/app/app.module.ts b/apps/api/src/app/app.module.ts index 89f52e1ea..0c6707b91 100644 --- a/apps/api/src/app/app.module.ts +++ b/apps/api/src/app/app.module.ts @@ -14,7 +14,7 @@ import { SUPPORTED_LANGUAGE_CODES } from '@ghostfolio/common/config'; -import { BullModule } from '@nestjs/bull'; +import { BullModule } from '@nestjs/bullmq'; import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { EventEmitterModule } from '@nestjs/event-emitter'; @@ -70,7 +70,7 @@ import { UserModule } from './user/user.module'; AuthModule, BenchmarksModule, BullModule.forRoot({ - redis: { + connection: { db: parseInt(process.env.REDIS_DB ?? '0', 10), host: process.env.REDIS_HOST, password: process.env.REDIS_PASSWORD, diff --git a/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts b/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts index 2e58a4ef5..778354bfd 100644 --- a/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts +++ b/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts @@ -1163,7 +1163,9 @@ export abstract class PortfolioCalculator { const job = await this.portfolioSnapshotService.getJob(jobId); if (job) { - await job.finished(); + await job.waitUntilFinished( + this.portfolioSnapshotService.getQueueEvents() + ); } await this.initialize(); diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.module.ts b/apps/api/src/services/queues/data-gathering/data-gathering.module.ts index b51823476..76454dc8e 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.module.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.module.ts @@ -9,19 +9,14 @@ import { DataGatheringService } from '@ghostfolio/api/services/queues/data-gathe import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module'; import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config'; -import { BullModule } from '@nestjs/bull'; +import { BullModule } from '@nestjs/bullmq'; import { Module } from '@nestjs/common'; -import ms from 'ms'; import { DataGatheringProcessor } from './data-gathering.processor'; @Module({ imports: [ BullModule.registerQueue({ - limiter: { - duration: ms('4 seconds'), - max: 1 - }, name: DATA_GATHERING_QUEUE }), ConfigurationModule, diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts index 1a4038652..172dd1666 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts @@ -5,18 +5,16 @@ import { MarketDataService } from '@ghostfolio/api/services/market-data/market-d import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service'; import { DATA_GATHERING_QUEUE, - DEFAULT_PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY, - DEFAULT_PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY, GATHER_ASSET_PROFILE_PROCESS_JOB_NAME, GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME } from '@ghostfolio/common/config'; import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper'; import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces'; -import { Process, Processor } from '@nestjs/bull'; +import { Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; import { Prisma } from '@prisma/client'; -import { Job } from 'bull'; +import { Job, UnrecoverableError } from 'bullmq'; import { addDays, format, @@ -26,28 +24,45 @@ import { isBefore, parseISO } from 'date-fns'; +import ms from 'ms'; import { DataGatheringService } from './data-gathering.service'; @Injectable() -@Processor(DATA_GATHERING_QUEUE) -export class DataGatheringProcessor { +@Processor(DATA_GATHERING_QUEUE, { + concurrency: parseInt( + process.env.PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY ?? + process.env.PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY ?? + '1', + 10 + ), + limiter: { + max: 1, + duration: ms('4 seconds') + } +}) +export class DataGatheringProcessor extends WorkerHost { public constructor( private readonly dataGatheringService: DataGatheringService, private readonly dataProviderService: DataProviderService, private readonly marketDataService: MarketDataService, private readonly symbolProfileService: SymbolProfileService - ) {} - - @Process({ - concurrency: parseInt( - process.env.PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY ?? - DEFAULT_PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY.toString(), - 10 - ), - name: GATHER_ASSET_PROFILE_PROCESS_JOB_NAME - }) - public async gatherAssetProfile(job: Job) { + ) { + super(); + } + + public async process(job: Job): Promise { + switch (job.name) { + case GATHER_ASSET_PROFILE_PROCESS_JOB_NAME: + return this.gatherAssetProfile(job); + case GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME: + return this.gatherHistoricalMarketData(job); + default: + throw new Error(`Unknown job name: ${job.name}`); + } + } + + private async gatherAssetProfile(job: Job) { const { dataSource, symbol } = job.data; try { @@ -79,7 +94,9 @@ export class DataGatheringProcessor { `DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS_JOB_NAME})` ); - return job.discard(); + throw new UnrecoverableError( + `Asset ${symbol} (${dataSource}) has been delisted` + ); } Logger.error( @@ -91,15 +108,7 @@ export class DataGatheringProcessor { } } - @Process({ - concurrency: parseInt( - process.env.PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY ?? - DEFAULT_PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY.toString(), - 10 - ), - name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME - }) - public async gatherHistoricalMarketData(job: Job) { + private async gatherHistoricalMarketData(job: Job) { const { dataSource, date, force, symbol } = job.data; try { @@ -191,7 +200,9 @@ export class DataGatheringProcessor { `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` ); - return job.discard(); + throw new UnrecoverableError( + `Asset ${symbol} (${dataSource}) has been delisted` + ); } Logger.error( diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts index cec63c3eb..ac3954655 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts @@ -24,10 +24,10 @@ import { BenchmarkProperty } from '@ghostfolio/common/interfaces'; -import { InjectQueue } from '@nestjs/bull'; +import { InjectQueue } from '@nestjs/bullmq'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { DataSource } from '@prisma/client'; -import { JobOptions, Queue } from 'bull'; +import { JobsOptions, Queue } from 'bullmq'; import { format, min, subDays, subMilliseconds, subYears } from 'date-fns'; import { isEmpty } from 'lodash'; import ms, { StringValue } from 'ms'; @@ -53,13 +53,13 @@ export class DataGatheringService { }: { data: any; name: string; - opts?: JobOptions; + opts?: JobsOptions; }) { return this.dataGatheringQueue.add(name, data, opts); } public async addJobsToQueue( - jobs: { data: any; name: string; opts?: JobOptions }[] + jobs: { data: any; name: string; opts?: JobsOptions }[] ) { return this.dataGatheringQueue.addBulk(jobs); } diff --git a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts index 958636334..34e75e17b 100644 --- a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts +++ b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts @@ -8,12 +8,9 @@ import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data- import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module'; import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module'; import { PortfolioSnapshotService } from '@ghostfolio/api/services/queues/portfolio-snapshot/portfolio-snapshot.service'; -import { - DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT, - PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE -} from '@ghostfolio/common/config'; +import { PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config'; -import { BullModule } from '@nestjs/bull'; +import { BullModule } from '@nestjs/bullmq'; import { Module } from '@nestjs/common'; import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor'; @@ -23,14 +20,7 @@ import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor'; imports: [ AccountBalanceModule, BullModule.registerQueue({ - name: PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, - settings: { - lockDuration: parseInt( - process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT ?? - DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT.toString(), - 10 - ) - } + name: PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE }), ConfigurationModule, DataProviderModule, diff --git a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts index 58a0a8f8a..b206a198e 100644 --- a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts +++ b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts @@ -7,37 +7,43 @@ import { ConfigurationService } from '@ghostfolio/api/services/configuration/con import { CACHE_TTL_INFINITE, DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY, + DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT, PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME, PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config'; -import { Process, Processor } from '@nestjs/bull'; +import { Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; -import { Job } from 'bull'; +import { Job } from 'bullmq'; import { addMilliseconds } from 'date-fns'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; @Injectable() -@Processor(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE) -export class PortfolioSnapshotProcessor { +@Processor(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, { + concurrency: parseInt( + process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY ?? + DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY.toString(), + 10 + ), + lockDuration: parseInt( + process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT ?? + DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT.toString(), + 10 + ) +}) +export class PortfolioSnapshotProcessor extends WorkerHost { public constructor( private readonly accountBalanceService: AccountBalanceService, private readonly calculatorFactory: PortfolioCalculatorFactory, private readonly configurationService: ConfigurationService, private readonly orderService: OrderService, private readonly redisCacheService: RedisCacheService - ) {} + ) { + super(); + } - @Process({ - concurrency: parseInt( - process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY ?? - DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY.toString(), - 10 - ), - name: PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME - }) - public async calculatePortfolioSnapshot(job: Job) { + public async process(job: Job): Promise { try { const startTime = performance.now(); diff --git a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts index 898718106..e1c1af97a 100644 --- a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts +++ b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts @@ -1,4 +1,4 @@ -import { Job, JobOptions } from 'bull'; +import { Job, JobsOptions } from 'bullmq'; import { setTimeout } from 'timers/promises'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; @@ -9,10 +9,10 @@ export const PortfolioSnapshotServiceMock = { }: { data: PortfolioSnapshotQueueJob; name: string; - opts?: JobOptions; + opts?: JobsOptions; }): Promise> { const mockJob: Partial> = { - finished: async () => { + waitUntilFinished: async () => { await setTimeout(100); return Promise.resolve(); diff --git a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts index d7449a9cc..cbd89b9ca 100644 --- a/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts +++ b/apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts @@ -1,17 +1,23 @@ import { PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config'; -import { InjectQueue } from '@nestjs/bull'; +import { InjectQueue } from '@nestjs/bullmq'; import { Injectable } from '@nestjs/common'; -import { JobOptions, Queue } from 'bull'; +import { JobsOptions, Queue, QueueEvents } from 'bullmq'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; @Injectable() export class PortfolioSnapshotService { + private readonly queueEvents: QueueEvents; + public constructor( @InjectQueue(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE) private readonly portfolioSnapshotQueue: Queue - ) {} + ) { + this.queueEvents = new QueueEvents(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, { + connection: this.portfolioSnapshotQueue.opts?.connection as any + }); + } public async addJobToQueue({ data, @@ -20,7 +26,7 @@ export class PortfolioSnapshotService { }: { data: PortfolioSnapshotQueueJob; name: string; - opts?: JobOptions; + opts?: JobsOptions; }) { return this.portfolioSnapshotQueue.add(name, data, opts); } @@ -28,4 +34,8 @@ export class PortfolioSnapshotService { public async getJob(jobId: string) { return this.portfolioSnapshotQueue.getJob(jobId); } + + public getQueueEvents() { + return this.queueEvents; + } } diff --git a/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts b/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts index de70a7b6e..6a89af22d 100644 --- a/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts +++ b/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts @@ -31,7 +31,6 @@ import { MatSelectModule } from '@angular/material/select'; import { MatSort, MatSortModule } from '@angular/material/sort'; import { MatTableDataSource, MatTableModule } from '@angular/material/table'; import { IonIcon } from '@ionic/angular/standalone'; -import { JobStatus } from 'bull'; import { addIcons } from 'ionicons'; import { alertCircleOutline, @@ -194,7 +193,7 @@ export class GfAdminJobsComponent implements OnDestroy, OnInit { this.unsubscribeSubject.complete(); } - private fetchJobs(aStatus?: JobStatus[]) { + private fetchJobs(aStatus?: string[]) { this.isLoading = true; this.adminService diff --git a/libs/common/src/lib/config.ts b/libs/common/src/lib/config.ts index b558ccc42..4ab67991b 100644 --- a/libs/common/src/lib/config.ts +++ b/libs/common/src/lib/config.ts @@ -1,5 +1,5 @@ import { AssetClass, AssetSubClass, DataSource, Type } from '@prisma/client'; -import { JobOptions, JobStatus } from 'bull'; +import { JobsOptions } from 'bullmq'; import ms from 'ms'; export const ghostfolioPrefix = 'GF'; @@ -56,7 +56,7 @@ export const CACHE_TTL_INFINITE = 0; export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE'; export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1; -export const DATA_GATHERING_QUEUE_PRIORITY_LOW = Number.MAX_SAFE_INTEGER; +export const DATA_GATHERING_QUEUE_PRIORITY_LOW = 2_097_152; export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round( DATA_GATHERING_QUEUE_PRIORITY_LOW / 2 ); @@ -64,8 +64,7 @@ export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round( export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE = 'PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE'; export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_HIGH = 1; -export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_LOW = - Number.MAX_SAFE_INTEGER; +export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_LOW = 2_097_152; export const DEFAULT_CURRENCY = 'USD'; export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy'; @@ -150,7 +149,7 @@ export const DERIVED_CURRENCIES = [ ]; export const GATHER_ASSET_PROFILE_PROCESS_JOB_NAME = 'GATHER_ASSET_PROFILE'; -export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobOptions = { +export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobsOptions = { attempts: 12, backoff: { delay: ms('1 minute'), @@ -161,7 +160,7 @@ export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobOptions = { export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME = 'GATHER_HISTORICAL_MARKET_DATA'; -export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobOptions = { +export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobsOptions = { attempts: 12, backoff: { delay: ms('1 minute'), @@ -177,7 +176,7 @@ export const INVESTMENT_ACTIVITY_TYPES = [ ] as Type[]; export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME = 'PORTFOLIO'; -export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_OPTIONS: JobOptions = { +export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_OPTIONS: JobsOptions = { removeOnComplete: true }; @@ -219,7 +218,9 @@ export const QUEUE_JOB_STATUS_LIST = [ 'failed', 'paused', 'waiting' -] as JobStatus[]; +] as const; + +export type JobStatusType = (typeof QUEUE_JOB_STATUS_LIST)[number]; export const REPLACE_NAME_PARTS = [ 'Amundi Index Solutions -', diff --git a/libs/common/src/lib/interfaces/admin-jobs.interface.ts b/libs/common/src/lib/interfaces/admin-jobs.interface.ts index b4c91ebc0..8f0a05d1f 100644 --- a/libs/common/src/lib/interfaces/admin-jobs.interface.ts +++ b/libs/common/src/lib/interfaces/admin-jobs.interface.ts @@ -1,4 +1,4 @@ -import { Job, JobStatus } from 'bull'; +import { Job } from 'bullmq'; export interface AdminJobs { jobs: (Pick< @@ -12,6 +12,6 @@ export interface AdminJobs { | 'stacktrace' | 'timestamp' > & { - state: JobStatus | 'stuck'; + state: string; })[]; } diff --git a/libs/ui/src/lib/services/admin.service.ts b/libs/ui/src/lib/services/admin.service.ts index 145f134e3..38e288839 100644 --- a/libs/ui/src/lib/services/admin.service.ts +++ b/libs/ui/src/lib/services/admin.service.ts @@ -28,7 +28,6 @@ import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http'; import { Inject, Injectable } from '@angular/core'; import { SortDirection } from '@angular/material/sort'; import { DataSource, MarketData, Platform } from '@prisma/client'; -import { JobStatus } from 'bull'; @Injectable({ providedIn: 'root' @@ -51,7 +50,7 @@ export class AdminService { return this.http.delete(`/api/v1/admin/queue/job/${aId}`); } - public deleteJobs({ status }: { status: JobStatus[] }) { + public deleteJobs({ status }: { status: string[] }) { let params = new HttpParams(); if (status?.length > 0) { @@ -129,7 +128,7 @@ export class AdminService { ); } - public fetchJobs({ status }: { status?: JobStatus[] }) { + public fetchJobs({ status }: { status?: string[] }) { let params = new HttpParams(); if (status?.length > 0) {