diff --git a/apps/api/src/app/admin/admin.controller.ts b/apps/api/src/app/admin/admin.controller.ts index c67b443c3..384da8a47 100644 --- a/apps/api/src/app/admin/admin.controller.ts +++ b/apps/api/src/app/admin/admin.controller.ts @@ -3,6 +3,7 @@ import { MarketDataService } from '@ghostfolio/api/services/market-data.service' import { PropertyDto } from '@ghostfolio/api/services/property/property.dto'; import { DATA_GATHERING_QUEUE, + DATA_GATHERING_QUEUE_PRIORITY_HIGH, GATHER_ASSET_PROFILE_PROCESS } from '@ghostfolio/common/config'; import { @@ -31,6 +32,7 @@ import { DataSource, MarketData } from '@prisma/client'; import { Queue } from 'bull'; import { isDate } from 'date-fns'; import { StatusCodes, getReasonPhrase } from 'http-status-codes'; +import ms from 'ms'; import { AdminService } from './admin.service'; import { UpdateMarketDataDto } from './update-market-data.dto'; @@ -82,10 +84,21 @@ export class AdminController { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { - dataSource, - symbol - }); + await this.dataGatheringQueue.add( + GATHER_ASSET_PROFILE_PROCESS, + { + dataSource, + symbol + }, + { + attempts: 20, + backoff: { + delay: ms('1 minute'), + type: 'exponential' + }, + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH + } + ); } this.dataGatheringService.gatherMax(); @@ -109,10 +122,21 @@ export class AdminController { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { - dataSource, - symbol - }); + await this.dataGatheringQueue.add( + GATHER_ASSET_PROFILE_PROCESS, + { + dataSource, + symbol + }, + { + attempts: 20, + backoff: { + delay: ms('1 minute'), + type: 'exponential' + }, + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH + } + ); } } @@ -134,10 +158,21 @@ export class AdminController { ); } - await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { - dataSource, - symbol - }); + await this.dataGatheringQueue.add( + GATHER_ASSET_PROFILE_PROCESS, + { + dataSource, + symbol + }, + { + attempts: 20, + backoff: { + delay: ms('1 minute'), + type: 'exponential' + }, + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH + } + ); } @Post('gather/:dataSource/:symbol') diff --git a/apps/api/src/app/admin/queue/queue.controller.ts b/apps/api/src/app/admin/queue/queue.controller.ts index 16ec2efe4..e8c785785 100644 --- a/apps/api/src/app/admin/queue/queue.controller.ts +++ b/apps/api/src/app/admin/queue/queue.controller.ts @@ -3,9 +3,11 @@ import { hasPermission, permissions } from '@ghostfolio/common/permissions'; import type { RequestWithUser } from '@ghostfolio/common/types'; import { Controller, + Delete, Get, HttpException, Inject, + Param, UseGuards } from '@nestjs/common'; import { REQUEST } from '@nestjs/core'; @@ -21,7 +23,7 @@ export class QueueController { @Inject(REQUEST) private readonly request: RequestWithUser ) {} - @Get('jobs') + @Get('job') @UseGuards(AuthGuard('jwt')) public async getJobs(): Promise { if ( @@ -38,4 +40,22 @@ export class QueueController { return this.queueService.getJobs({}); } + + @Delete('job/:id') + @UseGuards(AuthGuard('jwt')) + public async deleteJob(@Param('id') id: string): Promise { + if ( + !hasPermission( + this.request.user.permissions, + permissions.accessAdminControl + ) + ) { + throw new HttpException( + getReasonPhrase(StatusCodes.FORBIDDEN), + StatusCodes.FORBIDDEN + ); + } + + return this.queueService.deleteJob(id); + } } diff --git a/apps/api/src/app/admin/queue/queue.service.ts b/apps/api/src/app/admin/queue/queue.service.ts index f8c5560ec..4ba09687a 100644 --- a/apps/api/src/app/admin/queue/queue.service.ts +++ b/apps/api/src/app/admin/queue/queue.service.ts @@ -11,6 +11,10 @@ export class QueueService { private readonly dataGatheringQueue: Queue ) {} + public async deleteJob(aId: string) { + return (await this.dataGatheringQueue.getJob(aId))?.remove(); + } + public async getJobs({ limit = 1000 }: { @@ -25,8 +29,23 @@ export class QueueService { 'waiting' ]); + const jobsWithState = await Promise.all( + jobs.slice(0, limit).map(async (job) => { + return { + attemptsMade: job.attemptsMade + 1, + data: job.data, + finishedOn: job.finishedOn, + id: job.id, + name: job.name, + stacktrace: job.stacktrace, + state: await job.getState(), + timestamp: job.timestamp + }; + }) + ); + return { - jobs: jobs.slice(0, limit) + jobs: jobsWithState }; } } diff --git a/apps/api/src/app/order/order.service.ts b/apps/api/src/app/order/order.service.ts index 7b09ec559..26a067a15 100644 --- a/apps/api/src/app/order/order.service.ts +++ b/apps/api/src/app/order/order.service.ts @@ -6,6 +6,7 @@ import { PrismaService } from '@ghostfolio/api/services/prisma.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service'; import { DATA_GATHERING_QUEUE, + DATA_GATHERING_QUEUE_PRIORITY_HIGH, GATHER_ASSET_PROFILE_PROCESS } from '@ghostfolio/common/config'; import { Filter } from '@ghostfolio/common/interfaces'; @@ -24,6 +25,7 @@ import Big from 'big.js'; import { Queue } from 'bull'; import { endOfToday, isAfter } from 'date-fns'; import { groupBy } from 'lodash'; +import ms from 'ms'; import { v4 as uuidv4 } from 'uuid'; import { Activity } from './interfaces/activities.interface'; @@ -120,10 +122,21 @@ export class OrderService { data.SymbolProfile.connectOrCreate.create.symbol.toUpperCase(); } - await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { - dataSource: data.SymbolProfile.connectOrCreate.create.dataSource, - symbol: data.SymbolProfile.connectOrCreate.create.symbol - }); + await this.dataGatheringQueue.add( + GATHER_ASSET_PROFILE_PROCESS, + { + dataSource: data.SymbolProfile.connectOrCreate.create.dataSource, + symbol: data.SymbolProfile.connectOrCreate.create.symbol + }, + { + attempts: 20, + backoff: { + delay: ms('1 minute'), + type: 'exponential' + }, + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH + } + ); const isDraft = isAfter(data.date as Date, endOfToday()); @@ -138,8 +151,6 @@ export class OrderService { ]); } - await this.cacheService.flush(); - delete data.accountId; delete data.assetClass; delete data.assetSubClass; @@ -330,8 +341,6 @@ export class OrderService { } } - await this.cacheService.flush(); - delete data.assetClass; delete data.assetSubClass; delete data.currency; diff --git a/apps/api/src/services/cron.service.ts b/apps/api/src/services/cron.service.ts index 40051b9ce..b19d676dc 100644 --- a/apps/api/src/services/cron.service.ts +++ b/apps/api/src/services/cron.service.ts @@ -1,11 +1,13 @@ import { DATA_GATHERING_QUEUE, + DATA_GATHERING_QUEUE_PRIORITY_HIGH, GATHER_ASSET_PROFILE_PROCESS } from '@ghostfolio/common/config'; import { InjectQueue } from '@nestjs/bull'; import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { Queue } from 'bull'; +import ms from 'ms'; import { DataGatheringService } from './data-gathering.service'; import { ExchangeRateDataService } from './exchange-rate-data.service'; @@ -41,10 +43,21 @@ export class CronService { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { - dataSource, - symbol - }); + await this.dataGatheringQueue.add( + GATHER_ASSET_PROFILE_PROCESS, + { + dataSource, + symbol + }, + { + attempts: 20, + backoff: { + delay: ms('1 minute'), + type: 'exponential' + }, + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH + } + ); } } } diff --git a/apps/api/src/services/data-gathering.module.ts b/apps/api/src/services/data-gathering.module.ts index e8e98058c..2a179878b 100644 --- a/apps/api/src/services/data-gathering.module.ts +++ b/apps/api/src/services/data-gathering.module.ts @@ -6,6 +6,7 @@ import { PrismaModule } from '@ghostfolio/api/services/prisma.module'; import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config'; import { BullModule } from '@nestjs/bull'; import { Module } from '@nestjs/common'; +import ms from 'ms'; import { DataGatheringProcessor } from './data-gathering.processor'; import { ExchangeRateDataModule } from './exchange-rate-data.module'; @@ -14,6 +15,10 @@ import { SymbolProfileModule } from './symbol-profile.module'; @Module({ imports: [ BullModule.registerQueue({ + limiter: { + duration: ms('1 second'), + max: 1 + }, name: DATA_GATHERING_QUEUE }), ConfigurationModule, diff --git a/apps/api/src/services/data-gathering.processor.ts b/apps/api/src/services/data-gathering.processor.ts index de8d8eb4e..37b3ec325 100644 --- a/apps/api/src/services/data-gathering.processor.ts +++ b/apps/api/src/services/data-gathering.processor.ts @@ -1,19 +1,34 @@ import { DATA_GATHERING_QUEUE, - GATHER_ASSET_PROFILE_PROCESS + GATHER_ASSET_PROFILE_PROCESS, + GATHER_HISTORICAL_MARKET_DATA_PROCESS } from '@ghostfolio/common/config'; +import { DATE_FORMAT } from '@ghostfolio/common/helper'; import { UniqueAsset } from '@ghostfolio/common/interfaces'; import { Process, Processor } from '@nestjs/bull'; import { Injectable, Logger } from '@nestjs/common'; import { Job } from 'bull'; +import { + isBefore, + getYear, + getMonth, + getDate, + format, + parseISO +} from 'date-fns'; import { DataGatheringService } from './data-gathering.service'; +import { DataProviderService } from './data-provider/data-provider.service'; +import { IDataGatheringItem } from './interfaces/interfaces'; +import { PrismaService } from './prisma.service'; @Injectable() @Processor(DATA_GATHERING_QUEUE) export class DataGatheringProcessor { public constructor( - private readonly dataGatheringService: DataGatheringService + private readonly dataGatheringService: DataGatheringService, + private readonly dataProviderService: DataProviderService, + private readonly prismaService: PrismaService ) {} @Process(GATHER_ASSET_PROFILE_PROCESS) @@ -21,7 +36,96 @@ export class DataGatheringProcessor { try { await this.dataGatheringService.gatherAssetProfiles([job.data]); } catch (error) { - Logger.error(error, 'DataGatheringProcessor'); + Logger.error( + error, + `DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS})` + ); + + throw new Error(error); + } + } + + @Process(GATHER_HISTORICAL_MARKET_DATA_PROCESS) + public async gatherHistoricalMarketData(job: Job) { + try { + const { dataSource, date, symbol } = job.data; + + const historicalData = await this.dataProviderService.getHistoricalRaw( + [{ dataSource, symbol }], + parseISO((date)), + new Date() + ); + + let currentDate = parseISO((date)); + let lastMarketPrice: number; + + while ( + isBefore( + currentDate, + new Date( + Date.UTC( + getYear(new Date()), + getMonth(new Date()), + getDate(new Date()), + 0 + ) + ) + ) + ) { + if ( + historicalData[symbol]?.[format(currentDate, DATE_FORMAT)] + ?.marketPrice + ) { + lastMarketPrice = + historicalData[symbol]?.[format(currentDate, DATE_FORMAT)] + ?.marketPrice; + } + + if (lastMarketPrice) { + try { + await this.prismaService.marketData.create({ + data: { + dataSource, + symbol, + date: new Date( + Date.UTC( + getYear(currentDate), + getMonth(currentDate), + getDate(currentDate), + 0 + ) + ), + marketPrice: lastMarketPrice + } + }); + } catch {} + } else { + Logger.warn( + `Failed to gather data for symbol ${symbol} from ${dataSource} at ${format( + currentDate, + DATE_FORMAT + )}.`, + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` + ); + } + + // Count month one up for iteration + currentDate = new Date( + Date.UTC( + getYear(currentDate), + getMonth(currentDate), + getDate(currentDate) + 1, + 0 + ) + ); + } + } catch (error) { + Logger.error( + error, + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` + ); + + throw new Error(error); } } } diff --git a/apps/api/src/services/data-gathering.service.ts b/apps/api/src/services/data-gathering.service.ts index 507e1e146..020013c85 100644 --- a/apps/api/src/services/data-gathering.service.ts +++ b/apps/api/src/services/data-gathering.service.ts @@ -1,21 +1,19 @@ import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service'; import { + DATA_GATHERING_QUEUE, + DATA_GATHERING_QUEUE_PRIORITY_LOW, + GATHER_HISTORICAL_MARKET_DATA_PROCESS, PROPERTY_LAST_DATA_GATHERING, PROPERTY_LOCKED_DATA_GATHERING } from '@ghostfolio/common/config'; import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper'; import { UniqueAsset } from '@ghostfolio/common/interfaces'; +import { InjectQueue } from '@nestjs/bull'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { DataSource } from '@prisma/client'; -import { - differenceInHours, - format, - getDate, - getMonth, - getYear, - isBefore, - subDays -} from 'date-fns'; +import { Queue } from 'bull'; +import { differenceInHours, format, subDays } from 'date-fns'; +import ms from 'ms'; import { DataProviderService } from './data-provider/data-provider.service'; import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface'; @@ -30,6 +28,8 @@ export class DataGatheringService { public constructor( @Inject('DataEnhancers') private readonly dataEnhancers: DataEnhancerInterface[], + @InjectQueue(DATA_GATHERING_QUEUE) + private readonly dataGatheringQueue: Queue, private readonly dataProviderService: DataProviderService, private readonly exchangeRateDataService: ExchangeRateDataService, private readonly prismaService: PrismaService, @@ -50,10 +50,10 @@ export class DataGatheringService { } }); - const symbols = await this.getSymbols7D(); + const dataGatheringItems = await this.getSymbols7D(); try { - await this.gatherSymbols(symbols); + await this.gatherSymbols(dataGatheringItems); await this.prismaService.property.upsert({ create: { @@ -334,107 +334,27 @@ export class DataGatheringService { } public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) { - let hasError = false; - let symbolCounter = 0; - for (const { dataSource, date, symbol } of aSymbolsWithStartDate) { if (dataSource === 'MANUAL') { continue; } - this.dataGatheringProgress = symbolCounter / aSymbolsWithStartDate.length; - - try { - const historicalData = await this.dataProviderService.getHistoricalRaw( - [{ dataSource, symbol }], + await this.dataGatheringQueue.add( + GATHER_HISTORICAL_MARKET_DATA_PROCESS, + { + dataSource, date, - new Date() - ); - - let currentDate = date; - let lastMarketPrice: number; - - while ( - isBefore( - currentDate, - new Date( - Date.UTC( - getYear(new Date()), - getMonth(new Date()), - getDate(new Date()), - 0 - ) - ) - ) - ) { - if ( - historicalData[symbol]?.[format(currentDate, DATE_FORMAT)] - ?.marketPrice - ) { - lastMarketPrice = - historicalData[symbol]?.[format(currentDate, DATE_FORMAT)] - ?.marketPrice; - } - - if (lastMarketPrice) { - try { - await this.prismaService.marketData.create({ - data: { - dataSource, - symbol, - date: new Date( - Date.UTC( - getYear(currentDate), - getMonth(currentDate), - getDate(currentDate), - 0 - ) - ), - marketPrice: lastMarketPrice - } - }); - } catch {} - } else { - Logger.warn( - `Failed to gather data for symbol ${symbol} from ${dataSource} at ${format( - currentDate, - DATE_FORMAT - )}.`, - 'DataGatheringService' - ); - } - - // Count month one up for iteration - currentDate = new Date( - Date.UTC( - getYear(currentDate), - getMonth(currentDate), - getDate(currentDate) + 1, - 0 - ) - ); + symbol + }, + { + attempts: 20, + backoff: { + delay: ms('1 minute'), + type: 'exponential' + }, + priority: DATA_GATHERING_QUEUE_PRIORITY_LOW } - } catch (error) { - hasError = true; - Logger.error(error, 'DataGatheringService'); - } - - if (symbolCounter > 0 && symbolCounter % 100 === 0) { - Logger.log( - `Data gathering progress: ${( - this.dataGatheringProgress * 100 - ).toFixed(2)}%`, - 'DataGatheringService' - ); - } - - symbolCounter += 1; - } - - await this.exchangeRateDataService.initialize(); - - if (hasError) { - throw ''; + ); } } diff --git a/apps/api/src/services/interfaces/interfaces.ts b/apps/api/src/services/interfaces/interfaces.ts index dbe3dfa4f..1148dd6af 100644 --- a/apps/api/src/services/interfaces/interfaces.ts +++ b/apps/api/src/services/interfaces/interfaces.ts @@ -1,3 +1,4 @@ +import { UniqueAsset } from '@ghostfolio/common/interfaces'; import { MarketState } from '@ghostfolio/common/types'; import { Account, @@ -32,8 +33,6 @@ export interface IDataProviderResponse { marketState: MarketState; } -export interface IDataGatheringItem { - dataSource: DataSource; +export interface IDataGatheringItem extends UniqueAsset { date?: Date; - symbol: string; } diff --git a/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts b/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts index ad94cb93e..e9620b5e3 100644 --- a/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts +++ b/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts @@ -53,6 +53,15 @@ export class AdminJobsComponent implements OnDestroy, OnInit { this.fetchJobs(); } + public onDeleteJob(aId: string) { + this.adminService + .deleteJob(aId) + .pipe(takeUntil(this.unsubscribeSubject)) + .subscribe(() => { + this.fetchJobs(); + }); + } + public onViewStacktrace(aStacktrace: AdminJobs['jobs'][0]['stacktrace']) { alert(JSON.stringify(aStacktrace, null, ' ')); } diff --git a/apps/client/src/app/components/admin-jobs/admin-jobs.html b/apps/client/src/app/components/admin-jobs/admin-jobs.html index c17bdcd2b..31fe1ae56 100644 --- a/apps/client/src/app/components/admin-jobs/admin-jobs.html +++ b/apps/client/src/app/components/admin-jobs/admin-jobs.html @@ -4,10 +4,11 @@ - + + @@ -17,10 +18,28 @@ - - + + + @@ -29,21 +48,19 @@ diff --git a/apps/client/src/app/services/admin.service.ts b/apps/client/src/app/services/admin.service.ts index 208f931aa..7b99acc82 100644 --- a/apps/client/src/app/services/admin.service.ts +++ b/apps/client/src/app/services/admin.service.ts @@ -18,6 +18,10 @@ import { Observable, map } from 'rxjs'; export class AdminService { public constructor(private http: HttpClient) {} + public deleteJob(aId: string) { + return this.http.delete(`/api/v1/admin/queue/job/${aId}`); + } + public deleteProfileData({ dataSource, symbol }: UniqueAsset) { return this.http.delete( `/api/v1/admin/profile-data/${dataSource}/${symbol}` @@ -44,7 +48,7 @@ export class AdminService { } public fetchJobs() { - return this.http.get(`/api/v1/admin/queue/jobs`); + return this.http.get(`/api/v1/admin/queue/job`); } public gatherMax() { diff --git a/libs/common/src/lib/config.ts b/libs/common/src/lib/config.ts index 410d0498f..b0207f309 100644 --- a/libs/common/src/lib/config.ts +++ b/libs/common/src/lib/config.ts @@ -43,10 +43,14 @@ export const warnColorRgb = { export const ASSET_SUB_CLASS_EMERGENCY_FUND = 'EMERGENCY_FUND'; export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE'; +export const DATA_GATHERING_QUEUE_PRIORITY_LOW = Number.MAX_SAFE_INTEGER; +export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1; export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy'; export const GATHER_ASSET_PROFILE_PROCESS = 'GATHER_ASSET_PROFILE'; +export const GATHER_HISTORICAL_MARKET_DATA_PROCESS = + 'GATHER_HISTORICAL_MARKET_DATA'; export const PROPERTY_BENCHMARKS = 'BENCHMARKS'; export const PROPERTY_COUPONS = 'COUPONS'; diff --git a/libs/common/src/lib/interfaces/admin-jobs.interface.ts b/libs/common/src/lib/interfaces/admin-jobs.interface.ts index 5379eec69..25e937626 100644 --- a/libs/common/src/lib/interfaces/admin-jobs.interface.ts +++ b/libs/common/src/lib/interfaces/admin-jobs.interface.ts @@ -1,5 +1,16 @@ -import { Job } from 'bull'; +import { Job, JobStatus } from 'bull'; export interface AdminJobs { - jobs: Job[]; + jobs: (Pick< + Job, + | 'attemptsMade' + | 'data' + | 'finishedOn' + | 'id' + | 'name' + | 'stacktrace' + | 'timestamp' + > & { + state: JobStatus | 'stuck'; + })[]; }
## Type Data Source SymbolAttempts Created Finished Status
{{ job.id }}{{ job.name }}{{ job.id }} + + + + Asset Profile + + + Historical Market Data + + + {{ job.data?.dataSource }} {{ job.data?.symbol }} + {{ job.attemptsMade }} + {{ job.timestamp | date: defaultDateTimeFormat }} - - - - + + +