diff --git a/CHANGELOG.md b/CHANGELOG.md index ad8f2f70e..954396b49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Ensured the locale is available in the settings dialog to customize the rule thresholds of the _X-ray_ page +- Ensured atomic data replacememt for historical market data fetching ## 2.211.0 - 2025-10-25 diff --git a/apps/api/src/services/interfaces/interfaces.ts b/apps/api/src/services/interfaces/interfaces.ts index 7469754b5..0c7cd31c0 100644 --- a/apps/api/src/services/interfaces/interfaces.ts +++ b/apps/api/src/services/interfaces/interfaces.ts @@ -20,4 +20,5 @@ export interface DataProviderResponse { export interface DataGatheringItem extends AssetProfileIdentifier { date?: Date; + replaceExistingData?: boolean; } diff --git a/apps/api/src/services/market-data/market-data.service.ts b/apps/api/src/services/market-data/market-data.service.ts index 38ad61663..e046bc8dd 100644 --- a/apps/api/src/services/market-data/market-data.service.ts +++ b/apps/api/src/services/market-data/market-data.service.ts @@ -205,4 +205,41 @@ export class MarketDataService { return this.prismaService.$transaction(upsertPromises); } + + /** + * Atomically replace all market data for a symbol. + * Deletes existing data and inserts new data within a single transaction + * to prevent data loss if the operation fails. + */ + public async replaceAllForSymbol({ + dataSource, + symbol, + data + }: { + dataSource: DataSource; + symbol: string; + data: Prisma.MarketDataUpdateInput[]; + }): Promise { + await this.prismaService.$transaction(async (prisma) => { + await prisma.marketData.deleteMany({ + where: { + dataSource, + symbol + } + }); + + if (data.length > 0) { + await prisma.marketData.createMany({ + data: data.map(({ dataSource, date, marketPrice, state }) => ({ + dataSource: dataSource as DataSource, + date: date as Date, + marketPrice: marketPrice as number, + state: state as MarketDataState, + symbol: symbol as string + })), + skipDuplicates: true + }); + } + }); + } } diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts index 1a172f3ea..57c99a87c 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts @@ -100,7 +100,7 @@ export class DataGatheringProcessor { name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME }) public async gatherHistoricalMarketData(job: Job) { - const { dataSource, date, symbol } = job.data; + const { dataSource, date, symbol, replaceExistingData } = job.data; try { let currentDate = parseISO(date as unknown as string); @@ -109,7 +109,7 @@ export class DataGatheringProcessor { `Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format( currentDate, DATE_FORMAT - )}`, + )}${replaceExistingData ? ' (replace mode)' : ''}`, `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` ); @@ -157,7 +157,15 @@ export class DataGatheringProcessor { currentDate = addDays(currentDate, 1); } - await this.marketDataService.updateMany({ data }); + if (replaceExistingData) { + await this.marketDataService.replaceAllForSymbol({ + dataSource, + symbol, + data + }); + } else { + await this.marketDataService.updateMany({ data }); + } Logger.log( `Historical market data gathering has been completed for ${symbol} (${dataSource}) at ${format( diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts index 2d3ec45ad..7bcde097c 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts @@ -2,7 +2,6 @@ import { DataProviderService } from '@ghostfolio/api/services/data-provider/data import { DataEnhancerInterface } from '@ghostfolio/api/services/data-provider/interfaces/data-enhancer.interface'; import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service'; import { DataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; -import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service'; import { PropertyService } from '@ghostfolio/api/services/property/property.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service'; @@ -41,7 +40,6 @@ export class DataGatheringService { private readonly dataGatheringQueue: Queue, private readonly dataProviderService: DataProviderService, private readonly exchangeRateDataService: ExchangeRateDataService, - private readonly marketDataService: MarketDataService, private readonly prismaService: PrismaService, private readonly propertyService: PropertyService, private readonly symbolProfileService: SymbolProfileService @@ -95,8 +93,6 @@ export class DataGatheringService { } public async gatherSymbol({ dataSource, date, symbol }: DataGatheringItem) { - await this.marketDataService.deleteMany({ dataSource, symbol }); - const dataGatheringItems = (await this.getSymbolsMax()) .filter((dataGatheringItem) => { return ( @@ -111,7 +107,8 @@ export class DataGatheringService { await this.gatherSymbols({ dataGatheringItems, - priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH, + replaceExistingData: true }); } @@ -274,10 +271,12 @@ export class DataGatheringService { public async gatherSymbols({ dataGatheringItems, - priority + priority, + replaceExistingData = false }: { dataGatheringItems: DataGatheringItem[]; priority: number; + replaceExistingData?: boolean; }) { await this.addJobsToQueue( dataGatheringItems.map(({ dataSource, date, symbol }) => { @@ -285,7 +284,8 @@ export class DataGatheringService { data: { dataSource, date, - symbol + symbol, + replaceExistingData }, name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, opts: {