Thomas Kaul 23 hours ago
committed by GitHub
parent
commit
222ccfc205
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 6
      apps/api/src/app/admin/queue/queue.controller.ts
  2. 17
      apps/api/src/app/admin/queue/queue.service.ts
  3. 4
      apps/api/src/app/app.module.ts
  4. 4
      apps/api/src/app/portfolio/calculator/portfolio-calculator.ts
  5. 7
      apps/api/src/services/queues/data-gathering/data-gathering.module.ts
  6. 67
      apps/api/src/services/queues/data-gathering/data-gathering.processor.ts
  7. 8
      apps/api/src/services/queues/data-gathering/data-gathering.service.ts
  8. 16
      apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts
  9. 34
      apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts
  10. 6
      apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts
  11. 18
      apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts
  12. 3
      apps/client/src/app/components/admin-jobs/admin-jobs.component.ts
  13. 17
      libs/common/src/lib/config.ts
  14. 4
      libs/common/src/lib/interfaces/admin-jobs.interface.ts
  15. 5
      libs/ui/src/lib/services/admin.service.ts
  16. 88
      package-lock.json
  17. 4
      package.json

6
apps/api/src/app/admin/queue/queue.controller.ts

@ -1,5 +1,6 @@
import { HasPermission } from '@ghostfolio/api/decorators/has-permission.decorator'; import { HasPermission } from '@ghostfolio/api/decorators/has-permission.decorator';
import { HasPermissionGuard } from '@ghostfolio/api/guards/has-permission.guard'; import { HasPermissionGuard } from '@ghostfolio/api/guards/has-permission.guard';
import { JobStatusType } from '@ghostfolio/common/config';
import { AdminJobs } from '@ghostfolio/common/interfaces'; import { AdminJobs } from '@ghostfolio/common/interfaces';
import { permissions } from '@ghostfolio/common/permissions'; import { permissions } from '@ghostfolio/common/permissions';
@ -12,7 +13,6 @@ import {
UseGuards UseGuards
} from '@nestjs/common'; } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport'; import { AuthGuard } from '@nestjs/passport';
import { JobStatus } from 'bull';
import { QueueService } from './queue.service'; import { QueueService } from './queue.service';
@ -26,7 +26,7 @@ export class QueueController {
public async deleteJobs( public async deleteJobs(
@Query('status') filterByStatus?: string @Query('status') filterByStatus?: string
): Promise<void> { ): Promise<void> {
const status = (filterByStatus?.split(',') as JobStatus[]) ?? undefined; const status = (filterByStatus?.split(',') as JobStatusType[]) ?? undefined;
return this.queueService.deleteJobs({ status }); return this.queueService.deleteJobs({ status });
} }
@ -36,7 +36,7 @@ export class QueueController {
public async getJobs( public async getJobs(
@Query('status') filterByStatus?: string @Query('status') filterByStatus?: string
): Promise<AdminJobs> { ): Promise<AdminJobs> {
const status = (filterByStatus?.split(',') as JobStatus[]) ?? undefined; const status = (filterByStatus?.split(',') as JobStatusType[]) ?? undefined;
return this.queueService.getJobs({ status }); return this.queueService.getJobs({ status });
} }

17
apps/api/src/app/admin/queue/queue.service.ts

@ -1,13 +1,14 @@
import { import {
DATA_GATHERING_QUEUE, DATA_GATHERING_QUEUE,
JobStatusType,
PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE,
QUEUE_JOB_STATUS_LIST QUEUE_JOB_STATUS_LIST
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { AdminJobs } from '@ghostfolio/common/interfaces'; import { AdminJobs } from '@ghostfolio/common/interfaces';
import { InjectQueue } from '@nestjs/bull'; import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { JobStatus, Queue } from 'bull'; import { Queue } from 'bullmq';
@Injectable() @Injectable()
export class QueueService { export class QueueService {
@ -29,15 +30,15 @@ export class QueueService {
} }
public async deleteJobs({ public async deleteJobs({
status = QUEUE_JOB_STATUS_LIST status = [...QUEUE_JOB_STATUS_LIST]
}: { }: {
status?: JobStatus[]; status?: JobStatusType[];
}) { }) {
for (const statusItem of status) { for (const statusItem of status) {
const queueStatus = statusItem === 'waiting' ? 'wait' : statusItem; const queueStatus = statusItem === 'waiting' ? 'wait' : statusItem;
await this.dataGatheringQueue.clean(300, queueStatus); await this.dataGatheringQueue.clean(300, 0, queueStatus);
await this.portfolioSnapshotQueue.clean(300, queueStatus); await this.portfolioSnapshotQueue.clean(300, 0, queueStatus);
} }
} }
@ -53,10 +54,10 @@ export class QueueService {
public async getJobs({ public async getJobs({
limit = 1000, limit = 1000,
status = QUEUE_JOB_STATUS_LIST status = [...QUEUE_JOB_STATUS_LIST]
}: { }: {
limit?: number; limit?: number;
status?: JobStatus[]; status?: JobStatusType[];
}): Promise<AdminJobs> { }): Promise<AdminJobs> {
const [dataGatheringJobs, portfolioSnapshotJobs] = await Promise.all([ const [dataGatheringJobs, portfolioSnapshotJobs] = await Promise.all([
this.dataGatheringQueue.getJobs(status), this.dataGatheringQueue.getJobs(status),

4
apps/api/src/app/app.module.ts

@ -14,7 +14,7 @@ import {
SUPPORTED_LANGUAGE_CODES SUPPORTED_LANGUAGE_CODES
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { BullModule } from '@nestjs/bull'; import { BullModule } from '@nestjs/bullmq';
import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common'; import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config'; import { ConfigModule } from '@nestjs/config';
import { EventEmitterModule } from '@nestjs/event-emitter'; import { EventEmitterModule } from '@nestjs/event-emitter';
@ -70,7 +70,7 @@ import { UserModule } from './user/user.module';
AuthModule, AuthModule,
BenchmarksModule, BenchmarksModule,
BullModule.forRoot({ BullModule.forRoot({
redis: { connection: {
db: parseInt(process.env.REDIS_DB ?? '0', 10), db: parseInt(process.env.REDIS_DB ?? '0', 10),
host: process.env.REDIS_HOST, host: process.env.REDIS_HOST,
password: process.env.REDIS_PASSWORD, password: process.env.REDIS_PASSWORD,

4
apps/api/src/app/portfolio/calculator/portfolio-calculator.ts

@ -1163,7 +1163,9 @@ export abstract class PortfolioCalculator {
const job = await this.portfolioSnapshotService.getJob(jobId); const job = await this.portfolioSnapshotService.getJob(jobId);
if (job) { if (job) {
await job.finished(); await job.waitUntilFinished(
this.portfolioSnapshotService.getQueueEvents()
);
} }
await this.initialize(); await this.initialize();

7
apps/api/src/services/queues/data-gathering/data-gathering.module.ts

@ -9,19 +9,14 @@ import { DataGatheringService } from '@ghostfolio/api/services/queues/data-gathe
import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module'; import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module';
import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config'; import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config';
import { BullModule } from '@nestjs/bull'; import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import ms from 'ms';
import { DataGatheringProcessor } from './data-gathering.processor'; import { DataGatheringProcessor } from './data-gathering.processor';
@Module({ @Module({
imports: [ imports: [
BullModule.registerQueue({ BullModule.registerQueue({
limiter: {
duration: ms('4 seconds'),
max: 1
},
name: DATA_GATHERING_QUEUE name: DATA_GATHERING_QUEUE
}), }),
ConfigurationModule, ConfigurationModule,

67
apps/api/src/services/queues/data-gathering/data-gathering.processor.ts

@ -5,18 +5,16 @@ import { MarketDataService } from '@ghostfolio/api/services/market-data/market-d
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service';
import { import {
DATA_GATHERING_QUEUE, DATA_GATHERING_QUEUE,
DEFAULT_PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY,
DEFAULT_PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY,
GATHER_ASSET_PROFILE_PROCESS_JOB_NAME, GATHER_ASSET_PROFILE_PROCESS_JOB_NAME,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper'; import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper';
import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces'; import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces';
import { Process, Processor } from '@nestjs/bull'; import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client'; import { Prisma } from '@prisma/client';
import { Job } from 'bull'; import { Job, UnrecoverableError } from 'bullmq';
import { import {
addDays, addDays,
format, format,
@ -26,28 +24,45 @@ import {
isBefore, isBefore,
parseISO parseISO
} from 'date-fns'; } from 'date-fns';
import ms from 'ms';
import { DataGatheringService } from './data-gathering.service'; import { DataGatheringService } from './data-gathering.service';
@Injectable() @Injectable()
@Processor(DATA_GATHERING_QUEUE) @Processor(DATA_GATHERING_QUEUE, {
export class DataGatheringProcessor { concurrency: parseInt(
process.env.PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY ??
process.env.PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY ??
'1',
10
),
limiter: {
max: 1,
duration: ms('4 seconds')
}
})
export class DataGatheringProcessor extends WorkerHost {
public constructor( public constructor(
private readonly dataGatheringService: DataGatheringService, private readonly dataGatheringService: DataGatheringService,
private readonly dataProviderService: DataProviderService, private readonly dataProviderService: DataProviderService,
private readonly marketDataService: MarketDataService, private readonly marketDataService: MarketDataService,
private readonly symbolProfileService: SymbolProfileService private readonly symbolProfileService: SymbolProfileService
) {} ) {
super();
@Process({ }
concurrency: parseInt(
process.env.PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY ?? public async process(job: Job): Promise<any> {
DEFAULT_PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY.toString(), switch (job.name) {
10 case GATHER_ASSET_PROFILE_PROCESS_JOB_NAME:
), return this.gatherAssetProfile(job);
name: GATHER_ASSET_PROFILE_PROCESS_JOB_NAME case GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME:
}) return this.gatherHistoricalMarketData(job);
public async gatherAssetProfile(job: Job<AssetProfileIdentifier>) { default:
throw new Error(`Unknown job name: ${job.name}`);
}
}
private async gatherAssetProfile(job: Job<AssetProfileIdentifier>) {
const { dataSource, symbol } = job.data; const { dataSource, symbol } = job.data;
try { try {
@ -79,7 +94,9 @@ export class DataGatheringProcessor {
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS_JOB_NAME})` `DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS_JOB_NAME})`
); );
return job.discard(); throw new UnrecoverableError(
`Asset ${symbol} (${dataSource}) has been delisted`
);
} }
Logger.error( Logger.error(
@ -91,15 +108,7 @@ export class DataGatheringProcessor {
} }
} }
@Process({ private async gatherHistoricalMarketData(job: Job<DataGatheringItem>) {
concurrency: parseInt(
process.env.PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY ??
DEFAULT_PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY.toString(),
10
),
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
})
public async gatherHistoricalMarketData(job: Job<DataGatheringItem>) {
const { dataSource, date, force, symbol } = job.data; const { dataSource, date, force, symbol } = job.data;
try { try {
@ -191,7 +200,9 @@ export class DataGatheringProcessor {
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
); );
return job.discard(); throw new UnrecoverableError(
`Asset ${symbol} (${dataSource}) has been delisted`
);
} }
Logger.error( Logger.error(

8
apps/api/src/services/queues/data-gathering/data-gathering.service.ts

@ -24,10 +24,10 @@ import {
BenchmarkProperty BenchmarkProperty
} from '@ghostfolio/common/interfaces'; } from '@ghostfolio/common/interfaces';
import { InjectQueue } from '@nestjs/bull'; import { InjectQueue } from '@nestjs/bullmq';
import { Inject, Injectable, Logger } from '@nestjs/common'; import { Inject, Injectable, Logger } from '@nestjs/common';
import { DataSource } from '@prisma/client'; import { DataSource } from '@prisma/client';
import { JobOptions, Queue } from 'bull'; import { JobsOptions, Queue } from 'bullmq';
import { format, min, subDays, subMilliseconds, subYears } from 'date-fns'; import { format, min, subDays, subMilliseconds, subYears } from 'date-fns';
import { isEmpty } from 'lodash'; import { isEmpty } from 'lodash';
import ms, { StringValue } from 'ms'; import ms, { StringValue } from 'ms';
@ -53,13 +53,13 @@ export class DataGatheringService {
}: { }: {
data: any; data: any;
name: string; name: string;
opts?: JobOptions; opts?: JobsOptions;
}) { }) {
return this.dataGatheringQueue.add(name, data, opts); return this.dataGatheringQueue.add(name, data, opts);
} }
public async addJobsToQueue( public async addJobsToQueue(
jobs: { data: any; name: string; opts?: JobOptions }[] jobs: { data: any; name: string; opts?: JobsOptions }[]
) { ) {
return this.dataGatheringQueue.addBulk(jobs); return this.dataGatheringQueue.addBulk(jobs);
} }

16
apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.module.ts

@ -8,12 +8,9 @@ import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data-
import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module'; import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module';
import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module'; import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module';
import { PortfolioSnapshotService } from '@ghostfolio/api/services/queues/portfolio-snapshot/portfolio-snapshot.service'; import { PortfolioSnapshotService } from '@ghostfolio/api/services/queues/portfolio-snapshot/portfolio-snapshot.service';
import { import { PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config';
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT,
PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE
} from '@ghostfolio/common/config';
import { BullModule } from '@nestjs/bull'; import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor'; import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor';
@ -23,14 +20,7 @@ import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor';
imports: [ imports: [
AccountBalanceModule, AccountBalanceModule,
BullModule.registerQueue({ BullModule.registerQueue({
name: PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, name: PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE
settings: {
lockDuration: parseInt(
process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT ??
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT.toString(),
10
)
}
}), }),
ConfigurationModule, ConfigurationModule,
DataProviderModule, DataProviderModule,

34
apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts

@ -7,37 +7,43 @@ import { ConfigurationService } from '@ghostfolio/api/services/configuration/con
import { import {
CACHE_TTL_INFINITE, CACHE_TTL_INFINITE,
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY, DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY,
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT,
PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME, PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME,
PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { Process, Processor } from '@nestjs/bull'; import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Job } from 'bull'; import { Job } from 'bullmq';
import { addMilliseconds } from 'date-fns'; import { addMilliseconds } from 'date-fns';
import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
@Injectable() @Injectable()
@Processor(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE) @Processor(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, {
export class PortfolioSnapshotProcessor { concurrency: parseInt(
process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY ??
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY.toString(),
10
),
lockDuration: parseInt(
process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT ??
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_TIMEOUT.toString(),
10
)
})
export class PortfolioSnapshotProcessor extends WorkerHost {
public constructor( public constructor(
private readonly accountBalanceService: AccountBalanceService, private readonly accountBalanceService: AccountBalanceService,
private readonly calculatorFactory: PortfolioCalculatorFactory, private readonly calculatorFactory: PortfolioCalculatorFactory,
private readonly configurationService: ConfigurationService, private readonly configurationService: ConfigurationService,
private readonly orderService: OrderService, private readonly orderService: OrderService,
private readonly redisCacheService: RedisCacheService private readonly redisCacheService: RedisCacheService
) {} ) {
super();
}
@Process({ public async process(job: Job<PortfolioSnapshotQueueJob>): Promise<any> {
concurrency: parseInt(
process.env.PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY ??
DEFAULT_PROCESSOR_PORTFOLIO_SNAPSHOT_COMPUTATION_CONCURRENCY.toString(),
10
),
name: PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME
})
public async calculatePortfolioSnapshot(job: Job<PortfolioSnapshotQueueJob>) {
try { try {
const startTime = performance.now(); const startTime = performance.now();

6
apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.mock.ts

@ -1,4 +1,4 @@
import { Job, JobOptions } from 'bull'; import { Job, JobsOptions } from 'bullmq';
import { setTimeout } from 'timers/promises'; import { setTimeout } from 'timers/promises';
import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
@ -9,10 +9,10 @@ export const PortfolioSnapshotServiceMock = {
}: { }: {
data: PortfolioSnapshotQueueJob; data: PortfolioSnapshotQueueJob;
name: string; name: string;
opts?: JobOptions; opts?: JobsOptions;
}): Promise<Job<any>> { }): Promise<Job<any>> {
const mockJob: Partial<Job<any>> = { const mockJob: Partial<Job<any>> = {
finished: async () => { waitUntilFinished: async () => {
await setTimeout(100); await setTimeout(100);
return Promise.resolve(); return Promise.resolve();

18
apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.service.ts

@ -1,17 +1,23 @@
import { PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config'; import { PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE } from '@ghostfolio/common/config';
import { InjectQueue } from '@nestjs/bull'; import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { JobOptions, Queue } from 'bull'; import { JobsOptions, Queue, QueueEvents } from 'bullmq';
import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; import { PortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
@Injectable() @Injectable()
export class PortfolioSnapshotService { export class PortfolioSnapshotService {
private readonly queueEvents: QueueEvents;
public constructor( public constructor(
@InjectQueue(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE) @InjectQueue(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE)
private readonly portfolioSnapshotQueue: Queue private readonly portfolioSnapshotQueue: Queue
) {} ) {
this.queueEvents = new QueueEvents(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, {
connection: this.portfolioSnapshotQueue.opts?.connection as any
});
}
public async addJobToQueue({ public async addJobToQueue({
data, data,
@ -20,7 +26,7 @@ export class PortfolioSnapshotService {
}: { }: {
data: PortfolioSnapshotQueueJob; data: PortfolioSnapshotQueueJob;
name: string; name: string;
opts?: JobOptions; opts?: JobsOptions;
}) { }) {
return this.portfolioSnapshotQueue.add(name, data, opts); return this.portfolioSnapshotQueue.add(name, data, opts);
} }
@ -28,4 +34,8 @@ export class PortfolioSnapshotService {
public async getJob(jobId: string) { public async getJob(jobId: string) {
return this.portfolioSnapshotQueue.getJob(jobId); return this.portfolioSnapshotQueue.getJob(jobId);
} }
public getQueueEvents() {
return this.queueEvents;
}
} }

3
apps/client/src/app/components/admin-jobs/admin-jobs.component.ts

@ -31,7 +31,6 @@ import { MatSelectModule } from '@angular/material/select';
import { MatSort, MatSortModule } from '@angular/material/sort'; import { MatSort, MatSortModule } from '@angular/material/sort';
import { MatTableDataSource, MatTableModule } from '@angular/material/table'; import { MatTableDataSource, MatTableModule } from '@angular/material/table';
import { IonIcon } from '@ionic/angular/standalone'; import { IonIcon } from '@ionic/angular/standalone';
import { JobStatus } from 'bull';
import { addIcons } from 'ionicons'; import { addIcons } from 'ionicons';
import { import {
alertCircleOutline, alertCircleOutline,
@ -194,7 +193,7 @@ export class GfAdminJobsComponent implements OnDestroy, OnInit {
this.unsubscribeSubject.complete(); this.unsubscribeSubject.complete();
} }
private fetchJobs(aStatus?: JobStatus[]) { private fetchJobs(aStatus?: string[]) {
this.isLoading = true; this.isLoading = true;
this.adminService this.adminService

17
libs/common/src/lib/config.ts

@ -1,5 +1,5 @@
import { AssetClass, AssetSubClass, DataSource, Type } from '@prisma/client'; import { AssetClass, AssetSubClass, DataSource, Type } from '@prisma/client';
import { JobOptions, JobStatus } from 'bull'; import { JobsOptions } from 'bullmq';
import ms from 'ms'; import ms from 'ms';
export const ghostfolioPrefix = 'GF'; export const ghostfolioPrefix = 'GF';
@ -56,7 +56,7 @@ export const CACHE_TTL_INFINITE = 0;
export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE'; export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE';
export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1; export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1;
export const DATA_GATHERING_QUEUE_PRIORITY_LOW = Number.MAX_SAFE_INTEGER; export const DATA_GATHERING_QUEUE_PRIORITY_LOW = 2_097_152;
export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round( export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round(
DATA_GATHERING_QUEUE_PRIORITY_LOW / 2 DATA_GATHERING_QUEUE_PRIORITY_LOW / 2
); );
@ -64,8 +64,7 @@ export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round(
export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE = export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE =
'PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE'; 'PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE';
export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_HIGH = 1; export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_HIGH = 1;
export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_LOW = export const PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE_PRIORITY_LOW = 2_097_152;
Number.MAX_SAFE_INTEGER;
export const DEFAULT_CURRENCY = 'USD'; export const DEFAULT_CURRENCY = 'USD';
export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy'; export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy';
@ -150,7 +149,7 @@ export const DERIVED_CURRENCIES = [
]; ];
export const GATHER_ASSET_PROFILE_PROCESS_JOB_NAME = 'GATHER_ASSET_PROFILE'; export const GATHER_ASSET_PROFILE_PROCESS_JOB_NAME = 'GATHER_ASSET_PROFILE';
export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobOptions = { export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobsOptions = {
attempts: 12, attempts: 12,
backoff: { backoff: {
delay: ms('1 minute'), delay: ms('1 minute'),
@ -161,7 +160,7 @@ export const GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS: JobOptions = {
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME = export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME =
'GATHER_HISTORICAL_MARKET_DATA'; 'GATHER_HISTORICAL_MARKET_DATA';
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobOptions = { export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobsOptions = {
attempts: 12, attempts: 12,
backoff: { backoff: {
delay: ms('1 minute'), delay: ms('1 minute'),
@ -177,7 +176,7 @@ export const INVESTMENT_ACTIVITY_TYPES = [
] as Type[]; ] as Type[];
export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME = 'PORTFOLIO'; export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME = 'PORTFOLIO';
export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_OPTIONS: JobOptions = { export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_OPTIONS: JobsOptions = {
removeOnComplete: true removeOnComplete: true
}; };
@ -219,7 +218,9 @@ export const QUEUE_JOB_STATUS_LIST = [
'failed', 'failed',
'paused', 'paused',
'waiting' 'waiting'
] as JobStatus[]; ] as const;
export type JobStatusType = (typeof QUEUE_JOB_STATUS_LIST)[number];
export const REPLACE_NAME_PARTS = [ export const REPLACE_NAME_PARTS = [
'Amundi Index Solutions -', 'Amundi Index Solutions -',

4
libs/common/src/lib/interfaces/admin-jobs.interface.ts

@ -1,4 +1,4 @@
import { Job, JobStatus } from 'bull'; import { Job } from 'bullmq';
export interface AdminJobs { export interface AdminJobs {
jobs: (Pick< jobs: (Pick<
@ -12,6 +12,6 @@ export interface AdminJobs {
| 'stacktrace' | 'stacktrace'
| 'timestamp' | 'timestamp'
> & { > & {
state: JobStatus | 'stuck'; state: string;
})[]; })[];
} }

5
libs/ui/src/lib/services/admin.service.ts

@ -28,7 +28,6 @@ import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http';
import { Inject, Injectable } from '@angular/core'; import { Inject, Injectable } from '@angular/core';
import { SortDirection } from '@angular/material/sort'; import { SortDirection } from '@angular/material/sort';
import { DataSource, MarketData, Platform } from '@prisma/client'; import { DataSource, MarketData, Platform } from '@prisma/client';
import { JobStatus } from 'bull';
@Injectable({ @Injectable({
providedIn: 'root' providedIn: 'root'
@ -51,7 +50,7 @@ export class AdminService {
return this.http.delete<void>(`/api/v1/admin/queue/job/${aId}`); return this.http.delete<void>(`/api/v1/admin/queue/job/${aId}`);
} }
public deleteJobs({ status }: { status: JobStatus[] }) { public deleteJobs({ status }: { status: string[] }) {
let params = new HttpParams(); let params = new HttpParams();
if (status?.length > 0) { if (status?.length > 0) {
@ -129,7 +128,7 @@ export class AdminService {
); );
} }
public fetchJobs({ status }: { status?: JobStatus[] }) { public fetchJobs({ status }: { status?: string[] }) {
let params = new HttpParams(); let params = new HttpParams();
if (status?.length > 0) { if (status?.length > 0) {

88
package-lock.json

@ -26,7 +26,7 @@
"@internationalized/number": "3.6.5", "@internationalized/number": "3.6.5",
"@ionic/angular": "8.7.8", "@ionic/angular": "8.7.8",
"@keyv/redis": "4.4.0", "@keyv/redis": "4.4.0",
"@nestjs/bull": "11.0.4", "@nestjs/bullmq": "11.0.4",
"@nestjs/cache-manager": "3.0.1", "@nestjs/cache-manager": "3.0.1",
"@nestjs/common": "11.1.8", "@nestjs/common": "11.1.8",
"@nestjs/config": "4.0.2", "@nestjs/config": "4.0.2",
@ -45,7 +45,7 @@
"alphavantage": "2.2.0", "alphavantage": "2.2.0",
"big.js": "7.0.1", "big.js": "7.0.1",
"bootstrap": "4.6.2", "bootstrap": "4.6.2",
"bull": "4.16.5", "bullmq": "5.67.3",
"chart.js": "4.5.1", "chart.js": "4.5.1",
"chartjs-adapter-date-fns": "3.0.0", "chartjs-adapter-date-fns": "3.0.0",
"chartjs-chart-treemap": "3.1.0", "chartjs-chart-treemap": "3.1.0",
@ -5038,9 +5038,9 @@
} }
}, },
"node_modules/@ioredis/commands": { "node_modules/@ioredis/commands": {
"version": "1.4.0", "version": "1.5.0",
"resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.4.0.tgz", "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.5.0.tgz",
"integrity": "sha512-aFT2yemJJo+TZCmieA7qnYGQooOS7QfNmYrzGtsYd3g9j5iDP8AimYYAesf79ohjbLG12XxC4nG5DyEnC88AsQ==", "integrity": "sha512-eUgLqrMf8nJkZxT24JvVRrQya1vZkQh8BBeYNwGDqa5I0VUi8ACx7uFvAaLxintokpTenkK6DASvo/bvNbBGow==",
"license": "MIT" "license": "MIT"
}, },
"node_modules/@isaacs/balanced-match": { "node_modules/@isaacs/balanced-match": {
@ -7502,32 +7502,32 @@
"@tybys/wasm-util": "^0.10.1" "@tybys/wasm-util": "^0.10.1"
} }
}, },
"node_modules/@nestjs/bull": { "node_modules/@nestjs/bull-shared": {
"version": "11.0.4", "version": "11.0.4",
"resolved": "https://registry.npmjs.org/@nestjs/bull/-/bull-11.0.4.tgz", "resolved": "https://registry.npmjs.org/@nestjs/bull-shared/-/bull-shared-11.0.4.tgz",
"integrity": "sha512-QVz2PR/rJF/isy7otVnMTSqLf/O71p9Ka7lBZt9Gm+NQFv8fcH2L11GL7TA0whyCcw/kAX5iRepUXz/wed4JoA==", "integrity": "sha512-VBJcDHSAzxQnpcDfA0kt9MTGUD1XZzfByV70su0W0eDCQ9aqIEBlzWRW21tv9FG9dIut22ysgDidshdjlnczLw==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@nestjs/bull-shared": "^11.0.4",
"tslib": "2.8.1" "tslib": "2.8.1"
}, },
"peerDependencies": { "peerDependencies": {
"@nestjs/common": "^8.0.0 || ^9.0.0 || ^10.0.0 || ^11.0.0", "@nestjs/common": "^10.0.0 || ^11.0.0",
"@nestjs/core": "^8.0.0 || ^9.0.0 || ^10.0.0 || ^11.0.0", "@nestjs/core": "^10.0.0 || ^11.0.0"
"bull": "^3.3 || ^4.0.0"
} }
}, },
"node_modules/@nestjs/bull-shared": { "node_modules/@nestjs/bullmq": {
"version": "11.0.4", "version": "11.0.4",
"resolved": "https://registry.npmjs.org/@nestjs/bull-shared/-/bull-shared-11.0.4.tgz", "resolved": "https://registry.npmjs.org/@nestjs/bullmq/-/bullmq-11.0.4.tgz",
"integrity": "sha512-VBJcDHSAzxQnpcDfA0kt9MTGUD1XZzfByV70su0W0eDCQ9aqIEBlzWRW21tv9FG9dIut22ysgDidshdjlnczLw==", "integrity": "sha512-wBzK9raAVG0/6NTMdvLGM4/FQ1lsB35/pYS8L6a0SDgkTiLpd7mAjQ8R692oMx5s7IjvgntaZOuTUrKYLNfIkA==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@nestjs/bull-shared": "^11.0.4",
"tslib": "2.8.1" "tslib": "2.8.1"
}, },
"peerDependencies": { "peerDependencies": {
"@nestjs/common": "^10.0.0 || ^11.0.0", "@nestjs/common": "^10.0.0 || ^11.0.0",
"@nestjs/core": "^10.0.0 || ^11.0.0" "@nestjs/core": "^10.0.0 || ^11.0.0",
"bullmq": "^3.0.0 || ^4.0.0 || ^5.0.0"
} }
}, },
"node_modules/@nestjs/cache-manager": { "node_modules/@nestjs/cache-manager": {
@ -15636,31 +15636,19 @@
"integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==", "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==",
"license": "MIT" "license": "MIT"
}, },
"node_modules/bull": { "node_modules/bullmq": {
"version": "4.16.5", "version": "5.67.3",
"resolved": "https://registry.npmjs.org/bull/-/bull-4.16.5.tgz", "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.67.3.tgz",
"integrity": "sha512-lDsx2BzkKe7gkCYiT5Acj02DpTwDznl/VNN7Psn7M3USPG7Vs/BaClZJJTAG+ufAR9++N1/NiUTdaFBWDIl5TQ==", "integrity": "sha512-eeQobOJn8M0Rj8tcZCVFLrimZgJQallJH1JpclOoyut2nDNkDwTEPMVcZzLeSR2fGeIVbfJTjU96F563Qkge5A==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"cron-parser": "^4.9.0", "cron-parser": "4.9.0",
"get-port": "^5.1.1", "ioredis": "5.9.2",
"ioredis": "^5.3.2", "msgpackr": "1.11.5",
"lodash": "^4.17.21", "node-abort-controller": "3.1.1",
"msgpackr": "^1.11.2", "semver": "7.7.3",
"semver": "^7.5.2", "tslib": "2.8.1",
"uuid": "^8.3.0" "uuid": "11.1.0"
},
"engines": {
"node": ">=12"
}
},
"node_modules/bull/node_modules/uuid": {
"version": "8.3.2",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz",
"integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==",
"license": "MIT",
"bin": {
"uuid": "dist/bin/uuid"
} }
}, },
"node_modules/bundle-name": { "node_modules/bundle-name": {
@ -20891,18 +20879,6 @@
"node": ">=8.0.0" "node": ">=8.0.0"
} }
}, },
"node_modules/get-port": {
"version": "5.1.1",
"resolved": "https://registry.npmjs.org/get-port/-/get-port-5.1.1.tgz",
"integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==",
"license": "MIT",
"engines": {
"node": ">=8"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/get-proto": { "node_modules/get-proto": {
"version": "1.0.1", "version": "1.0.1",
"resolved": "https://registry.npmjs.org/get-proto/-/get-proto-1.0.1.tgz", "resolved": "https://registry.npmjs.org/get-proto/-/get-proto-1.0.1.tgz",
@ -22188,12 +22164,12 @@
} }
}, },
"node_modules/ioredis": { "node_modules/ioredis": {
"version": "5.8.2", "version": "5.9.2",
"resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.8.2.tgz", "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.9.2.tgz",
"integrity": "sha512-C6uC+kleiIMmjViJINWk80sOQw5lEzse1ZmvD+S/s8p8CWapftSaC+kocGTx6xrbrJ4WmYQGC08ffHLr6ToR6Q==", "integrity": "sha512-tAAg/72/VxOUW7RQSX1pIxJVucYKcjFjfvj60L57jrZpYCHC3XN0WCQ3sNYL4Gmvv+7GPvTAjc+KSdeNuE8oWQ==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@ioredis/commands": "1.4.0", "@ioredis/commands": "1.5.0",
"cluster-key-slot": "^1.1.0", "cluster-key-slot": "^1.1.0",
"debug": "^4.3.4", "debug": "^4.3.4",
"denque": "^2.1.0", "denque": "^2.1.0",
@ -26283,7 +26259,6 @@
"version": "3.1.1", "version": "3.1.1",
"resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz", "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz",
"integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==", "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==",
"dev": true,
"license": "MIT" "license": "MIT"
}, },
"node_modules/node-addon-api": { "node_modules/node-addon-api": {
@ -33913,7 +33888,6 @@
"https://github.com/sponsors/ctavan" "https://github.com/sponsors/ctavan"
], ],
"license": "MIT", "license": "MIT",
"optional": true,
"bin": { "bin": {
"uuid": "dist/esm/bin/uuid" "uuid": "dist/esm/bin/uuid"
} }

4
package.json

@ -70,7 +70,7 @@
"@internationalized/number": "3.6.5", "@internationalized/number": "3.6.5",
"@ionic/angular": "8.7.8", "@ionic/angular": "8.7.8",
"@keyv/redis": "4.4.0", "@keyv/redis": "4.4.0",
"@nestjs/bull": "11.0.4", "@nestjs/bullmq": "11.0.4",
"@nestjs/cache-manager": "3.0.1", "@nestjs/cache-manager": "3.0.1",
"@nestjs/common": "11.1.8", "@nestjs/common": "11.1.8",
"@nestjs/config": "4.0.2", "@nestjs/config": "4.0.2",
@ -89,7 +89,7 @@
"alphavantage": "2.2.0", "alphavantage": "2.2.0",
"big.js": "7.0.1", "big.js": "7.0.1",
"bootstrap": "4.6.2", "bootstrap": "4.6.2",
"bull": "4.16.5", "bullmq": "5.67.3",
"chart.js": "4.5.1", "chart.js": "4.5.1",
"chartjs-adapter-date-fns": "3.0.0", "chartjs-adapter-date-fns": "3.0.0",
"chartjs-chart-treemap": "3.1.0", "chartjs-chart-treemap": "3.1.0",

Loading…
Cancel
Save