From 3539e875ba31b948555d5d1660902b208e1a34a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sven=20G=C3=BCnther?= Date: Mon, 27 Oct 2025 16:41:37 +0100 Subject: [PATCH 1/3] Fix data loss risk in manual historical market data gathering (#5686) Replace delete-then-fetch pattern with atomic transaction to prevent data loss when manually gathering historical market data fails. Previously, when triggering "Gather Historical Market Data" from the Admin panel, the system would immediately delete all existing market data before queueing the fetch job. If the external data provider was down or returned an error, the asset would be left with no historical data and the original data was permanently lost. Changes: - Add `replaceAllForSymbol()` method to MarketDataService that performs delete and insert within a Prisma transaction - Remove upfront `deleteMany()` call from `gatherSymbol()` method - Add `replaceExistingData` flag to DataGatheringItem interface to distinguish manual refresh from scheduled updates - Update data gathering processor to use atomic replace only for manual operations while keeping normal upsert behavior for scheduled updates - Remove unused MarketDataService dependency from DataGatheringService The atomic transaction ensures that if the fetch operation fails, the original market data remains untouched. Regular scheduled data gathering continues to use upsert operations and is unaffected by this change. --- CHANGELOG.md | 1 + .../api/src/services/interfaces/interfaces.ts | 1 + .../market-data/market-data.service.ts | 42 +++++++++++++++++++ .../data-gathering.processor.ts | 16 +++++-- .../data-gathering/data-gathering.service.ts | 16 +++---- 5 files changed, 66 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1024336b3..78731b64d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Ensured the locale is available in the settings dialog to customize the rule thresholds of the _X-ray_ page +- Ensured atomic data replacememt for historical market data fetching ## 2.211.0 - 2025-10-25 diff --git a/apps/api/src/services/interfaces/interfaces.ts b/apps/api/src/services/interfaces/interfaces.ts index 7469754b5..0c7cd31c0 100644 --- a/apps/api/src/services/interfaces/interfaces.ts +++ b/apps/api/src/services/interfaces/interfaces.ts @@ -20,4 +20,5 @@ export interface DataProviderResponse { export interface DataGatheringItem extends AssetProfileIdentifier { date?: Date; + replaceExistingData?: boolean; } diff --git a/apps/api/src/services/market-data/market-data.service.ts b/apps/api/src/services/market-data/market-data.service.ts index 38ad61663..d09cab6c5 100644 --- a/apps/api/src/services/market-data/market-data.service.ts +++ b/apps/api/src/services/market-data/market-data.service.ts @@ -205,4 +205,46 @@ export class MarketDataService { return this.prismaService.$transaction(upsertPromises); } + + /** + * Atomically replace all market data for a symbol. + * Deletes existing data and inserts new data within a single transaction + * to prevent data loss if the operation fails. + */ + public async replaceAllForSymbol({ + dataSource, + symbol, + data + }: { + dataSource: DataSource; + symbol: string; + data: Prisma.MarketDataUpdateInput[]; + }): Promise { + await this.prismaService.$transaction(async (prisma) => { + // First, delete all existing market data for this symbol + await prisma.marketData.deleteMany({ + where: { + dataSource, + symbol + } + }); + + // Then, insert all new market data + const upsertPromises = data.map( + ({ dataSource, date, marketPrice, state }) => { + return prisma.marketData.create({ + data: { + dataSource: dataSource as DataSource, + date: date as Date, + marketPrice: marketPrice as number, + state: state as MarketDataState, + symbol: symbol as string + } + }); + } + ); + + await Promise.all(upsertPromises); + }); + } } diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts index 1a172f3ea..153316ea6 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts @@ -100,7 +100,7 @@ export class DataGatheringProcessor { name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME }) public async gatherHistoricalMarketData(job: Job) { - const { dataSource, date, symbol } = job.data; + const { dataSource, date, symbol, replaceExistingData } = job.data; try { let currentDate = parseISO(date as unknown as string); @@ -109,7 +109,7 @@ export class DataGatheringProcessor { `Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format( currentDate, DATE_FORMAT - )}`, + )}${replaceExistingData ? ' (replace mode)' : ''}`, `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` ); @@ -157,7 +157,17 @@ export class DataGatheringProcessor { currentDate = addDays(currentDate, 1); } - await this.marketDataService.updateMany({ data }); + // If replaceExistingData is true, use atomic replace to prevent data loss + // on failure. Otherwise, use the normal upsert approach. + if (replaceExistingData) { + await this.marketDataService.replaceAllForSymbol({ + dataSource, + symbol, + data + }); + } else { + await this.marketDataService.updateMany({ data }); + } Logger.log( `Historical market data gathering has been completed for ${symbol} (${dataSource}) at ${format( diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts index 2d3ec45ad..0b8ed8931 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts @@ -2,7 +2,6 @@ import { DataProviderService } from '@ghostfolio/api/services/data-provider/data import { DataEnhancerInterface } from '@ghostfolio/api/services/data-provider/interfaces/data-enhancer.interface'; import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service'; import { DataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; -import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service'; import { PropertyService } from '@ghostfolio/api/services/property/property.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service'; @@ -41,7 +40,6 @@ export class DataGatheringService { private readonly dataGatheringQueue: Queue, private readonly dataProviderService: DataProviderService, private readonly exchangeRateDataService: ExchangeRateDataService, - private readonly marketDataService: MarketDataService, private readonly prismaService: PrismaService, private readonly propertyService: PropertyService, private readonly symbolProfileService: SymbolProfileService @@ -95,8 +93,6 @@ export class DataGatheringService { } public async gatherSymbol({ dataSource, date, symbol }: DataGatheringItem) { - await this.marketDataService.deleteMany({ dataSource, symbol }); - const dataGatheringItems = (await this.getSymbolsMax()) .filter((dataGatheringItem) => { return ( @@ -109,9 +105,12 @@ export class DataGatheringService { date: date ?? item.date })); + // Add a flag to indicate this should replace all existing data + // The data will be deleted and replaced within a transaction in the processor await this.gatherSymbols({ dataGatheringItems, - priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH + priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH, + replaceExistingData: true }); } @@ -274,10 +273,12 @@ export class DataGatheringService { public async gatherSymbols({ dataGatheringItems, - priority + priority, + replaceExistingData = false }: { dataGatheringItems: DataGatheringItem[]; priority: number; + replaceExistingData?: boolean; }) { await this.addJobsToQueue( dataGatheringItems.map(({ dataSource, date, symbol }) => { @@ -285,7 +286,8 @@ export class DataGatheringService { data: { dataSource, date, - symbol + symbol, + replaceExistingData }, name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, opts: { From 9e4df2573be858d7e15e797bbc99aaadc220d528 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sven=20G=C3=BCnther?= Date: Mon, 27 Oct 2025 16:50:54 +0100 Subject: [PATCH 2/3] remove unnessesary comments --- apps/api/src/services/market-data/market-data.service.ts | 2 -- .../services/queues/data-gathering/data-gathering.processor.ts | 2 -- .../services/queues/data-gathering/data-gathering.service.ts | 2 -- 3 files changed, 6 deletions(-) diff --git a/apps/api/src/services/market-data/market-data.service.ts b/apps/api/src/services/market-data/market-data.service.ts index d09cab6c5..ea15fc9cd 100644 --- a/apps/api/src/services/market-data/market-data.service.ts +++ b/apps/api/src/services/market-data/market-data.service.ts @@ -221,7 +221,6 @@ export class MarketDataService { data: Prisma.MarketDataUpdateInput[]; }): Promise { await this.prismaService.$transaction(async (prisma) => { - // First, delete all existing market data for this symbol await prisma.marketData.deleteMany({ where: { dataSource, @@ -229,7 +228,6 @@ export class MarketDataService { } }); - // Then, insert all new market data const upsertPromises = data.map( ({ dataSource, date, marketPrice, state }) => { return prisma.marketData.create({ diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts index 153316ea6..57c99a87c 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts @@ -157,8 +157,6 @@ export class DataGatheringProcessor { currentDate = addDays(currentDate, 1); } - // If replaceExistingData is true, use atomic replace to prevent data loss - // on failure. Otherwise, use the normal upsert approach. if (replaceExistingData) { await this.marketDataService.replaceAllForSymbol({ dataSource, diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts index 0b8ed8931..7bcde097c 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.service.ts @@ -105,8 +105,6 @@ export class DataGatheringService { date: date ?? item.date })); - // Add a flag to indicate this should replace all existing data - // The data will be deleted and replaced within a transaction in the processor await this.gatherSymbols({ dataGatheringItems, priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH, From 094b5083583f91b3d724b8c3a7b9e21afbff4115 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sven=20G=C3=BCnther?= Date: Mon, 27 Oct 2025 17:11:32 +0100 Subject: [PATCH 3/3] Fix P2002 unique constraint error in replaceAllForSymbol Replace Promise.all with createMany to handle duplicate dates in the data array when atomically replacing market data. The previous implementation used multiple parallel create() operations which caused a P2002 unique constraint violation when the data array contained duplicate dates (e.g., when market prices are forward-filled for non-trading days). Changes: - Replace Promise.all of individual create() operations with createMany() - Add skipDuplicates: true to silently handle duplicate records - Add data.length check to avoid empty createMany call This maintains the atomic transaction behavior while efficiently handling batch inserts with potential duplicates. --- .../market-data/market-data.service.ts | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/apps/api/src/services/market-data/market-data.service.ts b/apps/api/src/services/market-data/market-data.service.ts index ea15fc9cd..e046bc8dd 100644 --- a/apps/api/src/services/market-data/market-data.service.ts +++ b/apps/api/src/services/market-data/market-data.service.ts @@ -228,21 +228,18 @@ export class MarketDataService { } }); - const upsertPromises = data.map( - ({ dataSource, date, marketPrice, state }) => { - return prisma.marketData.create({ - data: { - dataSource: dataSource as DataSource, - date: date as Date, - marketPrice: marketPrice as number, - state: state as MarketDataState, - symbol: symbol as string - } - }); - } - ); - - await Promise.all(upsertPromises); + if (data.length > 0) { + await prisma.marketData.createMany({ + data: data.map(({ dataSource, date, marketPrice, state }) => ({ + dataSource: dataSource as DataSource, + date: date as Date, + marketPrice: marketPrice as number, + state: state as MarketDataState, + symbol: symbol as string + })), + skipDuplicates: true + }); + } }); } }