Browse Source

Migrate from bull to bullmq

pull/6296/head
Thomas Kaul 2 days ago
parent
commit
b8b4a349d3
  1. 6
      apps/api/src/app/admin/queue/queue.controller.ts
  2. 17
      apps/api/src/app/admin/queue/queue.service.ts
  3. 4
      apps/api/src/app/app.module.ts
  4. 4
      apps/api/src/app/portfolio/calculator/portfolio-calculator.ts
  5. 7
      apps/api/src/services/queues/data-gathering/data-gathering.module.ts
  6. 67
      apps/api/src/services/queues/data-gathering/data-gathering.processor.ts
  7. 8
      apps/api/src/services/queues/data-gathering/data-gathering.service.ts
  8. 16
      apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts
  9. 34
      apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts
  10. 6
      apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts
  11. 18
      apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts
  12. 3
      apps/client/src/app/components/admin-jobs/admin-jobs.component.ts
  13. 17
      libs/common/src/lib/config.ts
  14. 4
      libs/common/src/lib/interfaces/admin-jobs.interface.ts
  15. 5
      libs/ui/src/lib/services/admin.service.ts

6
apps/api/src/app/admin/queue/queue.controller.ts

@ -1,5 +1,6 @@
import { HasPermission } from '@ghostfolio/api/decorators/has-permission.decorator'; import { HasPermission } from '@ghostfolio/api/decorators/has-permission.decorator';
import { HasPermissionGuard } from '@ghostfolio/api/guards/has-permission.guard'; import { HasPermissionGuard } from '@ghostfolio/api/guards/has-permission.guard';
import { JobStatusType } from '@ghostfolio/common/config';
import { AdminJobs } from '@ghostfolio/common/interfaces'; import { AdminJobs } from '@ghostfolio/common/interfaces';
import { permissions } from '@ghostfolio/common/permissions'; import { permissions } from '@ghostfolio/common/permissions';
@ -12,7 +13,6 @@ import {
UseGuards UseGuards
} from '@nestjs/common'; } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport'; import { AuthGuard } from '@nestjs/passport';
import { JobStatus } from 'bull';
import { QueueService } from './queue.service'; import { QueueService } from './queue.service';
@ -26,7 +26,7 @@ export class QueueController {
public async deleteJobs( public async deleteJobs(
@Query('status') filterByStatus?: string @Query('status') filterByStatus?: string
): Promise<void> { ): Promise<void> {
const status = (filterByStatus?.split(',') as JobStatus[]) ?? undefined; const status = (filterByStatus?.split(',') as JobStatusType[]) ?? undefined;
return this.queueService.deleteJobs({ status }); return this.queueService.deleteJobs({ status });
} }
@ -36,7 +36,7 @@ export class QueueController {
public async getJobs( public async getJobs(
@Query('status') filterByStatus?: string @Query('status') filterByStatus?: string
): Promise<AdminJobs> { ): Promise<AdminJobs> {
const status = (filterByStatus?.split(',') as JobStatus[]) ?? undefined; const status = (filterByStatus?.split(',') as JobStatusType[]) ?? undefined;
return this.queueService.getJobs({ status }); return this.queueService.getJobs({ status });
} }

17
apps/api/src/app/admin/queue/queue.service.ts

@ -1,13 +1,14 @@
import { import {
DATA_GATHERING_QUEUE, DATA_GATHERING_QUEUE,
JobStatusType,
PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE,
QUEUE_JOB_STATUS_LIST QUEUE_JOB_STATUS_LIST
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { AdminJobs } from '@ghostfolio/common/interfaces'; import { AdminJobs } from '@ghostfolio/common/interfaces';
import { InjectQueue } from '@nestjs/bull'; import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { JobStatus, Queue } from 'bull'; import { Queue } from 'bullmq';
@Injectable() @Injectable()
export class QueueService { export class QueueService {
@ -29,15 +30,15 @@ export class QueueService {
} }
public async deleteJobs({ public async deleteJobs({
status = QUEUE_JOB_STATUS_LIST status = [...QUEUE_JOB_STATUS_LIST]
}: { }: {
status?: JobStatus[]; status?: JobStatusType[];
}) { }) {
for (const statusItem of status) { for (const statusItem of status) {
const queueStatus = statusItem === 'waiting' ? 'wait' : statusItem; const queueStatus = statusItem === 'waiting' ? 'wait' : statusItem;
await this.dataGatheringQueue.clean(300, queueStatus); await this.dataGatheringQueue.clean(300, 0, queueStatus);
await this.portfolioSnapshotQueue.clean(300, queueStatus); await this.portfolioSnapshotQueue.clean(300, 0, queueStatus);
} }
} }
@ -53,10 +54,10 @@ export class QueueService {
public async getJobs({ public async getJobs({
limit = 1000, limit = 1000,
status = QUEUE_JOB_STATUS_LIST status = [...QUEUE_JOB_STATUS_LIST]
}: { }: {
limit?: number; limit?: number;
status?: JobStatus[]; status?: JobStatusType[];
}): Promise<AdminJobs> { }): Promise<AdminJobs> {
const [dataGatheringJobs, portfolioSnapshotJobs] = await Promise.all([ const [dataGatheringJobs, portfolioSnapshotJobs] = await Promise.all([
this.dataGatheringQueue.getJobs(status), this.dataGatheringQueue.getJobs(status),

4
apps/api/src/app/app.module.ts

@ -14,7 +14,7 @@ import {
SUPPORTED_LANGUAGE_CODES SUPPORTED_LANGUAGE_CODES
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { BullModule } from '@nestjs/bull'; import { BullModule } from '@nestjs/bullmq';
import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common'; import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config'; import { ConfigModule } from '@nestjs/config';
import { EventEmitterModule } from '@nestjs/event-emitter'; import { EventEmitterModule } from '@nestjs/event-emitter';
@ -70,7 +70,7 @@ import { UserModule } from './user/user.module';
AuthModule, AuthModule,
BenchmarksModule, BenchmarksModule,
BullModule.forRoot({ BullModule.forRoot({
redis: { connection: {
db: parseInt(process.env.REDIS_DB ?? '0', 10), db: parseInt(process.env.REDIS_DB ?? '0', 10),
host: process.env.REDIS_HOST, host: process.env.REDIS_HOST,
password: process.env.REDIS_PASSWORD, password: process.env.REDIS_PASSWORD,

4
apps/api/src/app/portfolio/calculator/portfolio-calculator.ts

@ -1163,7 +1163,9 @@ export abstract class PortfolioCalculator {
const job = await this.portfolioSnapshotService.getJob(jobId); const job = await this.portfolioSnapshotService.getJob(jobId);
if (job) { if (job) {
await job.finished(); await job.waitUntilFinished(
this.portfolioSnapshotService.getQueueEvents()
);
} }
await this.initialize(); await this.initialize();

7
apps/api/src/services/queues/data-gathering/data-gathering.module.ts

@ -9,19 +9,14 @@ import { DataGatheringService } from '@ghostfolio/api/services/queues/data-gathe
import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module'; import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module';
import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config'; import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config';
import { BullModule } from '@nestjs/bull'; import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import ms from 'ms';
import { DataGatheringProcessor } from './data-gathering.processor'; import { DataGatheringProcessor } from './data-gathering.processor';
@Module({ @Module({
imports: [ imports: [
BullModule.registerQueue({ BullModule.registerQueue({
limiter: {
duration: ms('4 seconds'),
max: 1
},
name: DATA_GATHERING_QUEUE name: DATA_GATHERING_QUEUE
}), }),
ConfigurationModule, ConfigurationModule,

67
apps/api/src/services/queues/data-gathering/data-gathering.processor.ts

@ -5,18 +5,16 @@ import { MarketDataService } from '@ghostfolio/api/services/market-data/market-d
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service';
import { import {
DATA_GATHERING_QUEUE, DATA_GATHERING_QUEUE,
DEFAULT_PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY,
DEFAULT_PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY,
GATHER_ASSET_PROFILE_PROCESS_JOB_NAME, GATHER_ASSET_PROFILE_PROCESS_JOB_NAME,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper'; import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper';
import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces'; import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces';
import { Process, Processor } from '@nestjs/bull'; import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client'; import { Prisma } from '@prisma/client';
import { Job } from 'bull'; import { Job, UnrecoverableError } from 'bullmq';
import { import {
addDays, addDays,
format, format,
@ -26,28 +24,45 @@ import {
isBefore, isBefore,
parseISO parseISO
} from 'date-fns'; } from 'date-fns';
import ms from 'ms';
import { DataGatheringService } from './data-gathering.service'; import { DataGatheringService } from './data-gathering.service';
@Injectable() @Injectable()
@Processor(DATA_GATHERING_QUEUE) @Processor(DATA_GATHERING_QUEUE, {
export class DataGatheringProcessor { concurrency: parseInt(
process.env.PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY ??
process.env.PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY ??
'1',
10
),
limiter: {
max: 1,
duration: ms('4 seconds')
}
})
export class DataGatheringProcessor extends WorkerHost {
public constructor( public constructor(
private readonly dataGatheringService: DataGatheringService, private readonly dataGatheringService: DataGatheringService,
private readonly dataProviderService: DataProviderService, private readonly dataProviderService: DataProviderService,
private readonly marketDataService: MarketDataService, private readonly marketDataService: MarketDataService,
private readonly symbolProfileService: SymbolProfileService private readonly symbolProfileService: SymbolProfileService
) {} ) {
super();
@Process({ }
concurrency: parseInt(
process.env.PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY ?? public async process(job: Job): Promise<any> {
DEFAULT_PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY.toString(), switch (job.name) {
10 case GATHER_ASSET_PROFILE_PROCESS_JOB_NAME:
), return this.gatherAssetProfile(job);
name: GATHER_ASSET_PROFILE_PROCESS_JOB_NAME case GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME:
}) return this.gatherHistoricalMarketData(job);
public async gatherAssetProfile(job: Job<AssetProfileIdentifier>) { default:
throw new Error(`Unknown job name: ${job.name}`);
}
}
private async gatherAssetProfile(job: Job<AssetProfileIdentifier>) {
const { dataSource, symbol } = job.data; const { dataSource, symbol } = job.data;
try { try {
@ -79,7 +94,9 @@ export class DataGatheringProcessor {
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS_JOB_NAME})` `DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS_JOB_NAME})`
); );
return job.discard(); throw new UnrecoverableError(
`Asset ${symbol} (${dataSource}) has been delisted`
);
} }
Logger.error( Logger.error(
@ -91,15 +108,7 @@ export class DataGatheringProcessor {
} }
} }
@Process({ private async gatherHistoricalMarketData(job: Job<DataGatheringItem>) {
concurrency: parseInt(
process.env.PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY ??
DEFAULT_PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY.toString(),
10
),
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
})
public async gatherHistoricalMarketData(job: Job<DataGatheringItem>) {
const { dataSource, date, force, symbol } = job.data; const { dataSource, date, force, symbol } = job.data;
try { try {
@ -191,7 +200,9 @@ export class DataGatheringProcessor {
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
); );
return job.discard(); throw new UnrecoverableError(
`Asset ${symbol} (${dataSource}) has been delisted`
);
} }
Logger.error( Logger.error(

8
apps/api/src/services/queues/data-gathering/data-gathering.service.ts

@ -24,10 +24,10 @@ import {
BenchmarkProperty BenchmarkProperty
} from '@ghostfolio/common/interfaces'; } from '@ghostfolio/common/interfaces';
import { InjectQueue } from '@nestjs/bull'; import { InjectQueue } from '@nestjs/bullmq';
import { Inject, Injectable, Logger } from '@nestjs/common'; import { Inject, Injectable, Logger } from '@nestjs/common';
import { DataSource } from '@prisma/client'; import { DataSource } from '@prisma/client';
import { JobOptions, Queue } from 'bull'; import { JobsOptions, Queue } from 'bullmq';
import { format, min, subDays, subMilliseconds, subYears } from 'date-fns'; import { format, min, subDays, subMilliseconds, subYears } from 'date-fns';
import { isEmpty } from 'lodash'; import { isEmpty } from 'lodash';
import ms, { StringValue } from 'ms'; import ms, { StringValue } from 'ms';
@ -53,13 +53,13 @@ export class DataGatheringService {
}: { }: {
data: any; data: any;
name: string; name: string;
opts?: JobOptions; opts?: JobsOptions;
}) { }) {
return this.dataGatheringQueue.add(name, data, opts); return this.dataGatheringQueue.add(name, data, opts);
} }
public async addJobsToQueue( public async addJobsToQueue(
jobs: { data: any; name: string; opts?: JobOptions }[] jobs: { data: any; name: string; opts?: JobsOptions }[]
) { ) {
return this.dataGatheringQueue.addBulk(jobs); return this.dataGatheringQueue.addBulk(jobs);
} }

16
apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts

@ -8,12 +8,9 @@ import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data-
import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module'; import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module';
import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module'; import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module';
import { PortfolioSnapshotService } from '@ghostfolio/api/services/queues/portfolio-snapshot/portfolio-snapshot.service'; import { PortfolioSnapshotService } from '@ghostfolio/api/services/queues/portfolio-snapshot/portfolio-snapshot.service';
import { import { PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config';
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT,
PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE
} from '@ghostfolio/common/config';
import { BullModule } from '@nestjs/bull'; import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor'; import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor';
@ -23,14 +20,7 @@ import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor';
imports: [ imports: [
AccountBalanceModule, AccountBalanceModule,
BullModule.registerQueue({ BullModule.registerQueue({
name: PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, name: PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE
settings: {
lockDuration: parseInt(
process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT ??
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT.toString(),
10
)
}
}), }),
ConfigurationModule, ConfigurationModule,
DataProviderModule, DataProviderModule,

34
apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts

@ -7,37 +7,43 @@ import { ConfigurationService } from '@ghostfolio/api/services/configuration/con
import { import {
CACHE_TTL_INFINITE, CACHE_TTL_INFINITE,
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY, DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY,
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT,
PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME, PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME,
PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { Process, Processor } from '@nestjs/bull'; import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Job } from 'bull'; import { Job } from 'bullmq';
import { addMilliseconds } from 'date-fns'; import { addMilliseconds } from 'date-fns';
import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
@Injectable() @Injectable()
@Processor(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE) @Processor(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, {
export class PortfolioSnapshotProcessor { concurrency: parseInt(
process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY ??
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY.toString(),
10
),
lockDuration: parseInt(
process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT ??
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT.toString(),
10
)
})
export class PortfolioSnapshotProcessor extends WorkerHost {
public constructor( public constructor(
private readonly accountBalanceService: AccountBalanceService, private readonly accountBalanceService: AccountBalanceService,
private readonly calculatorFactory: PortfolioCalculatorFactory, private readonly calculatorFactory: PortfolioCalculatorFactory,
private readonly configurationService: ConfigurationService, private readonly configurationService: ConfigurationService,
private readonly orderService: OrderService, private readonly orderService: OrderService,
private readonly redisCacheService: RedisCacheService private readonly redisCacheService: RedisCacheService
) {} ) {
super();
}
@Process({ public async process(job: Job<PortfolioSnapshotQueueJob>): Promise<any> {
concurrency: parseInt(
process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY ??
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY.toString(),
10
),
name: PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME
})
public async calculatePortfolioSnapshot(job: Job<PortfolioSnapshotQueueJob>) {
try { try {
const startTime = performance.now(); const startTime = performance.now();

6
apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts

@ -1,4 +1,4 @@
import { Job, JobOptions } from 'bull'; import { Job, JobsOptions } from 'bullmq';
import { setTimeout } from 'timers/promises'; import { setTimeout } from 'timers/promises';
import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
@ -9,10 +9,10 @@ export const PortfolioSnapshotServiceMock = {
}: { }: {
data: PortfolioSnapshotQueueJob; data: PortfolioSnapshotQueueJob;
name: string; name: string;
opts?: JobOptions; opts?: JobsOptions;
}): Promise<Job<any>> { }): Promise<Job<any>> {
const mockJob: Partial<Job<any>> = { const mockJob: Partial<Job<any>> = {
finished: async () => { waitUntilFinished: async () => {
await setTimeout(100); await setTimeout(100);
return Promise.resolve(); return Promise.resolve();

18
apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts

@ -1,17 +1,23 @@
import { PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config'; import { PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config';
import { InjectQueue } from '@nestjs/bull'; import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { JobOptions, Queue } from 'bull'; import { JobsOptions, Queue, QueueEvents } from 'bullmq';
import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
@Injectable() @Injectable()
export class PortfolioSnapshotService { export class PortfolioSnapshotService {
private readonly queueEvents: QueueEvents;
public constructor( public constructor(
@InjectQueue(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE) @InjectQueue(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE)
private readonly portfolioSnapshotQueue: Queue private readonly portfolioSnapshotQueue: Queue
) {} ) {
this.queueEvents = new QueueEvents(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, {
connection: this.portfolioSnapshotQueue.opts?.connection as any
});
}
public async addJobToQueue({ public async addJobToQueue({
data, data,
@ -20,7 +26,7 @@ export class PortfolioSnapshotService {
}: { }: {
data: PortfolioSnapshotQueueJob; data: PortfolioSnapshotQueueJob;
name: string; name: string;
opts?: JobOptions; opts?: JobsOptions;
}) { }) {
return this.portfolioSnapshotQueue.add(name, data, opts); return this.portfolioSnapshotQueue.add(name, data, opts);
} }
@ -28,4 +34,8 @@ export class PortfolioSnapshotService {
public async getJob(jobId: string) { public async getJob(jobId: string) {
return this.portfolioSnapshotQueue.getJob(jobId); return this.portfolioSnapshotQueue.getJob(jobId);
} }
public getQueueEvents() {
return this.queueEvents;
}
} }

3
apps/client/src/app/components/admin-jobs/admin-jobs.component.ts

@ -31,7 +31,6 @@ import { MatSelectModule } from '@angular/material/select';
import { MatSort, MatSortModule } from '@angular/material/sort'; import { MatSort, MatSortModule } from '@angular/material/sort';
import { MatTableDataSource, MatTableModule } from '@angular/material/table'; import { MatTableDataSource, MatTableModule } from '@angular/material/table';
import { IonIcon } from '@ionic/angular/standalone'; import { IonIcon } from '@ionic/angular/standalone';
import { JobStatus } from 'bull';
import { addIcons } from 'ionicons'; import { addIcons } from 'ionicons';
import { import {
alertCircleOutline, alertCircleOutline,
@ -194,7 +193,7 @@ export class GfAdminJobsComponent implements OnDestroy, OnInit {
this.unsubscribeSubject.complete(); this.unsubscribeSubject.complete();
} }
private fetchJobs(aStatus?: JobStatus[]) { private fetchJobs(aStatus?: string[]) {
this.isLoading = true; this.isLoading = true;
this.adminService this.adminService

17
libs/common/src/lib/config.ts

@ -1,5 +1,5 @@
import { AssetClass, AssetSubClass, DataSource, Type } from '@prisma/client'; import { AssetClass, AssetSubClass, DataSource, Type } from '@prisma/client';
import { JobOptions, JobStatus } from 'bull'; import { JobsOptions } from 'bullmq';
import ms from 'ms'; import ms from 'ms';
export const ghostfolioPrefix = 'GF'; export const ghostfolioPrefix = 'GF';
@ -56,7 +56,7 @@ export const CACHE_TTL_INFINITE = 0;
export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE'; export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE';
export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1; export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1;
export const DATA_GATHERING_QUEUE_PRIORITY_LOW = Number.MAX_SAFE_INTEGER; export const DATA_GATHERING_QUEUE_PRIORITY_LOW = 2_097_152;
export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round( export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round(
DATA_GATHERING_QUEUE_PRIORITY_LOW / 2 DATA_GATHERING_QUEUE_PRIORITY_LOW / 2
); );
@ -64,8 +64,7 @@ export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round(
export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE = export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE =
'PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE'; 'PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE';
export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_HIGH = 1; export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_HIGH = 1;
export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_LOW = export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_LOW = 2_097_152;
Number.MAX_SAFE_INTEGER;
export const DEFAULT_CURRENCY = 'USD'; export const DEFAULT_CURRENCY = 'USD';
export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy'; export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy';
@ -150,7 +149,7 @@ export const DERIVED_CURRENCIES = [
]; ];
export const GATHER_ASSET_PROFILE_PROCESS_JOB_NAME = 'GATHER_ASSET_PROFILE'; export const GATHER_ASSET_PROFILE_PROCESS_JOB_NAME = 'GATHER_ASSET_PROFILE';
export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobOptions = { export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobsOptions = {
attempts: 12, attempts: 12,
backoff: { backoff: {
delay: ms('1 minute'), delay: ms('1 minute'),
@ -161,7 +160,7 @@ export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobOptions = {
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME = export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME =
'GATHER_HISTORICAL_MARKET_DATA'; 'GATHER_HISTORICAL_MARKET_DATA';
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobOptions = { export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobsOptions = {
attempts: 12, attempts: 12,
backoff: { backoff: {
delay: ms('1 minute'), delay: ms('1 minute'),
@ -177,7 +176,7 @@ export const INVESTMENT_ACTIVITY_TYPES = [
] as Type[]; ] as Type[];
export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME = 'PORTFOLIO'; export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME = 'PORTFOLIO';
export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_OPTIONS: JobOptions = { export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_OPTIONS: JobsOptions = {
removeOnComplete: true removeOnComplete: true
}; };
@ -219,7 +218,9 @@ export const QUEUE_JOB_STATUS_LIST = [
'failed', 'failed',
'paused', 'paused',
'waiting' 'waiting'
] as JobStatus[]; ] as const;
export type JobStatusType = (typeof QUEUE_JOB_STATUS_LIST)[number];
export const REPLACE_NAME_PARTS = [ export const REPLACE_NAME_PARTS = [
'Amundi Index Solutions -', 'Amundi Index Solutions -',

4
libs/common/src/lib/interfaces/admin-jobs.interface.ts

@ -1,4 +1,4 @@
import { Job, JobStatus } from 'bull'; import { Job } from 'bullmq';
export interface AdminJobs { export interface AdminJobs {
jobs: (Pick< jobs: (Pick<
@ -12,6 +12,6 @@ export interface AdminJobs {
| 'stacktrace' | 'stacktrace'
| 'timestamp' | 'timestamp'
> & { > & {
state: JobStatus | 'stuck'; state: string;
})[]; })[];
} }

5
libs/ui/src/lib/services/admin.service.ts

@ -28,7 +28,6 @@ import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http';
import { Inject, Injectable } from '@angular/core'; import { Inject, Injectable } from '@angular/core';
import { SortDirection } from '@angular/material/sort'; import { SortDirection } from '@angular/material/sort';
import { DataSource, MarketData, Platform } from '@prisma/client'; import { DataSource, MarketData, Platform } from '@prisma/client';
import { JobStatus } from 'bull';
@Injectable({ @Injectable({
providedIn: 'root' providedIn: 'root'
@ -51,7 +50,7 @@ export class AdminService {
return this.http.delete<void>(`/api/v1/admin/queue/job/${aId}`); return this.http.delete<void>(`/api/v1/admin/queue/job/${aId}`);
} }
public deleteJobs({ status }: { status: JobStatus[] }) { public deleteJobs({ status }: { status: string[] }) {
let params = new HttpParams(); let params = new HttpParams();
if (status?.length > 0) { if (status?.length > 0) {
@ -129,7 +128,7 @@ export class AdminService {
); );
} }
public fetchJobs({ status }: { status?: JobStatus[] }) { public fetchJobs({ status }: { status?: string[] }) {
let params = new HttpParams(); let params = new HttpParams();
if (status?.length > 0) { if (status?.length > 0) {

Loading…
Cancel
Save