From 3539e875ba31b948555d5d1660902b208e1a34a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sven=20G=C3=BCnther?= Date: Mon, 27 Oct 2025 16:41:37 +0100 Subject: [PATCH] Fix data loss risk in manual historical market data gathering (#5686) Replace delete-then-fetch pattern with atomic transaction to prevent data loss when manually gathering historical market data fails. Previously, when triggering "Gather Historical Market Data" from the Admin panel, the system would immediately delete all existing market data before queueing the fetch job. If the external data provider was down or returned an error, the asset would be left with no historical data and the original data was permanently lost. Changes: - Add `replaceAllForSymbol()` method to MarketDataService that performs delete and insert within a Prisma transaction - Remove upfront `deleteMany()` call from `gatherSymbol()` method - Add `replaceExistingData` flag to DataGatheringItem interface to distinguish manual refresh from scheduled updates - Update data gathering processor to use atomic replace only for manual operations while keeping normal upsert behavior for scheduled updates - Remove unused MarketDataService dependency from DataGatheringService The atomic transaction ensures that if the fetch operation fails, the original market data remains untouched. Regular scheduled data gathering continues to use upsert operations and is unaffected by this change. --- CHANGELOG.md | 1 + .../api/src/services/interfaces/interfaces.ts | 1 + .../market-data/market-data.service.ts | 42 +++++++++++++++++++ .../data-gathering.processor.ts | 16 +++++-- .../data-gathering/data-gathering.service.ts | 16 +++---- 5 files changed, 66 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1024336b3..78731b64d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Ensured the locale is available in the settings dialog to customize the rule thresholds of the _X-ray_ page +- Ensured atomic data replacememt for historical market data fetching ## 2.211.0 - 2025-10-25 diff --git a/apps/api/src/services/interfaces/interfaces.ts b/apps/api/src/services/interfaces/interfaces.ts index 7469754b5..0c7cd31c0 100644 --- a/apps/api/src/services/interfaces/interfaces.ts +++ b/apps/api/src/services/interfaces/interfaces.ts @@ -20,4 +20,5 @@ export interface DataProviderResponse { export interface DataGatheringItem extends AssetProfileIdentifier { date?: Date; + replaceExistingData?: boolean; } diff --git a/apps/api/src/services/market-data/market-data.service.ts b/apps/api/src/services/market-data/market-data.service.ts index 38ad61663..d09cab6c5 100644 --- a/apps/api/src/services/market-data/market-data.service.ts +++ b/apps/api/src/services/market-data/market-data.service.ts @@ -205,4 +205,46 @@ export class MarketDataService { return this.prismaService.$transaction(upsertPromises); } + + /** + * Atomically replace all market data for a symbol. + * Deletes existing data and inserts new data within a single transaction + * to prevent data loss if the operation fails. + */ + public async replaceAllForSymbol({ + dataSource, + symbol, + data + }: { + dataSource: DataSource; + symbol: string; + data: Prisma.MarketDataUpdateInput[]; + }): Promise { + await this.prismaService.$transaction(async (prisma) => { + // First, delete all existing market data for this symbol + await prisma.marketData.deleteMany({ + where: { + dataSource, + symbol + } + }); + + // Then, insert all new market data + const upsertPromises = data.map( + ({ dataSource, date, marketPrice, state }) => { + return prisma.marketData.create({ + data: { + dataSource: dataSource as DataSource, + date: date as Date, + marketPrice: marketPrice as number, + state: state as MarketDataState, + symbol: symbol as string + } + }); + } + ); + + await Promise.all(upsertPromises); + }); + } } diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts index 1a172f3ea..153316ea6 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts @@ -100,7 +100,7 @@ export class DataGatheringProcessor { name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME }) public async gatherHistoricalMarketData(job: Job) { - const { dataSource, date, symbol } = job.data; + const { dataSource, date, symbol, replaceExistingData } = job.data; try { let currentDate = parseISO(date as unknown as string); @@ -109,7 +109,7 @@ export class DataGatheringProcessor { `Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format( currentDate, DATE_FORMAT - )}`, + )}${replaceExistingData ? ' (replace mode)' : ''}`, `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` ); @@ -157,7 +157,17 @@ export class DataGatheringProcessor { currentDate = addDays(currentDate, 1); } - await this.marketDataService.updateMany({ data }); + // If replaceExistingData is true, use atomic replace to prevent data loss + // on failure. Otherwise, use the normal upsert approach. + if (replaceExistingData) { + await this.marketDataService.replaceAllForSymbol({ + dataSource, + symbol, + data + }); + } else { + await this.marketDataService.updateMany({ data }); + } Logger.log( `Historical market data gathering has been completed for ${symbol} (${dataSource}) at ${format( diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts index 2d3ec45ad..0b8ed8931 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts @@ -2,7 +2,6 @@ import { DataProviderService } from '@ghostfolio/api/services/data-provider/data import { DataEnhancerInterface } from '@ghostfolio/api/services/data-provider/interfaces/data-enhancer.interface'; import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service'; import { DataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; -import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service'; import { PropertyService } from '@ghostfolio/api/services/property/property.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service'; @@ -41,7 +40,6 @@ export class DataGatheringService { private readonly dataGatheringQueue: Queue, private readonly dataProviderService: DataProviderService, private readonly exchangeRateDataService: ExchangeRateDataService, - private readonly marketDataService: MarketDataService, private readonly prismaService: PrismaService, private readonly propertyService: PropertyService, private readonly symbolProfileService: SymbolProfileService @@ -95,8 +93,6 @@ export class DataGatheringService { } public async gatherSymbol({ dataSource, date, symbol }: DataGatheringItem) { - await this.marketDataService.deleteMany({ dataSource, symbol }); - const dataGatheringItems = (await this.getSymbolsMax()) .filter((dataGatheringItem) => { return ( @@ -109,9 +105,12 @@ export class DataGatheringService { date: date ?? item.date })); + // Add a flag to indicate this should replace all existing data + // The data will be deleted and replaced within a transaction in the processor await this.gatherSymbols({ dataGatheringItems, - priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH, + replaceExistingData: true }); } @@ -274,10 +273,12 @@ export class DataGatheringService { public async gatherSymbols({ dataGatheringItems, - priority + priority, + replaceExistingData = false }: { dataGatheringItems: DataGatheringItem[]; priority: number; + replaceExistingData?: boolean; }) { await this.addJobsToQueue( dataGatheringItems.map(({ dataSource, date, symbol }) => { @@ -285,7 +286,8 @@ export class DataGatheringService { data: { dataSource, date, - symbol + symbol, + replaceExistingData }, name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, opts: {