diff --git a/apps/api/src/app/admin/admin.controller.ts b/apps/api/src/app/admin/admin.controller.ts index 3b10bb0f0..b13ba5b5e 100644 --- a/apps/api/src/app/admin/admin.controller.ts +++ b/apps/api/src/app/admin/admin.controller.ts @@ -2,9 +2,8 @@ import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.se import { MarketDataService } from '@ghostfolio/api/services/market-data.service'; import { PropertyDto } from '@ghostfolio/api/services/property/property.dto'; import { - DATA_GATHERING_QUEUE, - DATA_GATHERING_QUEUE_PRIORITY_HIGH, - GATHER_ASSET_PROFILE_PROCESS + GATHER_ASSET_PROFILE_PROCESS, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS } from '@ghostfolio/common/config'; import { AdminData, @@ -13,7 +12,6 @@ import { } from '@ghostfolio/common/interfaces'; import { hasPermission, permissions } from '@ghostfolio/common/permissions'; import type { RequestWithUser } from '@ghostfolio/common/types'; -import { InjectQueue } from '@nestjs/bull'; import { Body, Controller, @@ -29,10 +27,8 @@ import { import { REQUEST } from '@nestjs/core'; import { AuthGuard } from '@nestjs/passport'; import { DataSource, MarketData } from '@prisma/client'; -import { Queue } from 'bull'; import { isDate } from 'date-fns'; import { StatusCodes, getReasonPhrase } from 'http-status-codes'; -import ms from 'ms'; import { AdminService } from './admin.service'; import { UpdateMarketDataDto } from './update-market-data.dto'; @@ -41,8 +37,6 @@ import { UpdateMarketDataDto } from './update-market-data.dto'; export class AdminController { public constructor( private readonly adminService: AdminService, - @InjectQueue(DATA_GATHERING_QUEUE) - private readonly dataGatheringQueue: Queue, private readonly dataGatheringService: DataGatheringService, private readonly marketDataService: MarketDataService, @Inject(REQUEST) private readonly request: RequestWithUser @@ -102,20 +96,13 @@ export class AdminController { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringQueue.add( + await this.dataGatheringService.addJobToQueue( GATHER_ASSET_PROFILE_PROCESS, { dataSource, symbol }, - { - attempts: 20, - backoff: { - delay: ms('1 minute'), - type: 'exponential' - }, - priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH - } + GATHER_ASSET_PROFILE_PROCESS_OPTIONS ); } @@ -140,20 +127,13 @@ export class AdminController { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringQueue.add( + await this.dataGatheringService.addJobToQueue( GATHER_ASSET_PROFILE_PROCESS, { dataSource, symbol }, - { - attempts: 20, - backoff: { - delay: ms('1 minute'), - type: 'exponential' - }, - priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH - } + GATHER_ASSET_PROFILE_PROCESS_OPTIONS ); } } @@ -176,20 +156,13 @@ export class AdminController { ); } - await this.dataGatheringQueue.add( + await this.dataGatheringService.addJobToQueue( GATHER_ASSET_PROFILE_PROCESS, { dataSource, symbol }, - { - attempts: 20, - backoff: { - delay: ms('1 minute'), - type: 'exponential' - }, - priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH - } + GATHER_ASSET_PROFILE_PROCESS_OPTIONS ); } diff --git a/apps/api/src/app/order/order.service.ts b/apps/api/src/app/order/order.service.ts index 26a067a15..0be1523c1 100644 --- a/apps/api/src/app/order/order.service.ts +++ b/apps/api/src/app/order/order.service.ts @@ -1,17 +1,14 @@ import { AccountService } from '@ghostfolio/api/app/account/account.service'; -import { CacheService } from '@ghostfolio/api/app/cache/cache.service'; import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.service'; import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data.service'; import { PrismaService } from '@ghostfolio/api/services/prisma.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service'; import { - DATA_GATHERING_QUEUE, - DATA_GATHERING_QUEUE_PRIORITY_HIGH, - GATHER_ASSET_PROFILE_PROCESS + GATHER_ASSET_PROFILE_PROCESS, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS } from '@ghostfolio/common/config'; import { Filter } from '@ghostfolio/common/interfaces'; import { OrderWithAccount } from '@ghostfolio/common/types'; -import { InjectQueue } from '@nestjs/bull'; import { Injectable } from '@nestjs/common'; import { AssetClass, @@ -22,10 +19,8 @@ import { Type as TypeOfOrder } from '@prisma/client'; import Big from 'big.js'; -import { Queue } from 'bull'; import { endOfToday, isAfter } from 'date-fns'; import { groupBy } from 'lodash'; -import ms from 'ms'; import { v4 as uuidv4 } from 'uuid'; import { Activity } from './interfaces/activities.interface'; @@ -34,11 +29,8 @@ import { Activity } from './interfaces/activities.interface'; export class OrderService { public constructor( private readonly accountService: AccountService, - private readonly cacheService: CacheService, - @InjectQueue(DATA_GATHERING_QUEUE) - private readonly dataGatheringQueue: Queue, - private readonly exchangeRateDataService: ExchangeRateDataService, private readonly dataGatheringService: DataGatheringService, + private readonly exchangeRateDataService: ExchangeRateDataService, private readonly prismaService: PrismaService, private readonly symbolProfileService: SymbolProfileService ) {} @@ -122,20 +114,13 @@ export class OrderService { data.SymbolProfile.connectOrCreate.create.symbol.toUpperCase(); } - await this.dataGatheringQueue.add( + await this.dataGatheringService.addJobToQueue( GATHER_ASSET_PROFILE_PROCESS, { dataSource: data.SymbolProfile.connectOrCreate.create.dataSource, symbol: data.SymbolProfile.connectOrCreate.create.symbol }, - { - attempts: 20, - backoff: { - delay: ms('1 minute'), - type: 'exponential' - }, - priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH - } + GATHER_ASSET_PROFILE_PROCESS_OPTIONS ); const isDraft = isAfter(data.date as Date, endOfToday()); diff --git a/apps/api/src/services/cron.service.ts b/apps/api/src/services/cron.service.ts index 4aaef3dbe..727ff0998 100644 --- a/apps/api/src/services/cron.service.ts +++ b/apps/api/src/services/cron.service.ts @@ -1,13 +1,9 @@ import { - DATA_GATHERING_QUEUE, - DATA_GATHERING_QUEUE_PRIORITY_HIGH, - GATHER_ASSET_PROFILE_PROCESS + GATHER_ASSET_PROFILE_PROCESS, + GATHER_ASSET_PROFILE_PROCESS_OPTIONS } from '@ghostfolio/common/config'; -import { InjectQueue } from '@nestjs/bull'; import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; -import { Queue } from 'bull'; -import ms from 'ms'; import { DataGatheringService } from './data-gathering.service'; import { ExchangeRateDataService } from './exchange-rate-data.service'; @@ -16,8 +12,6 @@ import { TwitterBotService } from './twitter-bot/twitter-bot.service'; @Injectable() export class CronService { public constructor( - @InjectQueue(DATA_GATHERING_QUEUE) - private readonly dataGatheringQueue: Queue, private readonly dataGatheringService: DataGatheringService, private readonly exchangeRateDataService: ExchangeRateDataService, private readonly twitterBotService: TwitterBotService @@ -43,20 +37,13 @@ export class CronService { const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); for (const { dataSource, symbol } of uniqueAssets) { - await this.dataGatheringQueue.add( + await this.dataGatheringService.addJobToQueue( GATHER_ASSET_PROFILE_PROCESS, { dataSource, symbol }, - { - attempts: 20, - backoff: { - delay: ms('1 minute'), - type: 'exponential' - }, - priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH - } + GATHER_ASSET_PROFILE_PROCESS_OPTIONS ); } } diff --git a/apps/api/src/services/data-gathering.module.ts b/apps/api/src/services/data-gathering.module.ts index 2a179878b..7e6fdc029 100644 --- a/apps/api/src/services/data-gathering.module.ts +++ b/apps/api/src/services/data-gathering.module.ts @@ -16,7 +16,7 @@ import { SymbolProfileModule } from './symbol-profile.module'; imports: [ BullModule.registerQueue({ limiter: { - duration: ms('1 second'), + duration: ms('5 seconds'), max: 1 }, name: DATA_GATHERING_QUEUE diff --git a/apps/api/src/services/data-gathering.processor.ts b/apps/api/src/services/data-gathering.processor.ts index 37b3ec325..b973544fb 100644 --- a/apps/api/src/services/data-gathering.processor.ts +++ b/apps/api/src/services/data-gathering.processor.ts @@ -99,14 +99,6 @@ export class DataGatheringProcessor { } }); } catch {} - } else { - Logger.warn( - `Failed to gather data for symbol ${symbol} from ${dataSource} at ${format( - currentDate, - DATE_FORMAT - )}.`, - `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` - ); } // Count month one up for iteration @@ -119,6 +111,11 @@ export class DataGatheringProcessor { ) ); } + + Logger.log( + `Historical market data gathering has been completed for ${symbol} (${dataSource}).`, + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` + ); } catch (error) { Logger.error( error, diff --git a/apps/api/src/services/data-gathering.service.ts b/apps/api/src/services/data-gathering.service.ts index 50ce2e892..5bc669e6e 100644 --- a/apps/api/src/services/data-gathering.service.ts +++ b/apps/api/src/services/data-gathering.service.ts @@ -1,17 +1,17 @@ import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service'; import { DATA_GATHERING_QUEUE, - DATA_GATHERING_QUEUE_PRIORITY_LOW, - GATHER_HISTORICAL_MARKET_DATA_PROCESS + GATHER_HISTORICAL_MARKET_DATA_PROCESS, + GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS, + QUEUE_JOB_STATUS_LIST } from '@ghostfolio/common/config'; import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper'; import { UniqueAsset } from '@ghostfolio/common/interfaces'; import { InjectQueue } from '@nestjs/bull'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { DataSource } from '@prisma/client'; -import { Queue } from 'bull'; +import { JobOptions, Queue } from 'bull'; import { format, subDays } from 'date-fns'; -import ms from 'ms'; import { DataProviderService } from './data-provider/data-provider.service'; import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface'; @@ -21,8 +21,6 @@ import { PrismaService } from './prisma.service'; @Injectable() export class DataGatheringService { - private dataGatheringProgress: number; - public constructor( @Inject('DataEnhancers') private readonly dataEnhancers: DataEnhancerInterface[], @@ -34,6 +32,19 @@ export class DataGatheringService { private readonly symbolProfileService: SymbolProfileService ) {} + public async addJobToQueue(name: string, data: any, options?: JobOptions) { + const hasJob = await this.hasJob(name, data); + + if (hasJob) { + Logger.log( + `Job ${name} with data ${JSON.stringify(data)} already exists.`, + 'DataGatheringService' + ); + } else { + return this.dataGatheringQueue.add(name, data, options); + } + } + public async gather7Days() { const dataGatheringItems = await this.getSymbols7D(); await this.gatherSymbols(dataGatheringItems); @@ -101,15 +112,6 @@ export class DataGatheringService { uniqueAssets = await this.getUniqueAssets(); } - Logger.log( - `Asset profile data gathering has been started for ${uniqueAssets - .map(({ dataSource, symbol }) => { - return `${symbol} (${dataSource})`; - }) - .join(',')}.`, - 'DataGatheringService' - ); - const assetProfiles = await this.dataProviderService.getAssetProfiles( uniqueAssets ); @@ -205,21 +207,14 @@ export class DataGatheringService { continue; } - await this.dataGatheringQueue.add( + await this.addJobToQueue( GATHER_HISTORICAL_MARKET_DATA_PROCESS, { dataSource, date, symbol }, - { - attempts: 20, - backoff: { - delay: ms('1 minute'), - type: 'exponential' - }, - priority: DATA_GATHERING_QUEUE_PRIORITY_LOW - } + GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS ); } } @@ -354,4 +349,18 @@ export class DataGatheringService { return [...currencyPairsToGather, ...symbolProfilesToGather]; } + + private async hasJob(name: string, data: any) { + const jobs = await this.dataGatheringQueue.getJobs( + QUEUE_JOB_STATUS_LIST.filter((status) => { + return status !== 'completed'; + }) + ); + + return jobs.some((job) => { + return ( + job.name === name && JSON.stringify(job.data) === JSON.stringify(data) + ); + }); + } } diff --git a/apps/api/src/services/data-provider/alpha-vantage/alpha-vantage.service.ts b/apps/api/src/services/data-provider/alpha-vantage/alpha-vantage.service.ts index bff966fe3..41bd715b1 100644 --- a/apps/api/src/services/data-provider/alpha-vantage/alpha-vantage.service.ts +++ b/apps/api/src/services/data-provider/alpha-vantage/alpha-vantage.service.ts @@ -9,7 +9,7 @@ import { DATE_FORMAT } from '@ghostfolio/common/helper'; import { Granularity } from '@ghostfolio/common/types'; import { Injectable, Logger } from '@nestjs/common'; import { DataSource, SymbolProfile } from '@prisma/client'; -import { isAfter, isBefore, parse } from 'date-fns'; +import { format, isAfter, isBefore, parse } from 'date-fns'; import { IAlphaVantageHistoricalResponse } from './interfaces/interfaces'; @@ -76,9 +76,12 @@ export class AlphaVantageService implements DataProviderInterface { return response; } catch (error) { - Logger.error(error, 'AlphaVantageService'); - - return {}; + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` + ); } } diff --git a/apps/api/src/services/data-provider/eod-historical-data/eod-historical-data.service.ts b/apps/api/src/services/data-provider/eod-historical-data/eod-historical-data.service.ts index bb0401f00..1dd3c7aff 100644 --- a/apps/api/src/services/data-provider/eod-historical-data/eod-historical-data.service.ts +++ b/apps/api/src/services/data-provider/eod-historical-data/eod-historical-data.service.ts @@ -72,10 +72,13 @@ export class EodHistoricalDataService implements DataProviderInterface { { [aSymbol]: {} } ); } catch (error) { - Logger.error(error, 'EodHistoricalDataService'); + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` + ); } - - return {}; } public getName(): DataSource { diff --git a/apps/api/src/services/data-provider/ghostfolio-scraper-api/ghostfolio-scraper-api.service.ts b/apps/api/src/services/data-provider/ghostfolio-scraper-api/ghostfolio-scraper-api.service.ts index 7186ea7ec..373922fe1 100644 --- a/apps/api/src/services/data-provider/ghostfolio-scraper-api/ghostfolio-scraper-api.service.ts +++ b/apps/api/src/services/data-provider/ghostfolio-scraper-api/ghostfolio-scraper-api.service.ts @@ -87,10 +87,13 @@ export class GhostfolioScraperApiService implements DataProviderInterface { } }; } catch (error) { - Logger.error(error, 'GhostfolioScraperApiService'); + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` + ); } - - return {}; } public getName(): DataSource { diff --git a/apps/api/src/services/data-provider/google-sheets/google-sheets.service.ts b/apps/api/src/services/data-provider/google-sheets/google-sheets.service.ts index b196df532..cc6af5241 100644 --- a/apps/api/src/services/data-provider/google-sheets/google-sheets.service.ts +++ b/apps/api/src/services/data-provider/google-sheets/google-sheets.service.ts @@ -71,10 +71,13 @@ export class GoogleSheetsService implements DataProviderInterface { [symbol]: historicalData }; } catch (error) { - Logger.error(error, 'GoogleSheetsService'); + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` + ); } - - return {}; } public getName(): DataSource { diff --git a/apps/api/src/services/data-provider/rakuten-rapid-api/rakuten-rapid-api.service.ts b/apps/api/src/services/data-provider/rakuten-rapid-api/rakuten-rapid-api.service.ts index 2a516c5ef..7d92ae354 100644 --- a/apps/api/src/services/data-provider/rakuten-rapid-api/rakuten-rapid-api.service.ts +++ b/apps/api/src/services/data-provider/rakuten-rapid-api/rakuten-rapid-api.service.ts @@ -90,7 +90,14 @@ export class RakutenRapidApiService implements DataProviderInterface { } }; } - } catch (error) {} + } catch (error) { + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` + ); + } return {}; } diff --git a/apps/api/src/services/data-provider/yahoo-finance/yahoo-finance.service.ts b/apps/api/src/services/data-provider/yahoo-finance/yahoo-finance.service.ts index f277d9ac9..81f8c43fc 100644 --- a/apps/api/src/services/data-provider/yahoo-finance/yahoo-finance.service.ts +++ b/apps/api/src/services/data-provider/yahoo-finance/yahoo-finance.service.ts @@ -131,7 +131,13 @@ export class YahooFinanceService implements DataProviderInterface { if (url) { response.url = url; } - } catch {} + } catch (error) { + throw new Error( + `Could not get asset profile for ${aSymbol} (${this.getName()}): [${ + error.name + }] ${error.message}` + ); + } return response; } @@ -185,12 +191,12 @@ export class YahooFinanceService implements DataProviderInterface { return response; } catch (error) { - Logger.warn( - `Skipping yahooFinance2.getHistorical("${aSymbol}"): [${error.name}] ${error.message}`, - 'YahooFinanceService' + throw new Error( + `Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format( + from, + DATE_FORMAT + )} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}` ); - - return {}; } } diff --git a/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts b/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts index 3cd0d2d04..aba2fd29b 100644 --- a/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts +++ b/apps/client/src/app/components/admin-jobs/admin-jobs.component.ts @@ -80,11 +80,12 @@ export class AdminJobsComponent implements OnDestroy, OnInit { } public onDeleteJobs() { + const currentFilter = this.filterForm.get('status').value; + this.adminService - .deleteJobs({}) + .deleteJobs({ status: currentFilter ? [currentFilter] : undefined }) .pipe(takeUntil(this.unsubscribeSubject)) .subscribe(() => { - const currentFilter = this.filterForm.get('status').value; this.fetchJobs(currentFilter ? [currentFilter] : undefined); }); } diff --git a/apps/client/src/app/components/admin-jobs/admin-jobs.html b/apps/client/src/app/components/admin-jobs/admin-jobs.html index e660b97a2..6c48b9576 100644 --- a/apps/client/src/app/components/admin-jobs/admin-jobs.html +++ b/apps/client/src/app/components/admin-jobs/admin-jobs.html @@ -79,6 +79,7 @@ View Stacktrace diff --git a/apps/client/src/app/components/admin-overview/admin-overview.html b/apps/client/src/app/components/admin-overview/admin-overview.html index 76cab6c86..d510582c3 100644 --- a/apps/client/src/app/components/admin-overview/admin-overview.html +++ b/apps/client/src/app/components/admin-overview/admin-overview.html @@ -30,7 +30,7 @@ class="mr-1" name="cloud-download-outline" > - Start Data Gathering + Gather Recent Data
diff --git a/apps/client/src/app/services/admin.service.ts b/apps/client/src/app/services/admin.service.ts index 4c520e6e1..610be4147 100644 --- a/apps/client/src/app/services/admin.service.ts +++ b/apps/client/src/app/services/admin.service.ts @@ -23,7 +23,7 @@ export class AdminService { return this.http.delete(`/api/v1/admin/queue/job/${aId}`); } - public deleteJobs({ status }: { status?: JobStatus[] }) { + public deleteJobs({ status }: { status: JobStatus[] }) { let params = new HttpParams(); if (status?.length > 0) { diff --git a/libs/common/src/lib/config.ts b/libs/common/src/lib/config.ts index 63c2dd4cb..e8fc6561b 100644 --- a/libs/common/src/lib/config.ts +++ b/libs/common/src/lib/config.ts @@ -1,5 +1,6 @@ import { DataSource } from '@prisma/client'; -import { JobStatus } from 'bull'; +import { JobOptions, JobStatus } from 'bull'; +import ms from 'ms'; import { ToggleOption } from './types'; @@ -50,8 +51,30 @@ export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1; export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy'; export const GATHER_ASSET_PROFILE_PROCESS = 'GATHER_ASSET_PROFILE'; +export const GATHER_ASSET_PROFILE_PROCESS_OPTIONS: JobOptions = { + attempts: 20, + backoff: { + delay: ms('1 minute'), + type: 'exponential' + }, + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH, + removeOnComplete: { + age: ms('2 weeks') / 1000 + } +}; export const GATHER_HISTORICAL_MARKET_DATA_PROCESS = 'GATHER_HISTORICAL_MARKET_DATA'; +export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS: JobOptions = { + attempts: 20, + backoff: { + delay: ms('1 minute'), + type: 'exponential' + }, + priority: DATA_GATHERING_QUEUE_PRIORITY_LOW, + removeOnComplete: { + age: ms('2 weeks') / 1000 + } +}; export const PROPERTY_BENCHMARKS = 'BENCHMARKS'; export const PROPERTY_COUPONS = 'COUPONS';