From b0cfb2b6fd4c90bf4bd6062489a234ac18829151 Mon Sep 17 00:00:00 2001 From: Dan Date: Sat, 12 Oct 2024 17:08:41 +0200 Subject: [PATCH] Add Datagathering process for missing values only --- apps/api/src/app/admin/admin.controller.ts | 17 +- .../data-gathering.processor.ts | 159 +++++++++++++++++- .../data-gathering/data-gathering.service.ts | 52 +++++- .../asset-profile-dialog.component.ts | 10 ++ .../asset-profile-dialog.html | 13 ++ apps/client/src/app/services/admin.service.ts | 16 ++ libs/common/src/lib/config.ts | 12 ++ 7 files changed, 271 insertions(+), 8 deletions(-) diff --git a/apps/api/src/app/admin/admin.controller.ts b/apps/api/src/app/admin/admin.controller.ts index 0cf8d78bd..2c469612e 100644 --- a/apps/api/src/app/admin/admin.controller.ts +++ b/apps/api/src/app/admin/admin.controller.ts @@ -158,7 +158,22 @@ export class AdminController { @Param('dataSource') dataSource: DataSource, @Param('symbol') symbol: string ): Promise { - this.dataGatheringService.gatherSymbol({ dataSource, symbol }); + await this.dataGatheringService.gatherSymbol({ dataSource, symbol }); + + return; + } + + @Post('gatherMissing/:dataSource/:symbol') + @UseGuards(AuthGuard('jwt'), HasPermissionGuard) + @HasPermission(permissions.accessAdminControl) + public async gatherSymbolMissingOnly( + @Param('dataSource') dataSource: DataSource, + @Param('symbol') symbol: string + ): Promise { + await this.dataGatheringService.gatherSymbolMissingOnly({ + dataSource, + symbol + }); return; } diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts index 2745aa288..d925b2294 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts @@ -1,20 +1,25 @@ import { DataProviderService } from '@ghostfolio/api/services/data-provider/data-provider.service'; -import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; +import { + IDataGatheringItem, + IDataProviderHistoricalResponse +} from '@ghostfolio/api/services/interfaces/interfaces'; import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; import { DATA_GATHERING_QUEUE, DEFAULT_PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE, DEFAULT_PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA, GATHER_ASSET_PROFILE_PROCESS, - GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME + GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, + GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME } from '@ghostfolio/common/config'; import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper'; import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces'; import { Process, Processor } from '@nestjs/bull'; import { Injectable, Logger } from '@nestjs/common'; -import { Prisma } from '@prisma/client'; +import { DataSource, Prisma } from '@prisma/client'; import { Job } from 'bull'; +import { isNumber } from 'class-validator'; import { addDays, format, @@ -22,7 +27,9 @@ import { getMonth, getYear, isBefore, - parseISO + parseISO, + eachDayOfInterval, + isEqual } from 'date-fns'; import { DataGatheringService } from './data-gathering.service'; @@ -150,4 +157,148 @@ export class DataGatheringProcessor { throw new Error(error); } } + @Process({ + concurrency: parseInt( + process.env.PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA ?? + DEFAULT_PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA.toString(), + 10 + ), + name: GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME + }) + public async gatherMissingHistoricalMarketData(job: Job) { + try { + const { dataSource, date, symbol } = job.data; + + Logger.log( + `Historical market data gathering for missing values has been started for ${symbol} (${dataSource}) at ${format( + date, + DATE_FORMAT + )}`, + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` + ); + const entries = await this.marketDataService.marketDataItems({ + where: { + AND: { + symbol: { + equals: symbol + }, + dataSource: { + equals: dataSource + } + } + }, + orderBy: { + date: 'asc' + }, + take: 1 + }); + const firstEntry = entries[0]; + const marketData = await this.marketDataService + .getRange({ + assetProfileIdentifiers: [{ dataSource, symbol }], + dateQuery: { + gte: addDays(firstEntry.date, -10) + } + }) + .then((md) => md.map((m) => m.date)); + + let dates = eachDayOfInterval( + { + start: firstEntry.date, + end: new Date() + }, + { + step: 1 + } + ); + dates = dates.filter((d) => !marketData.some((md) => isEqual(md,d))); + + const historicalData = await this.dataProviderService.getHistoricalRaw({ + dataGatheringItems: [{ dataSource, symbol }], + from: firstEntry.date, + to: new Date() + }); + + const data: Prisma.MarketDataUpdateInput[] = + this.mapToMarketUpsertDataInputs( + dates, + historicalData, + symbol, + dataSource + ); + + await this.marketDataService.updateMany({ data }); + + Logger.log( + `Historical market data gathering for missing values has been completed for ${symbol} (${dataSource}) at ${format( + date, + DATE_FORMAT + )}`, + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` + ); + } catch (error) { + Logger.error( + error, + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` + ); + + throw new Error(error); + } + } + + private mapToMarketUpsertDataInputs( + missingMarketData: Date[], + historicalData: Record< + string, + Record + >, + symbol: string, + dataSource: DataSource + ): Prisma.MarketDataUpdateInput[] { + return missingMarketData.map((date) => { + if ( + isNumber( + historicalData[symbol]?.[format(date, DATE_FORMAT)]?.marketPrice + ) + ) { + return { + date, + symbol, + dataSource, + marketPrice: + historicalData[symbol]?.[format(date, DATE_FORMAT)]?.marketPrice + }; + } else { + let earlierDate = date; + let index = 0; + while ( + !isNumber( + historicalData[symbol]?.[format(earlierDate, DATE_FORMAT)] + ?.marketPrice + ) + ) { + earlierDate = addDays(earlierDate, -1); + index++; + if (index > 10) { + break; + } + } + if ( + isNumber( + historicalData[symbol]?.[format(earlierDate, DATE_FORMAT)] + ?.marketPrice + ) + ) { + return { + date, + symbol, + dataSource, + marketPrice: + historicalData[symbol]?.[format(earlierDate, DATE_FORMAT)] + ?.marketPrice + }; + } + } + }); + } } diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts index a66e05b72..24b174785 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts @@ -13,6 +13,8 @@ import { DATA_GATHERING_QUEUE_PRIORITY_MEDIUM, GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS, + GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, + GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS, PROPERTY_BENCHMARKS } from '@ghostfolio/common/config'; import { @@ -28,7 +30,6 @@ import { import { InjectQueue } from '@nestjs/bull'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { DataSource } from '@prisma/client'; -import AwaitLock from 'await-lock'; import { JobOptions, Queue } from 'bull'; import { format, min, subDays, subYears } from 'date-fns'; import { isEmpty } from 'lodash'; @@ -48,8 +49,6 @@ export class DataGatheringService { private readonly symbolProfileService: SymbolProfileService ) {} - lock = new AwaitLock(); - public async addJobToQueue({ data, name, @@ -114,6 +113,24 @@ export class DataGatheringService { }); } + public async gatherSymbolMissingOnly({ + dataSource, + symbol + }: AssetProfileIdentifier) { + const dataGatheringItems = (await this.getSymbolsMax()).filter( + (dataGatheringItem) => { + return ( + dataGatheringItem.dataSource === dataSource && + dataGatheringItem.symbol === symbol + ); + } + ); + await this.gatherMissingDataSymbols({ + dataGatheringItems, + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH + }); + } + public async gatherSymbolForDate({ dataSource, date, @@ -296,6 +313,35 @@ export class DataGatheringService { ); } + public async gatherMissingDataSymbols({ + dataGatheringItems, + priority + }: { + dataGatheringItems: IDataGatheringItem[]; + priority: number; + }) { + await this.addJobsToQueue( + dataGatheringItems.map(({ dataSource, date, symbol }) => { + return { + data: { + dataSource, + date, + symbol + }, + name: GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, + opts: { + ...GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS, + priority, + jobId: `${getAssetProfileIdentifier({ + dataSource, + symbol + })}-missing-${format(date, DATE_FORMAT)}` + } + }; + }) + ); + } + public async getAllAssetProfileIdentifiers(): Promise< AssetProfileIdentifier[] > { diff --git a/apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.component.ts b/apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.component.ts index d79ba44f3..4eb65a70d 100644 --- a/apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.component.ts +++ b/apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.component.ts @@ -223,6 +223,16 @@ export class AssetProfileDialog implements OnDestroy, OnInit { .subscribe(() => {}); } + public onGatherSymbolMissingOnly({ + dataSource, + symbol + }: AssetProfileIdentifier) { + this.adminService + .gatherSymbolMissingOnly({ dataSource, symbol }) + .pipe(takeUntil(this.unsubscribeSubject)) + .subscribe(() => {}); + } + public onImportHistoricalData() { try { const marketData = csvToJson( diff --git a/apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.html b/apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.html index 76bce400b..6ea0a1a03 100644 --- a/apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.html +++ b/apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.html @@ -31,6 +31,19 @@ > Gather Historical Data +