diff --git a/CHANGELOG.md b/CHANGELOG.md index 248aaa6ac..15e89924f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Improved the icon of the _View Holding_ menu item in the activities table +- Ensured atomic data replacement during historical market data gathering - Refreshed the cryptocurrencies list - Upgraded `ng-extract-i18n-merge` from version `3.0.0` to `3.1.0` diff --git a/apps/api/src/services/interfaces/interfaces.ts b/apps/api/src/services/interfaces/interfaces.ts index 7469754b5..492c2bd35 100644 --- a/apps/api/src/services/interfaces/interfaces.ts +++ b/apps/api/src/services/interfaces/interfaces.ts @@ -20,4 +20,5 @@ export interface DataProviderResponse { export interface DataGatheringItem extends AssetProfileIdentifier { date?: Date; + force?: boolean; } diff --git a/apps/api/src/services/market-data/market-data.service.ts b/apps/api/src/services/market-data/market-data.service.ts index 38ad61663..a757459e3 100644 --- a/apps/api/src/services/market-data/market-data.service.ts +++ b/apps/api/src/services/market-data/market-data.service.ts @@ -132,6 +132,61 @@ export class MarketDataService { }); } + /** + * Atomically replace market data for a symbol within a date range. + * Deletes existing data in the range and inserts new data within a single + * transaction to prevent data loss if the operation fails. + */ + public async replaceForSymbol({ + data, + dataSource, + symbol + }: AssetProfileIdentifier & { data: Prisma.MarketDataUpdateInput[] }) { + await this.prismaService.$transaction(async (prisma) => { + if (data.length > 0) { + let minTime = Infinity; + let maxTime = -Infinity; + + for (const item of data) { + const time = (item.date as Date).getTime(); + + if (time < minTime) { + minTime = time; + } + + if (time > maxTime) { + maxTime = time; + } + } + + const minDate = new Date(minTime); + const maxDate = new Date(maxTime); + + await prisma.marketData.deleteMany({ + where: { + dataSource, + symbol, + date: { + gte: minDate, + lte: maxDate + } + } + }); + + await prisma.marketData.createMany({ + data: data.map(({ date, marketPrice, state }) => ({ + dataSource, + symbol, + date: date as Date, + marketPrice: marketPrice as number, + state: state as MarketDataState + })), + skipDuplicates: true + }); + } + }); + } + public async updateAssetProfileIdentifier( oldAssetProfileIdentifier: AssetProfileIdentifier, newAssetProfileIdentifier: AssetProfileIdentifier diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts index 1a172f3ea..1a4038652 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts @@ -100,7 +100,7 @@ export class DataGatheringProcessor { name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME }) public async gatherHistoricalMarketData(job: Job) { - const { dataSource, date, symbol } = job.data; + const { dataSource, date, force, symbol } = job.data; try { let currentDate = parseISO(date as unknown as string); @@ -109,7 +109,7 @@ export class DataGatheringProcessor { `Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format( currentDate, DATE_FORMAT - )}`, + )}${force ? ' (forced update)' : ''}`, `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` ); @@ -157,7 +157,15 @@ export class DataGatheringProcessor { currentDate = addDays(currentDate, 1); } - await this.marketDataService.updateMany({ data }); + if (force) { + await this.marketDataService.replaceForSymbol({ + data, + dataSource, + symbol + }); + } else { + await this.marketDataService.updateMany({ data }); + } Logger.log( `Historical market data gathering has been completed for ${symbol} (${dataSource}) at ${format( diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts index 2d3ec45ad..c433f692f 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts @@ -2,7 +2,6 @@ import { DataProviderService } from '@ghostfolio/api/services/data-provider/data import { DataEnhancerInterface } from '@ghostfolio/api/services/data-provider/interfaces/data-enhancer.interface'; import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service'; import { DataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; -import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service'; import { PropertyService } from '@ghostfolio/api/services/property/property.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service'; @@ -41,7 +40,6 @@ export class DataGatheringService { private readonly dataGatheringQueue: Queue, private readonly dataProviderService: DataProviderService, private readonly exchangeRateDataService: ExchangeRateDataService, - private readonly marketDataService: MarketDataService, private readonly prismaService: PrismaService, private readonly propertyService: PropertyService, private readonly symbolProfileService: SymbolProfileService @@ -95,8 +93,6 @@ export class DataGatheringService { } public async gatherSymbol({ dataSource, date, symbol }: DataGatheringItem) { - await this.marketDataService.deleteMany({ dataSource, symbol }); - const dataGatheringItems = (await this.getSymbolsMax()) .filter((dataGatheringItem) => { return ( @@ -111,6 +107,7 @@ export class DataGatheringService { await this.gatherSymbols({ dataGatheringItems, + force: true, priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH }); } @@ -274,9 +271,11 @@ export class DataGatheringService { public async gatherSymbols({ dataGatheringItems, + force = false, priority }: { dataGatheringItems: DataGatheringItem[]; + force?: boolean; priority: number; }) { await this.addJobsToQueue( @@ -285,6 +284,7 @@ export class DataGatheringService { data: { dataSource, date, + force, symbol }, name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,