From 557e3a06762ca99a9b8e98c2a46fa68ce86c367f Mon Sep 17 00:00:00 2001 From: Thomas Kaul <4159106+dtslvr@users.noreply.github.com> Date: Sat, 11 Jun 2022 13:40:15 +0200 Subject: [PATCH] Feature/migrate historical market data gathering to queue design pattern (#991) * Migrate historical market data gathering to queue * Filter and delete jobs * Detect duplicate jobs * Update changelog --- CHANGELOG.md | 2 + apps/api/src/app/admin/admin.controller.ts | 62 +++- apps/api/src/app/admin/admin.service.ts | 22 -- .../src/app/admin/queue/queue.controller.ts | 52 ++- apps/api/src/app/admin/queue/queue.service.ts | 59 ++- apps/api/src/app/app.controller.ts | 17 +- apps/api/src/app/cache/cache.controller.ts | 6 +- apps/api/src/app/cache/cache.module.ts | 5 +- apps/api/src/app/cache/cache.service.ts | 15 - apps/api/src/app/info/info.service.ts | 8 - apps/api/src/app/order/order.service.ts | 28 +- apps/api/src/services/cron.service.ts | 24 +- .../api/src/services/data-gathering.module.ts | 5 + .../src/services/data-gathering.processor.ts | 107 +++++- .../src/services/data-gathering.service.ts | 350 +++--------------- .../alpha-vantage/alpha-vantage.service.ts | 11 +- .../eod-historical-data.service.ts | 9 +- .../ghostfolio-scraper-api.service.ts | 9 +- .../google-sheets/google-sheets.service.ts | 9 +- .../rakuten-rapid-api.service.ts | 9 +- .../yahoo-finance/yahoo-finance.service.ts | 18 +- .../api/src/services/interfaces/interfaces.ts | 5 +- .../admin-jobs/admin-jobs.component.ts | 41 +- .../app/components/admin-jobs/admin-jobs.html | 89 ++++- .../admin-jobs/admin-jobs.module.ts | 11 +- .../admin-overview.component.ts | 91 ++--- .../admin-overview/admin-overview.html | 38 +- apps/client/src/app/services/admin.service.ts | 39 +- libs/common/src/lib/config.ts | 39 +- .../lib/interfaces/admin-data.interface.ts | 2 - .../lib/interfaces/admin-jobs.interface.ts | 15 +- 31 files changed, 627 insertions(+), 570 deletions(-) delete mode 100644 apps/api/src/app/cache/cache.service.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 723fd8aac..ae0c8b80d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Migrated the historical market data gathering to the queue design pattern +- Extended the queue jobs view in the admin control panel by the number of attempts and the status - Refreshed the cryptocurrencies list to support more coins by default - Increased the historical data chart of the _Fear & Greed Index_ (market mood) to 180 days - Upgraded `chart.js` from version `3.7.0` to `3.8.0` diff --git a/apps/api/src/app/admin/admin.controller.ts b/apps/api/src/app/admin/admin.controller.ts index c67b443c3..b13ba5b5e 100644 --- a/apps/api/src/app/admin/admin.controller.ts +++ b/apps/api/src/app/admin/admin.controller.ts @@ -2,8 +2,8 @@ import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.se import { MarketDataService } from '@ghostfolio/api/services/market-data.service'; import { PropertyDto } from '@ghostfolio/api/services/property/property.dto'; import { - DATA_GATHERING_QUEUE, - GATHER_ASSET_PROFILE_PROCESS + GATHER_ASSET_PROFILE_PROCESS, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS } from '@ghostfolio/common/config'; import { AdminData, @@ -12,7 +12,6 @@ import { } from '@ghostfolio/common/interfaces'; import { hasPermission, permissions } from '@ghostfolio/common/permissions'; import type { RequestWithUser } from '@ghostfolio/common/types'; -import { InjectQueue } from '@nestjs/bull'; import { Body, Controller, @@ -28,7 +27,6 @@ import { import { REQUEST } from '@nestjs/core'; import { AuthGuard } from '@nestjs/passport'; import { DataSource, MarketData } from '@prisma/client'; -import { Queue } from 'bull'; import { isDate } from 'date-fns'; import { StatusCodes, getReasonPhrase } from 'http-status-codes'; @@ -39,8 +37,6 @@ import { UpdateMarketDataDto } from './update-market-data.dto'; export class AdminController { public constructor( private readonly adminService: AdminService, - @InjectQueue(DATA_GATHERING_QUEUE) - private readonly dataGatheringQueue: Queue, private readonly dataGatheringService: DataGatheringService, private readonly marketDataService: MarketDataService, @Inject(REQUEST) private readonly request: RequestWithUser @@ -64,6 +60,24 @@ export class AdminController { return this.adminService.get(); } + @Post('gather') + @UseGuards(AuthGuard('jwt')) + public async gather7Days(): Promise { + if ( + !hasPermission( + this.request.user.permissions, + permissions.accessAdminControl + ) + ) { + throw new HttpException( + getReasonPhrase(StatusCodes.FORBIDDEN), + StatusCodes.FORBIDDEN + ); + } + + this.dataGatheringService.gather7Days(); + } + @Post('gather/max') @UseGuards(AuthGuard('jwt')) public async gatherMax(): Promise { @@ -82,10 +96,14 @@ export class AdminController { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { - dataSource, - symbol - }); + await this.dataGatheringService.addJobToQueue( + GATHER_ASSET_PROFILE_PROCESS, + { + dataSource, + symbol + }, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS + ); } this.dataGatheringService.gatherMax(); @@ -109,10 +127,14 @@ export class AdminController { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { - dataSource, - symbol - }); + await this.dataGatheringService.addJobToQueue( + GATHER_ASSET_PROFILE_PROCESS, + { + dataSource, + symbol + }, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS + ); } } @@ -134,10 +156,14 @@ export class AdminController { ); } - await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { - dataSource, - symbol - }); + await this.dataGatheringService.addJobToQueue( + GATHER_ASSET_PROFILE_PROCESS, + { + dataSource, + symbol + }, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS + ); } @Post('gather/:dataSource/:symbol') diff --git a/apps/api/src/app/admin/admin.service.ts b/apps/api/src/app/admin/admin.service.ts index 3b2392bfc..5e240a473 100644 --- a/apps/api/src/app/admin/admin.service.ts +++ b/apps/api/src/app/admin/admin.service.ts @@ -42,8 +42,6 @@ export class AdminService { public async get(): Promise { return { - dataGatheringProgress: - await this.dataGatheringService.getDataGatheringProgress(), exchangeRates: this.exchangeRateDataService .getCurrencies() .filter((currency) => { @@ -60,7 +58,6 @@ export class AdminService { ) }; }), - lastDataGathering: await this.getLastDataGathering(), settings: await this.propertyService.get(), transactionCount: await this.prismaService.order.count(), userCount: await this.prismaService.user.count(), @@ -161,30 +158,11 @@ export class AdminService { if (key === PROPERTY_CURRENCIES) { await this.exchangeRateDataService.initialize(); - await this.dataGatheringService.reset(); } return response; } - private async getLastDataGathering() { - const lastDataGathering = - await this.dataGatheringService.getLastDataGathering(); - - if (lastDataGathering) { - return lastDataGathering; - } - - const dataGatheringInProgress = - await this.dataGatheringService.getIsInProgress(); - - if (dataGatheringInProgress) { - return 'IN_PROGRESS'; - } - - return undefined; - } - private async getUsersWithAnalytics(): Promise { const usersWithAnalytics = await this.prismaService.user.findMany({ orderBy: { diff --git a/apps/api/src/app/admin/queue/queue.controller.ts b/apps/api/src/app/admin/queue/queue.controller.ts index 16ec2efe4..1dce79c9d 100644 --- a/apps/api/src/app/admin/queue/queue.controller.ts +++ b/apps/api/src/app/admin/queue/queue.controller.ts @@ -3,13 +3,17 @@ import { hasPermission, permissions } from '@ghostfolio/common/permissions'; import type { RequestWithUser } from '@ghostfolio/common/types'; import { Controller, + Delete, Get, HttpException, Inject, + Param, + Query, UseGuards } from '@nestjs/common'; import { REQUEST } from '@nestjs/core'; import { AuthGuard } from '@nestjs/passport'; +import { JobStatus } from 'bull'; import { StatusCodes, getReasonPhrase } from 'http-status-codes'; import { QueueService } from './queue.service'; @@ -21,9 +25,11 @@ export class QueueController { @Inject(REQUEST) private readonly request: RequestWithUser ) {} - @Get('jobs') + @Delete('job') @UseGuards(AuthGuard('jwt')) - public async getJobs(): Promise { + public async deleteJobs( + @Query('status') filterByStatus?: string + ): Promise { if ( !hasPermission( this.request.user.permissions, @@ -36,6 +42,46 @@ export class QueueController { ); } - return this.queueService.getJobs({}); + const status = filterByStatus?.split(',') ?? undefined; + return this.queueService.deleteJobs({ status }); + } + + @Get('job') + @UseGuards(AuthGuard('jwt')) + public async getJobs( + @Query('status') filterByStatus?: string + ): Promise { + if ( + !hasPermission( + this.request.user.permissions, + permissions.accessAdminControl + ) + ) { + throw new HttpException( + getReasonPhrase(StatusCodes.FORBIDDEN), + StatusCodes.FORBIDDEN + ); + } + + const status = filterByStatus?.split(',') ?? undefined; + return this.queueService.getJobs({ status }); + } + + @Delete('job/:id') + @UseGuards(AuthGuard('jwt')) + public async deleteJob(@Param('id') id: string): Promise { + if ( + !hasPermission( + this.request.user.permissions, + permissions.accessAdminControl + ) + ) { + throw new HttpException( + getReasonPhrase(StatusCodes.FORBIDDEN), + StatusCodes.FORBIDDEN + ); + } + + return this.queueService.deleteJob(id); } } diff --git a/apps/api/src/app/admin/queue/queue.service.ts b/apps/api/src/app/admin/queue/queue.service.ts index f8c5560ec..ebaab6d94 100644 --- a/apps/api/src/app/admin/queue/queue.service.ts +++ b/apps/api/src/app/admin/queue/queue.service.ts @@ -1,8 +1,11 @@ -import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config'; +import { + DATA_GATHERING_QUEUE, + QUEUE_JOB_STATUS_LIST +} from '@ghostfolio/common/config'; import { AdminJobs } from '@ghostfolio/common/interfaces'; import { InjectQueue } from '@nestjs/bull'; -import { Injectable } from '@nestjs/common'; -import { Queue } from 'bull'; +import { Injectable, Logger } from '@nestjs/common'; +import { JobStatus, Queue } from 'bull'; @Injectable() export class QueueService { @@ -11,22 +14,52 @@ export class QueueService { private readonly dataGatheringQueue: Queue ) {} + public async deleteJob(aId: string) { + return (await this.dataGatheringQueue.getJob(aId))?.remove(); + } + + public async deleteJobs({ + status = QUEUE_JOB_STATUS_LIST + }: { + status?: JobStatus[]; + }) { + const jobs = await this.dataGatheringQueue.getJobs(status); + + for (const job of jobs) { + try { + await job.remove(); + } catch (error) { + Logger.warn(error, 'QueueService'); + } + } + } + public async getJobs({ - limit = 1000 + limit = 1000, + status = QUEUE_JOB_STATUS_LIST }: { limit?: number; + status?: JobStatus[]; }): Promise { - const jobs = await this.dataGatheringQueue.getJobs([ - 'active', - 'completed', - 'delayed', - 'failed', - 'paused', - 'waiting' - ]); + const jobs = await this.dataGatheringQueue.getJobs(status); + + const jobsWithState = await Promise.all( + jobs.slice(0, limit).map(async (job) => { + return { + attemptsMade: job.attemptsMade + 1, + data: job.data, + finishedOn: job.finishedOn, + id: job.id, + name: job.name, + stacktrace: job.stacktrace, + state: await job.getState(), + timestamp: job.timestamp + }; + }) + ); return { - jobs: jobs.slice(0, limit) + jobs: jobsWithState }; } } diff --git a/apps/api/src/app/app.controller.ts b/apps/api/src/app/app.controller.ts index 4d024f117..f7704f9e2 100644 --- a/apps/api/src/app/app.controller.ts +++ b/apps/api/src/app/app.controller.ts @@ -1,21 +1,6 @@ -import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.service'; import { Controller } from '@nestjs/common'; @Controller() export class AppController { - public constructor( - private readonly dataGatheringService: DataGatheringService - ) { - this.initialize(); - } - - private async initialize() { - const isDataGatheringInProgress = - await this.dataGatheringService.getIsInProgress(); - - if (isDataGatheringInProgress) { - // Prepare for automatical data gathering, if hung up in progress state - await this.dataGatheringService.reset(); - } - } + public constructor() {} } diff --git a/apps/api/src/app/cache/cache.controller.ts b/apps/api/src/app/cache/cache.controller.ts index 4783266b4..4d8aac5d3 100644 --- a/apps/api/src/app/cache/cache.controller.ts +++ b/apps/api/src/app/cache/cache.controller.ts @@ -1,4 +1,3 @@ -import { CacheService } from '@ghostfolio/api/app/cache/cache.service'; import { RedisCacheService } from '@ghostfolio/api/app/redis-cache/redis-cache.service'; import { hasPermission, permissions } from '@ghostfolio/common/permissions'; import type { RequestWithUser } from '@ghostfolio/common/types'; @@ -16,7 +15,6 @@ import { StatusCodes, getReasonPhrase } from 'http-status-codes'; @Controller('cache') export class CacheController { public constructor( - private readonly cacheService: CacheService, private readonly redisCacheService: RedisCacheService, @Inject(REQUEST) private readonly request: RequestWithUser ) {} @@ -36,8 +34,6 @@ export class CacheController { ); } - this.redisCacheService.reset(); - - return this.cacheService.flush(); + return this.redisCacheService.reset(); } } diff --git a/apps/api/src/app/cache/cache.module.ts b/apps/api/src/app/cache/cache.module.ts index 7b427b7a0..c079c7942 100644 --- a/apps/api/src/app/cache/cache.module.ts +++ b/apps/api/src/app/cache/cache.module.ts @@ -1,4 +1,3 @@ -import { CacheService } from '@ghostfolio/api/app/cache/cache.service'; import { RedisCacheModule } from '@ghostfolio/api/app/redis-cache/redis-cache.module'; import { ConfigurationModule } from '@ghostfolio/api/services/configuration.module'; import { DataGatheringModule } from '@ghostfolio/api/services/data-gathering.module'; @@ -11,7 +10,6 @@ import { Module } from '@nestjs/common'; import { CacheController } from './cache.controller'; @Module({ - exports: [CacheService], controllers: [CacheController], imports: [ ConfigurationModule, @@ -21,7 +19,6 @@ import { CacheController } from './cache.controller'; PrismaModule, RedisCacheModule, SymbolProfileModule - ], - providers: [CacheService] + ] }) export class CacheModule {} diff --git a/apps/api/src/app/cache/cache.service.ts b/apps/api/src/app/cache/cache.service.ts deleted file mode 100644 index 69c5fe66a..000000000 --- a/apps/api/src/app/cache/cache.service.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.service'; -import { Injectable } from '@nestjs/common'; - -@Injectable() -export class CacheService { - public constructor( - private readonly dataGaterhingService: DataGatheringService - ) {} - - public async flush(): Promise { - await this.dataGaterhingService.reset(); - - return; - } -} diff --git a/apps/api/src/app/info/info.service.ts b/apps/api/src/app/info/info.service.ts index 440f90aa1..04651dd86 100644 --- a/apps/api/src/app/info/info.service.ts +++ b/apps/api/src/app/info/info.service.ts @@ -106,7 +106,6 @@ export class InfoService { baseCurrency: this.configurationService.get('BASE_CURRENCY'), currencies: this.exchangeRateDataService.getCurrencies(), demoAuthToken: this.getDemoAuthToken(), - lastDataGathering: await this.getLastDataGathering(), statistics: await this.getStatistics(), subscriptions: await this.getSubscriptions(), tags: await this.tagService.get() @@ -215,13 +214,6 @@ export class InfoService { }); } - private async getLastDataGathering() { - const lastDataGathering = - await this.dataGatheringService.getLastDataGathering(); - - return lastDataGathering ?? null; - } - private async getStatistics() { if (!this.configurationService.get('ENABLE_FEATURE_STATISTICS')) { return undefined; diff --git a/apps/api/src/app/order/order.service.ts b/apps/api/src/app/order/order.service.ts index 7b09ec559..0be1523c1 100644 --- a/apps/api/src/app/order/order.service.ts +++ b/apps/api/src/app/order/order.service.ts @@ -1,16 +1,14 @@ import { AccountService } from '@ghostfolio/api/app/account/account.service'; -import { CacheService } from '@ghostfolio/api/app/cache/cache.service'; import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.service'; import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data.service'; import { PrismaService } from '@ghostfolio/api/services/prisma.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service'; import { - DATA_GATHERING_QUEUE, - GATHER_ASSET_PROFILE_PROCESS + GATHER_ASSET_PROFILE_PROCESS, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS } from '@ghostfolio/common/config'; import { Filter } from '@ghostfolio/common/interfaces'; import { OrderWithAccount } from '@ghostfolio/common/types'; -import { InjectQueue } from '@nestjs/bull'; import { Injectable } from '@nestjs/common'; import { AssetClass, @@ -21,7 +19,6 @@ import { Type as TypeOfOrder } from '@prisma/client'; import Big from 'big.js'; -import { Queue } from 'bull'; import { endOfToday, isAfter } from 'date-fns'; import { groupBy } from 'lodash'; import { v4 as uuidv4 } from 'uuid'; @@ -32,11 +29,8 @@ import { Activity } from './interfaces/activities.interface'; export class OrderService { public constructor( private readonly accountService: AccountService, - private readonly cacheService: CacheService, - @InjectQueue(DATA_GATHERING_QUEUE) - private readonly dataGatheringQueue: Queue, - private readonly exchangeRateDataService: ExchangeRateDataService, private readonly dataGatheringService: DataGatheringService, + private readonly exchangeRateDataService: ExchangeRateDataService, private readonly prismaService: PrismaService, private readonly symbolProfileService: SymbolProfileService ) {} @@ -120,10 +114,14 @@ export class OrderService { data.SymbolProfile.connectOrCreate.create.symbol.toUpperCase(); } - await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { - dataSource: data.SymbolProfile.connectOrCreate.create.dataSource, - symbol: data.SymbolProfile.connectOrCreate.create.symbol - }); + await this.dataGatheringService.addJobToQueue( + GATHER_ASSET_PROFILE_PROCESS, + { + dataSource: data.SymbolProfile.connectOrCreate.create.dataSource, + symbol: data.SymbolProfile.connectOrCreate.create.symbol + }, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS + ); const isDraft = isAfter(data.date as Date, endOfToday()); @@ -138,8 +136,6 @@ export class OrderService { ]); } - await this.cacheService.flush(); - delete data.accountId; delete data.assetClass; delete data.assetSubClass; @@ -330,8 +326,6 @@ export class OrderService { } } - await this.cacheService.flush(); - delete data.assetClass; delete data.assetSubClass; delete data.currency; diff --git a/apps/api/src/services/cron.service.ts b/apps/api/src/services/cron.service.ts index 40051b9ce..727ff0998 100644 --- a/apps/api/src/services/cron.service.ts +++ b/apps/api/src/services/cron.service.ts @@ -1,11 +1,9 @@ import { - DATA_GATHERING_QUEUE, - GATHER_ASSET_PROFILE_PROCESS + GATHER_ASSET_PROFILE_PROCESS, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS } from '@ghostfolio/common/config'; -import { InjectQueue } from '@nestjs/bull'; import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; -import { Queue } from 'bull'; import { DataGatheringService } from './data-gathering.service'; import { ExchangeRateDataService } from './exchange-rate-data.service'; @@ -14,15 +12,13 @@ import { TwitterBotService } from './twitter-bot/twitter-bot.service'; @Injectable() export class CronService { public constructor( - @InjectQueue(DATA_GATHERING_QUEUE) - private readonly dataGatheringQueue: Queue, private readonly dataGatheringService: DataGatheringService, private readonly exchangeRateDataService: ExchangeRateDataService, private readonly twitterBotService: TwitterBotService ) {} - @Cron(CronExpression.EVERY_MINUTE) - public async runEveryMinute() { + @Cron(CronExpression.EVERY_HOUR) + public async runEveryHour() { await this.dataGatheringService.gather7Days(); } @@ -41,10 +37,14 @@ export class CronService { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, { - dataSource, - symbol - }); + await this.dataGatheringService.addJobToQueue( + GATHER_ASSET_PROFILE_PROCESS, + { + dataSource, + symbol + }, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS + ); } } } diff --git a/apps/api/src/services/data-gathering.module.ts b/apps/api/src/services/data-gathering.module.ts index e8e98058c..7e6fdc029 100644 --- a/apps/api/src/services/data-gathering.module.ts +++ b/apps/api/src/services/data-gathering.module.ts @@ -6,6 +6,7 @@ import { PrismaModule } from '@ghostfolio/api/services/prisma.module'; import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config'; import { BullModule } from '@nestjs/bull'; import { Module } from '@nestjs/common'; +import ms from 'ms'; import { DataGatheringProcessor } from './data-gathering.processor'; import { ExchangeRateDataModule } from './exchange-rate-data.module'; @@ -14,6 +15,10 @@ import { SymbolProfileModule } from './symbol-profile.module'; @Module({ imports: [ BullModule.registerQueue({ + limiter: { + duration: ms('5 seconds'), + max: 1 + }, name: DATA_GATHERING_QUEUE }), ConfigurationModule, diff --git a/apps/api/src/services/data-gathering.processor.ts b/apps/api/src/services/data-gathering.processor.ts index de8d8eb4e..7e2a27642 100644 --- a/apps/api/src/services/data-gathering.processor.ts +++ b/apps/api/src/services/data-gathering.processor.ts @@ -1,19 +1,34 @@ import { DATA_GATHERING_QUEUE, - GATHER_ASSET_PROFILE_PROCESS + GATHER_ASSET_PROFILE_PROCESS, + GATHER_HISTORICAL_MARKET_DATA_PROCESS } from '@ghostfolio/common/config'; +import { DATE_FORMAT } from '@ghostfolio/common/helper'; import { UniqueAsset } from '@ghostfolio/common/interfaces'; import { Process, Processor } from '@nestjs/bull'; import { Injectable, Logger } from '@nestjs/common'; import { Job } from 'bull'; +import { + format, + getDate, + getMonth, + getYear, + isBefore, + parseISO +} from 'date-fns'; import { DataGatheringService } from './data-gathering.service'; +import { DataProviderService } from './data-provider/data-provider.service'; +import { IDataGatheringItem } from './interfaces/interfaces'; +import { PrismaService } from './prisma.service'; @Injectable() @Processor(DATA_GATHERING_QUEUE) export class DataGatheringProcessor { public constructor( - private readonly dataGatheringService: DataGatheringService + private readonly dataGatheringService: DataGatheringService, + private readonly dataProviderService: DataProviderService, + private readonly prismaService: PrismaService ) {} @Process(GATHER_ASSET_PROFILE_PROCESS) @@ -21,7 +36,93 @@ export class DataGatheringProcessor { try { await this.dataGatheringService.gatherAssetProfiles([job.data]); } catch (error) { - Logger.error(error, 'DataGatheringProcessor'); + Logger.error( + error, + `DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS})` + ); + + throw new Error(error); + } + } + + @Process(GATHER_HISTORICAL_MARKET_DATA_PROCESS) + public async gatherHistoricalMarketData(job: Job) { + try { + const { dataSource, date, symbol } = job.data; + + const historicalData = await this.dataProviderService.getHistoricalRaw( + [{ dataSource, symbol }], + parseISO((date)), + new Date() + ); + + let currentDate = parseISO((date)); + let lastMarketPrice: number; + + while ( + isBefore( + currentDate, + new Date( + Date.UTC( + getYear(new Date()), + getMonth(new Date()), + getDate(new Date()), + 0 + ) + ) + ) + ) { + if ( + historicalData[symbol]?.[format(currentDate, DATE_FORMAT)] + ?.marketPrice + ) { + lastMarketPrice = + historicalData[symbol]?.[format(currentDate, DATE_FORMAT)] + ?.marketPrice; + } + + if (lastMarketPrice) { + try { + await this.prismaService.marketData.create({ + data: { + dataSource, + symbol, + date: new Date( + Date.UTC( + getYear(currentDate), + getMonth(currentDate), + getDate(currentDate), + 0 + ) + ), + marketPrice: lastMarketPrice + } + }); + } catch {} + } + + // Count month one up for iteration + currentDate = new Date( + Date.UTC( + getYear(currentDate), + getMonth(currentDate), + getDate(currentDate) + 1, + 0 + ) + ); + } + + Logger.log( + `Historical market data gathering has been completed for ${symbol} (${dataSource}).`, + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` + ); + } catch (error) { + Logger.error( + error, + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` + ); + + throw new Error(error); } } } diff --git a/apps/api/src/services/data-gathering.service.ts b/apps/api/src/services/data-gathering.service.ts index 507e1e146..5bc669e6e 100644 --- a/apps/api/src/services/data-gathering.service.ts +++ b/apps/api/src/services/data-gathering.service.ts @@ -1,21 +1,17 @@ import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service'; import { - PROPERTY_LAST_DATA_GATHERING, - PROPERTY_LOCKED_DATA_GATHERING + DATA_GATHERING_QUEUE, + GATHER_HISTORICAL_MARKET_DATA_PROCESS, + GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS, + QUEUE_JOB_STATUS_LIST } from '@ghostfolio/common/config'; import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper'; import { UniqueAsset } from '@ghostfolio/common/interfaces'; +import { InjectQueue } from '@nestjs/bull'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { DataSource } from '@prisma/client'; -import { - differenceInHours, - format, - getDate, - getMonth, - getYear, - isBefore, - subDays -} from 'date-fns'; +import { JobOptions, Queue } from 'bull'; +import { format, subDays } from 'date-fns'; import { DataProviderService } from './data-provider/data-provider.service'; import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface'; @@ -25,167 +21,48 @@ import { PrismaService } from './prisma.service'; @Injectable() export class DataGatheringService { - private dataGatheringProgress: number; - public constructor( @Inject('DataEnhancers') private readonly dataEnhancers: DataEnhancerInterface[], + @InjectQueue(DATA_GATHERING_QUEUE) + private readonly dataGatheringQueue: Queue, private readonly dataProviderService: DataProviderService, private readonly exchangeRateDataService: ExchangeRateDataService, private readonly prismaService: PrismaService, private readonly symbolProfileService: SymbolProfileService ) {} - public async gather7Days() { - const isDataGatheringNeeded = await this.isDataGatheringNeeded(); - - if (isDataGatheringNeeded) { - Logger.log('7d data gathering has been started.', 'DataGatheringService'); - console.time('data-gathering-7d'); - - await this.prismaService.property.create({ - data: { - key: PROPERTY_LOCKED_DATA_GATHERING, - value: new Date().toISOString() - } - }); - - const symbols = await this.getSymbols7D(); - - try { - await this.gatherSymbols(symbols); - - await this.prismaService.property.upsert({ - create: { - key: PROPERTY_LAST_DATA_GATHERING, - value: new Date().toISOString() - }, - update: { value: new Date().toISOString() }, - where: { key: PROPERTY_LAST_DATA_GATHERING } - }); - } catch (error) { - Logger.error(error, 'DataGatheringService'); - } - - await this.prismaService.property.delete({ - where: { - key: PROPERTY_LOCKED_DATA_GATHERING - } - }); + public async addJobToQueue(name: string, data: any, options?: JobOptions) { + const hasJob = await this.hasJob(name, data); + if (hasJob) { Logger.log( - '7d data gathering has been completed.', + `Job ${name} with data ${JSON.stringify(data)} already exists.`, 'DataGatheringService' ); - console.timeEnd('data-gathering-7d'); + } else { + return this.dataGatheringQueue.add(name, data, options); } } - public async gatherMax() { - const isDataGatheringLocked = await this.prismaService.property.findUnique({ - where: { key: PROPERTY_LOCKED_DATA_GATHERING } - }); - - if (!isDataGatheringLocked) { - Logger.log( - 'Max data gathering has been started.', - 'DataGatheringService' - ); - console.time('data-gathering-max'); - - await this.prismaService.property.create({ - data: { - key: PROPERTY_LOCKED_DATA_GATHERING, - value: new Date().toISOString() - } - }); - - const symbols = await this.getSymbolsMax(); - - try { - await this.gatherSymbols(symbols); - - await this.prismaService.property.upsert({ - create: { - key: PROPERTY_LAST_DATA_GATHERING, - value: new Date().toISOString() - }, - update: { value: new Date().toISOString() }, - where: { key: PROPERTY_LAST_DATA_GATHERING } - }); - } catch (error) { - Logger.error(error, 'DataGatheringService'); - } - - await this.prismaService.property.delete({ - where: { - key: PROPERTY_LOCKED_DATA_GATHERING - } - }); + public async gather7Days() { + const dataGatheringItems = await this.getSymbols7D(); + await this.gatherSymbols(dataGatheringItems); + } - Logger.log( - 'Max data gathering has been completed.', - 'DataGatheringService' - ); - console.timeEnd('data-gathering-max'); - } + public async gatherMax() { + const dataGatheringItems = await this.getSymbolsMax(); + await this.gatherSymbols(dataGatheringItems); } public async gatherSymbol({ dataSource, symbol }: UniqueAsset) { - const isDataGatheringLocked = await this.prismaService.property.findUnique({ - where: { key: PROPERTY_LOCKED_DATA_GATHERING } - }); - - if (!isDataGatheringLocked) { - Logger.log( - `Symbol data gathering for ${symbol} has been started.`, - 'DataGatheringService' - ); - console.time('data-gathering-symbol'); - - await this.prismaService.property.create({ - data: { - key: PROPERTY_LOCKED_DATA_GATHERING, - value: new Date().toISOString() - } - }); - - const symbols = (await this.getSymbolsMax()).filter( - (dataGatheringItem) => { - return ( - dataGatheringItem.dataSource === dataSource && - dataGatheringItem.symbol === symbol - ); - } - ); - - try { - await this.gatherSymbols(symbols); - - await this.prismaService.property.upsert({ - create: { - key: PROPERTY_LAST_DATA_GATHERING, - value: new Date().toISOString() - }, - update: { value: new Date().toISOString() }, - where: { key: PROPERTY_LAST_DATA_GATHERING } - }); - } catch (error) { - Logger.error(error, 'DataGatheringService'); - } - - await this.prismaService.property.delete({ - where: { - key: PROPERTY_LOCKED_DATA_GATHERING - } - }); - - Logger.log( - `Symbol data gathering for ${symbol} has been completed.`, - 'DataGatheringService' + const symbols = (await this.getSymbolsMax()).filter((dataGatheringItem) => { + return ( + dataGatheringItem.dataSource === dataSource && + dataGatheringItem.symbol === symbol ); - console.timeEnd('data-gathering-symbol'); - } + }); + await this.gatherSymbols(symbols); } public async gatherSymbolForDate({ @@ -235,15 +112,6 @@ export class DataGatheringService { uniqueAssets = await this.getUniqueAssets(); } - Logger.log( - `Asset profile data gathering has been started for ${uniqueAssets - .map(({ dataSource, symbol }) => { - return `${symbol} (${dataSource})`; - }) - .join(',')}.`, - 'DataGatheringService' - ); - const assetProfiles = await this.dataProviderService.getAssetProfiles( uniqueAssets ); @@ -334,136 +202,21 @@ export class DataGatheringService { } public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) { - let hasError = false; - let symbolCounter = 0; - for (const { dataSource, date, symbol } of aSymbolsWithStartDate) { if (dataSource === 'MANUAL') { continue; } - this.dataGatheringProgress = symbolCounter / aSymbolsWithStartDate.length; - - try { - const historicalData = await this.dataProviderService.getHistoricalRaw( - [{ dataSource, symbol }], + await this.addJobToQueue( + GATHER_HISTORICAL_MARKET_DATA_PROCESS, + { + dataSource, date, - new Date() - ); - - let currentDate = date; - let lastMarketPrice: number; - - while ( - isBefore( - currentDate, - new Date( - Date.UTC( - getYear(new Date()), - getMonth(new Date()), - getDate(new Date()), - 0 - ) - ) - ) - ) { - if ( - historicalData[symbol]?.[format(currentDate, DATE_FORMAT)] - ?.marketPrice - ) { - lastMarketPrice = - historicalData[symbol]?.[format(currentDate, DATE_FORMAT)] - ?.marketPrice; - } - - if (lastMarketPrice) { - try { - await this.prismaService.marketData.create({ - data: { - dataSource, - symbol, - date: new Date( - Date.UTC( - getYear(currentDate), - getMonth(currentDate), - getDate(currentDate), - 0 - ) - ), - marketPrice: lastMarketPrice - } - }); - } catch {} - } else { - Logger.warn( - `Failed to gather data for symbol ${symbol} from ${dataSource} at ${format( - currentDate, - DATE_FORMAT - )}.`, - 'DataGatheringService' - ); - } - - // Count month one up for iteration - currentDate = new Date( - Date.UTC( - getYear(currentDate), - getMonth(currentDate), - getDate(currentDate) + 1, - 0 - ) - ); - } - } catch (error) { - hasError = true; - Logger.error(error, 'DataGatheringService'); - } - - if (symbolCounter > 0 && symbolCounter % 100 === 0) { - Logger.log( - `Data gathering progress: ${( - this.dataGatheringProgress * 100 - ).toFixed(2)}%`, - 'DataGatheringService' - ); - } - - symbolCounter += 1; - } - - await this.exchangeRateDataService.initialize(); - - if (hasError) { - throw ''; - } - } - - public async getDataGatheringProgress() { - const isInProgress = await this.getIsInProgress(); - - if (isInProgress) { - return this.dataGatheringProgress; - } - - return undefined; - } - - public async getIsInProgress() { - return await this.prismaService.property.findUnique({ - where: { key: PROPERTY_LOCKED_DATA_GATHERING } - }); - } - - public async getLastDataGathering() { - const lastDataGathering = await this.prismaService.property.findUnique({ - where: { key: PROPERTY_LAST_DATA_GATHERING } - }); - - if (lastDataGathering?.value) { - return new Date(lastDataGathering.value); + symbol + }, + GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS + ); } - - return undefined; } public async getSymbolsMax(): Promise { @@ -534,19 +287,6 @@ export class DataGatheringService { }); } - public async reset() { - Logger.log('Data gathering has been reset.', 'DataGatheringService'); - - await this.prismaService.property.deleteMany({ - where: { - OR: [ - { key: PROPERTY_LAST_DATA_GATHERING }, - { key: PROPERTY_LOCKED_DATA_GATHERING } - ] - } - }); - } - private async getSymbols7D(): Promise { const startDate = subDays(resetHours(new Date()), 7); @@ -610,15 +350,17 @@ export class DataGatheringService { return [...currencyPairsToGather, ...symbolProfilesToGather]; } - private async isDataGatheringNeeded() { - const lastDataGathering = await this.getLastDataGathering(); + private async hasJob(name: string, data: any) { + const jobs = await this.dataGatheringQueue.getJobs( + QUEUE_JOB_STATUS_LIST.filter((status) => { + return status !== 'completed'; + }) + ); - const isDataGatheringLocked = await this.prismaService.property.findUnique({ - where: { key: PROPERTY_LOCKED_DATA_GATHERING } + return jobs.some((job) => { + return ( + job.name === name && JSON.stringify(job.data) === JSON.stringify(data) + ); }); - - const diffInHours = differenceInHours(new Date(), lastDataGathering); - - return (diffInHours >= 1 || !lastDataGathering) && !isDataGatheringLocked; } } diff --git a/apps/api/src/services/data-provider/alpha-vantage/alpha-vantage.service.ts b/apps/api/src/services/data-provider/alpha-vantage/alpha-vantage.service.ts index bff966fe3..41bd715b1 100644 --- a/apps/api/src/services/data-provider/alpha-vantage/alpha-vantage.service.ts +++ b/apps/api/src/services/data-provider/alpha-vantage/alpha-vantage.service.ts @@ -9,7 +9,7 @@ import { DATE_FORMAT } from '@ghostfolio/common/helper'; import { Granularity } from '@ghostfolio/common/types'; import { Injectable, Logger } from '@nestjs/common'; import { DataSource, SymbolProfile } from '@prisma/client'; -import { isAfter, isBefore, parse } from 'date-fns'; +import { format, isAfter, isBefore, parse } from 'date-fns'; import { IAlphaVantageHistoricalResponse } from './interfaces/interfaces'; @@ -76,9 +76,12 @@ export class AlphaVantageService implements DataProviderInterface { return response; } catch (error) { - Logger.error(error, 'AlphaVantageService'); - - return {}; + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` + ); } } diff --git a/apps/api/src/services/data-provider/eod-historical-data/eod-historical-data.service.ts b/apps/api/src/services/data-provider/eod-historical-data/eod-historical-data.service.ts index bb0401f00..1dd3c7aff 100644 --- a/apps/api/src/services/data-provider/eod-historical-data/eod-historical-data.service.ts +++ b/apps/api/src/services/data-provider/eod-historical-data/eod-historical-data.service.ts @@ -72,10 +72,13 @@ export class EodHistoricalDataService implements DataProviderInterface { { [aSymbol]: {} } ); } catch (error) { - Logger.error(error, 'EodHistoricalDataService'); + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` + ); } - - return {}; } public getName(): DataSource { diff --git a/apps/api/src/services/data-provider/ghostfolio-scraper-api/ghostfolio-scraper-api.service.ts b/apps/api/src/services/data-provider/ghostfolio-scraper-api/ghostfolio-scraper-api.service.ts index 7186ea7ec..373922fe1 100644 --- a/apps/api/src/services/data-provider/ghostfolio-scraper-api/ghostfolio-scraper-api.service.ts +++ b/apps/api/src/services/data-provider/ghostfolio-scraper-api/ghostfolio-scraper-api.service.ts @@ -87,10 +87,13 @@ export class GhostfolioScraperApiService implements DataProviderInterface { } }; } catch (error) { - Logger.error(error, 'GhostfolioScraperApiService'); + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` + ); } - - return {}; } public getName(): DataSource { diff --git a/apps/api/src/services/data-provider/google-sheets/google-sheets.service.ts b/apps/api/src/services/data-provider/google-sheets/google-sheets.service.ts index b196df532..cc6af5241 100644 --- a/apps/api/src/services/data-provider/google-sheets/google-sheets.service.ts +++ b/apps/api/src/services/data-provider/google-sheets/google-sheets.service.ts @@ -71,10 +71,13 @@ export class GoogleSheetsService implements DataProviderInterface { [symbol]: historicalData }; } catch (error) { - Logger.error(error, 'GoogleSheetsService'); + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` + ); } - - return {}; } public getName(): DataSource { diff --git a/apps/api/src/services/data-provider/rakuten-rapid-api/rakuten-rapid-api.service.ts b/apps/api/src/services/data-provider/rakuten-rapid-api/rakuten-rapid-api.service.ts index 2a516c5ef..7d92ae354 100644 --- a/apps/api/src/services/data-provider/rakuten-rapid-api/rakuten-rapid-api.service.ts +++ b/apps/api/src/services/data-provider/rakuten-rapid-api/rakuten-rapid-api.service.ts @@ -90,7 +90,14 @@ export class RakutenRapidApiService implements DataProviderInterface { } }; } - } catch (error) {} + } catch (error) { + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` + ); + } return {}; } diff --git a/apps/api/src/services/data-provider/yahoo-finance/yahoo-finance.service.ts b/apps/api/src/services/data-provider/yahoo-finance/yahoo-finance.service.ts index f277d9ac9..81f8c43fc 100644 --- a/apps/api/src/services/data-provider/yahoo-finance/yahoo-finance.service.ts +++ b/apps/api/src/services/data-provider/yahoo-finance/yahoo-finance.service.ts @@ -131,7 +131,13 @@ export class YahooFinanceService implements DataProviderInterface { if (url) { response.url = url; } - } catch {} + } catch (error) { + throw new Error( + `Could not get asset profile for ${aSymbol} (${this.getName()}): [${ + error.name + }] ${error.message}` + ); + } return response; } @@ -185,12 +191,12 @@ export class YahooFinanceService implements DataProviderInterface { return response; } catch (error) { - Logger.warn( - `Skipping yahooFinance2.getHistorical("${aSymbol}"): [${error.name}] ${error.message}`, - 'YahooFinanceService' + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` ); - - return {}; } } diff --git a/apps/api/src/services/interfaces/interfaces.ts b/apps/api/src/services/interfaces/interfaces.ts index dbe3dfa4f..1148dd6af 100644 --- a/apps/api/src/services/interfaces/interfaces.ts +++ b/apps/api/src/services/interfaces/interfaces.ts @@ -1,3 +1,4 @@ +import { UniqueAsset } from '@ghostfolio/common/interfaces'; import { MarketState } from '@ghostfolio/common/types'; import { Account, @@ -32,8 +33,6 @@ export interface IDataProviderResponse { marketState: MarketState; } -export interface IDataGatheringItem { - dataSource: DataSource; +export interface IDataGatheringItem extends UniqueAsset { date?: Date; - symbol: string; } diff --git a/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts b/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts index ad94cb93e..aba2fd29b 100644 --- a/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts +++ b/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts @@ -5,10 +5,13 @@ import { OnDestroy, OnInit } from '@angular/core'; +import { FormBuilder, FormGroup } from '@angular/forms'; import { AdminService } from '@ghostfolio/client/services/admin.service'; import { UserService } from '@ghostfolio/client/services/user/user.service'; +import { QUEUE_JOB_STATUS_LIST } from '@ghostfolio/common/config'; import { getDateWithTimeFormatString } from '@ghostfolio/common/helper'; import { AdminJobs, User } from '@ghostfolio/common/interfaces'; +import { JobStatus } from 'bull'; import { Subject } from 'rxjs'; import { takeUntil } from 'rxjs/operators'; @@ -20,7 +23,9 @@ import { takeUntil } from 'rxjs/operators'; }) export class AdminJobsComponent implements OnDestroy, OnInit { public defaultDateTimeFormat: string; + public filterForm: FormGroup; public jobs: AdminJobs['jobs'] = []; + public statusFilterOptions = QUEUE_JOB_STATUS_LIST; public user: User; private unsubscribeSubject = new Subject(); @@ -31,6 +36,7 @@ export class AdminJobsComponent implements OnDestroy, OnInit { public constructor( private adminService: AdminService, private changeDetectorRef: ChangeDetectorRef, + private formBuilder: FormBuilder, private userService: UserService ) { this.userService.stateChanged @@ -50,9 +56,40 @@ export class AdminJobsComponent implements OnDestroy, OnInit { * Initializes the controller */ public ngOnInit() { + this.filterForm = this.formBuilder.group({ + status: [] + }); + + this.filterForm.valueChanges + .pipe(takeUntil(this.unsubscribeSubject)) + .subscribe(() => { + const currentFilter = this.filterForm.get('status').value; + this.fetchJobs(currentFilter ? [currentFilter] : undefined); + }); + this.fetchJobs(); } + public onDeleteJob(aId: string) { + this.adminService + .deleteJob(aId) + .pipe(takeUntil(this.unsubscribeSubject)) + .subscribe(() => { + this.fetchJobs(); + }); + } + + public onDeleteJobs() { + const currentFilter = this.filterForm.get('status').value; + + this.adminService + .deleteJobs({ status: currentFilter ? [currentFilter] : undefined }) + .pipe(takeUntil(this.unsubscribeSubject)) + .subscribe(() => { + this.fetchJobs(currentFilter ? [currentFilter] : undefined); + }); + } + public onViewStacktrace(aStacktrace: AdminJobs['jobs'][0]['stacktrace']) { alert(JSON.stringify(aStacktrace, null, ' ')); } @@ -62,9 +99,9 @@ export class AdminJobsComponent implements OnDestroy, OnInit { this.unsubscribeSubject.complete(); } - private fetchJobs() { + private fetchJobs(aStatus?: JobStatus[]) { this.adminService - .fetchJobs() + .fetchJobs({ status: aStatus }) .pipe(takeUntil(this.unsubscribeSubject)) .subscribe(({ jobs }) => { this.jobs = jobs; diff --git a/apps/client/src/app/components/admin-jobs/admin-jobs.html b/apps/client/src/app/components/admin-jobs/admin-jobs.html index c17bdcd2b..6c48b9576 100644 --- a/apps/client/src/app/components/admin-jobs/admin-jobs.html +++ b/apps/client/src/app/components/admin-jobs/admin-jobs.html @@ -1,13 +1,34 @@
+
+ + + + {{ statusFilterOption }} + + + +
- + - + + @@ -17,10 +38,28 @@ - - - + + + + @@ -29,21 +68,32 @@ diff --git a/apps/client/src/app/components/admin-jobs/admin-jobs.module.ts b/apps/client/src/app/components/admin-jobs/admin-jobs.module.ts index d96591949..93e668097 100644 --- a/apps/client/src/app/components/admin-jobs/admin-jobs.module.ts +++ b/apps/client/src/app/components/admin-jobs/admin-jobs.module.ts @@ -1,13 +1,22 @@ import { CommonModule } from '@angular/common'; import { CUSTOM_ELEMENTS_SCHEMA, NgModule } from '@angular/core'; +import { FormsModule, ReactiveFormsModule } from '@angular/forms'; import { MatButtonModule } from '@angular/material/button'; import { MatMenuModule } from '@angular/material/menu'; +import { MatSelectModule } from '@angular/material/select'; import { AdminJobsComponent } from './admin-jobs.component'; @NgModule({ declarations: [AdminJobsComponent], - imports: [CommonModule, MatButtonModule, MatMenuModule], + imports: [ + CommonModule, + FormsModule, + MatButtonModule, + MatMenuModule, + MatSelectModule, + ReactiveFormsModule + ], schemas: [CUSTOM_ELEMENTS_SCHEMA] }) export class GfAdminJobsModule {} diff --git a/apps/client/src/app/components/admin-overview/admin-overview.component.ts b/apps/client/src/app/components/admin-overview/admin-overview.component.ts index f115c4ca9..f5ac65178 100644 --- a/apps/client/src/app/components/admin-overview/admin-overview.component.ts +++ b/apps/client/src/app/components/admin-overview/admin-overview.component.ts @@ -15,7 +15,6 @@ import { hasPermission, permissions } from '@ghostfolio/common/permissions'; import { differenceInSeconds, formatDistanceToNowStrict, - isValid, parseISO } from 'date-fns'; import { uniq } from 'lodash'; @@ -32,14 +31,11 @@ export class AdminOverviewComponent implements OnDestroy, OnInit { public couponDuration: StringValue = '30 days'; public coupons: Coupon[]; public customCurrencies: string[]; - public dataGatheringInProgress: boolean; - public dataGatheringProgress: number; public exchangeRates: { label1: string; label2: string; value: number }[]; public hasPermissionForSubscription: boolean; public hasPermissionForSystemMessage: boolean; public hasPermissionToToggleReadOnlyMode: boolean; public info: InfoItem; - public lastDataGathering: string; public transactionCount: number; public userCount: number; public user: User; @@ -128,7 +124,7 @@ export class AdminOverviewComponent implements OnDestroy, OnInit { public onDeleteCoupon(aCouponCode: string) { const confirmation = confirm('Do you really want to delete this coupon?'); - if (confirmation) { + if (confirmation === true) { const coupons = this.coupons.filter((coupon) => { return coupon.code !== aCouponCode; }); @@ -139,7 +135,7 @@ export class AdminOverviewComponent implements OnDestroy, OnInit { public onDeleteCurrency(aCurrency: string) { const confirmation = confirm('Do you really want to delete this currency?'); - if (confirmation) { + if (confirmation === true) { const currencies = this.customCurrencies.filter((currency) => { return currency !== aCurrency; }); @@ -152,24 +148,11 @@ export class AdminOverviewComponent implements OnDestroy, OnInit { } public onFlushCache() { - this.cacheService - .flush() - .pipe(takeUntil(this.unsubscribeSubject)) - .subscribe(() => { - setTimeout(() => { - window.location.reload(); - }, 300); - }); - } - - public onGatherMax() { - const confirmation = confirm( - 'This action may take some time. Do you want to proceed?' - ); + const confirmation = confirm('Do you really want to flush the cache?'); if (confirmation === true) { - this.adminService - .gatherMax() + this.cacheService + .flush() .pipe(takeUntil(this.unsubscribeSubject)) .subscribe(() => { setTimeout(() => { @@ -179,6 +162,28 @@ export class AdminOverviewComponent implements OnDestroy, OnInit { } } + public onGather7Days() { + this.adminService + .gather7Days() + .pipe(takeUntil(this.unsubscribeSubject)) + .subscribe(() => { + setTimeout(() => { + window.location.reload(); + }, 300); + }); + } + + public onGatherMax() { + this.adminService + .gatherMax() + .pipe(takeUntil(this.unsubscribeSubject)) + .subscribe(() => { + setTimeout(() => { + window.location.reload(); + }, 300); + }); + } + public onGatherProfileData() { this.adminService .gatherProfileData() @@ -207,39 +212,15 @@ export class AdminOverviewComponent implements OnDestroy, OnInit { this.dataService .fetchAdminData() .pipe(takeUntil(this.unsubscribeSubject)) - .subscribe( - ({ - dataGatheringProgress, - exchangeRates, - lastDataGathering, - settings, - transactionCount, - userCount - }) => { - this.coupons = (settings[PROPERTY_COUPONS] as Coupon[]) ?? []; - this.customCurrencies = settings[PROPERTY_CURRENCIES] as string[]; - this.dataGatheringProgress = dataGatheringProgress; - this.exchangeRates = exchangeRates; - - if (isValid(parseISO(lastDataGathering?.toString()))) { - this.lastDataGathering = formatDistanceToNowStrict( - new Date(lastDataGathering), - { - addSuffix: true - } - ); - } else if (lastDataGathering === 'IN_PROGRESS') { - this.dataGatheringInProgress = true; - } else { - this.lastDataGathering = 'Starting soon...'; - } - - this.transactionCount = transactionCount; - this.userCount = userCount; - - this.changeDetectorRef.markForCheck(); - } - ); + .subscribe(({ exchangeRates, settings, transactionCount, userCount }) => { + this.coupons = (settings[PROPERTY_COUPONS] as Coupon[]) ?? []; + this.customCurrencies = settings[PROPERTY_CURRENCIES] as string[]; + this.exchangeRates = exchangeRates; + this.transactionCount = transactionCount; + this.userCount = userCount; + + this.changeDetectorRef.markForCheck(); + }); } private generateCouponCode(aLength: number) { diff --git a/apps/client/src/app/components/admin-overview/admin-overview.html b/apps/client/src/app/components/admin-overview/admin-overview.html index 6118fccaa..d510582c3 100644 --- a/apps/client/src/app/components/admin-overview/admin-overview.html +++ b/apps/client/src/app/components/admin-overview/admin-overview.html @@ -19,37 +19,30 @@
Data Gathering
-
- {{ lastDataGathering }} - In Progress ({{ dataGatheringProgress | percent : '1.2-2' - }}) -
-
+
@@ -58,7 +51,6 @@ class="mb-2 mr-2" color="accent" mat-flat-button - [disabled]="dataGatheringInProgress" (click)="onGatherProfileData()" > @@ -109,7 +100,6 @@
+
+
Housekeeping
+
+ +
+
diff --git a/apps/client/src/app/services/admin.service.ts b/apps/client/src/app/services/admin.service.ts index 208f931aa..610be4147 100644 --- a/apps/client/src/app/services/admin.service.ts +++ b/apps/client/src/app/services/admin.service.ts @@ -1,4 +1,4 @@ -import { HttpClient } from '@angular/common/http'; +import { HttpClient, HttpParams } from '@angular/common/http'; import { Injectable } from '@angular/core'; import { UpdateMarketDataDto } from '@ghostfolio/api/app/admin/update-market-data.dto'; import { IDataProviderHistoricalResponse } from '@ghostfolio/api/services/interfaces/interfaces'; @@ -9,6 +9,7 @@ import { UniqueAsset } from '@ghostfolio/common/interfaces'; import { DataSource, MarketData } from '@prisma/client'; +import { JobStatus } from 'bull'; import { format, parseISO } from 'date-fns'; import { Observable, map } from 'rxjs'; @@ -18,6 +19,22 @@ import { Observable, map } from 'rxjs'; export class AdminService { public constructor(private http: HttpClient) {} + public deleteJob(aId: string) { + return this.http.delete(`/api/v1/admin/queue/job/${aId}`); + } + + public deleteJobs({ status }: { status: JobStatus[] }) { + let params = new HttpParams(); + + if (status?.length > 0) { + params = params.append('status', status.join(',')); + } + + return this.http.delete('/api/v1/admin/queue/job', { + params + }); + } + public deleteProfileData({ dataSource, symbol }: UniqueAsset) { return this.http.delete( `/api/v1/admin/profile-data/${dataSource}/${symbol}` @@ -43,16 +60,28 @@ export class AdminService { ); } - public fetchJobs() { - return this.http.get(`/api/v1/admin/queue/jobs`); + public fetchJobs({ status }: { status?: JobStatus[] }) { + let params = new HttpParams(); + + if (status?.length > 0) { + params = params.append('status', status.join(',')); + } + + return this.http.get('/api/v1/admin/queue/job', { + params + }); + } + + public gather7Days() { + return this.http.post('/api/v1/admin/gather', {}); } public gatherMax() { - return this.http.post(`/api/v1/admin/gather/max`, {}); + return this.http.post('/api/v1/admin/gather/max', {}); } public gatherProfileData() { - return this.http.post(`/api/v1/admin/gather/profile-data`, {}); + return this.http.post('/api/v1/admin/gather/profile-data', {}); } public gatherProfileDataBySymbol({ dataSource, symbol }: UniqueAsset) { diff --git a/libs/common/src/lib/config.ts b/libs/common/src/lib/config.ts index 410d0498f..e8fc6561b 100644 --- a/libs/common/src/lib/config.ts +++ b/libs/common/src/lib/config.ts @@ -1,4 +1,6 @@ import { DataSource } from '@prisma/client'; +import { JobOptions, JobStatus } from 'bull'; +import ms from 'ms'; import { ToggleOption } from './types'; @@ -43,19 +45,52 @@ export const warnColorRgb = { export const ASSET_SUB_CLASS_EMERGENCY_FUND = 'EMERGENCY_FUND'; export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE'; +export const DATA_GATHERING_QUEUE_PRIORITY_LOW = Number.MAX_SAFE_INTEGER; +export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1; export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy'; export const GATHER_ASSET_PROFILE_PROCESS = 'GATHER_ASSET_PROFILE'; +export const GATHER_ASSET_PROFILE_PROCESS_OPTIONS: JobOptions = { + attempts: 20, + backoff: { + delay: ms('1 minute'), + type: 'exponential' + }, + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH, + removeOnComplete: { + age: ms('2 weeks') / 1000 + } +}; +export const GATHER_HISTORICAL_MARKET_DATA_PROCESS = + 'GATHER_HISTORICAL_MARKET_DATA'; +export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS: JobOptions = { + attempts: 20, + backoff: { + delay: ms('1 minute'), + type: 'exponential' + }, + priority: DATA_GATHERING_QUEUE_PRIORITY_LOW, + removeOnComplete: { + age: ms('2 weeks') / 1000 + } +}; export const PROPERTY_BENCHMARKS = 'BENCHMARKS'; export const PROPERTY_COUPONS = 'COUPONS'; export const PROPERTY_CURRENCIES = 'CURRENCIES'; export const PROPERTY_IS_READ_ONLY_MODE = 'IS_READ_ONLY_MODE'; -export const PROPERTY_LAST_DATA_GATHERING = 'LAST_DATA_GATHERING'; -export const PROPERTY_LOCKED_DATA_GATHERING = 'LOCKED_DATA_GATHERING'; export const PROPERTY_SLACK_COMMUNITY_USERS = 'SLACK_COMMUNITY_USERS'; export const PROPERTY_STRIPE_CONFIG = 'STRIPE_CONFIG'; export const PROPERTY_SYSTEM_MESSAGE = 'SYSTEM_MESSAGE'; +export const QUEUE_JOB_STATUS_LIST = [ + 'active', + 'completed', + 'delayed', + 'failed', + 'paused', + 'waiting' +]; + export const UNKNOWN_KEY = 'UNKNOWN'; diff --git a/libs/common/src/lib/interfaces/admin-data.interface.ts b/libs/common/src/lib/interfaces/admin-data.interface.ts index ce90dccc5..1e0c6e532 100644 --- a/libs/common/src/lib/interfaces/admin-data.interface.ts +++ b/libs/common/src/lib/interfaces/admin-data.interface.ts @@ -1,7 +1,5 @@ export interface AdminData { - dataGatheringProgress?: number; exchangeRates: { label1: string; label2: string; value: number }[]; - lastDataGathering?: Date | 'IN_PROGRESS'; settings: { [key: string]: boolean | object | string | string[] }; transactionCount: number; userCount: number; diff --git a/libs/common/src/lib/interfaces/admin-jobs.interface.ts b/libs/common/src/lib/interfaces/admin-jobs.interface.ts index 5379eec69..25e937626 100644 --- a/libs/common/src/lib/interfaces/admin-jobs.interface.ts +++ b/libs/common/src/lib/interfaces/admin-jobs.interface.ts @@ -1,5 +1,16 @@ -import { Job } from 'bull'; +import { Job, JobStatus } from 'bull'; export interface AdminJobs { - jobs: Job[]; + jobs: (Pick< + Job, + | 'attemptsMade' + | 'data' + | 'finishedOn' + | 'id' + | 'name' + | 'stacktrace' + | 'timestamp' + > & { + state: JobStatus | 'stuck'; + })[]; }
## TypeData Source SymbolData SourceAttempts Created Finished Status
{{ job.id }}{{ job.name }}{{ job.data?.dataSource }}{{ job.id }} + + + + Asset Profile + + + Historical Market Data + + + {{ job.data?.symbol }}{{ job.data?.dataSource }} + {{ job.attemptsMade }} + {{ job.timestamp | date: defaultDateTimeFormat }} + - - - - + + + + +