Browse Source

Fix data loss risk in manual historical market data gathering (#5686)

Replace delete-then-fetch pattern with atomic transaction to prevent data loss when manually gathering historical market data fails.

Previously, when triggering "Gather Historical Market Data" from the Admin panel, the system would immediately delete all existing market
data before queueing the fetch job. If the external data provider was down or returned an error, the asset would be left with no historical
data and the original data was permanently lost.

Changes:
 - Add `replaceAllForSymbol()` method to MarketDataService that  performs delete and insert within a Prisma transaction
 - Remove upfront `deleteMany()` call from `gatherSymbol()` method
 - Add `replaceExistingData` flag to DataGatheringItem interface to distinguish manual refresh from scheduled updates
 - Update data gathering processor to use atomic replace only for manual operations while keeping normal upsert behavior for scheduled
  updates
 - Remove unused MarketDataService dependency from DataGatheringService

The atomic transaction ensures that if the fetch operation fails, the original market data remains untouched. Regular scheduled data gathering continues to use upsert operations and is unaffected by this change.
pull/5858/head
Sven Günther 3 days ago
parent
commit
3539e875ba
  1. 1
      CHANGELOG.md
  2. 1
      apps/api/src/services/interfaces/interfaces.ts
  3. 42
      apps/api/src/services/market-data/market-data.service.ts
  4. 16
      apps/api/src/services/queues/data-gathering/data-gathering.processor.ts
  5. 16
      apps/api/src/services/queues/data-gathering/data-gathering.service.ts

1
CHANGELOG.md

@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Ensured the locale is available in the settings dialog to customize the rule thresholds of the _X-ray_ page
- Ensured atomic data replacememt for historical market data fetching
## 2.211.0 - 2025-10-25

1
apps/api/src/services/interfaces/interfaces.ts

@ -20,4 +20,5 @@ export interface DataProviderResponse {
export interface DataGatheringItem extends AssetProfileIdentifier {
date?: Date;
replaceExistingData?: boolean;
}

42
apps/api/src/services/market-data/market-data.service.ts

@ -205,4 +205,46 @@ export class MarketDataService {
return this.prismaService.$transaction(upsertPromises);
}
/**
* Atomically replace all market data for a symbol.
* Deletes existing data and inserts new data within a single transaction
* to prevent data loss if the operation fails.
*/
public async replaceAllForSymbol({
dataSource,
symbol,
data
}: {
dataSource: DataSource;
symbol: string;
data: Prisma.MarketDataUpdateInput[];
}): Promise<void> {
await this.prismaService.$transaction(async (prisma) => {
// First, delete all existing market data for this symbol
await prisma.marketData.deleteMany({
where: {
dataSource,
symbol
}
});
// Then, insert all new market data
const upsertPromises = data.map(
({ dataSource, date, marketPrice, state }) => {
return prisma.marketData.create({
data: {
dataSource: dataSource as DataSource,
date: date as Date,
marketPrice: marketPrice as number,
state: state as MarketDataState,
symbol: symbol as string
}
});
}
);
await Promise.all(upsertPromises);
});
}
}

16
apps/api/src/services/queues/data-gathering/data-gathering.processor.ts

@ -100,7 +100,7 @@ export class DataGatheringProcessor {
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
})
public async gatherHistoricalMarketData(job: Job<DataGatheringItem>) {
const { dataSource, date, symbol } = job.data;
const { dataSource, date, symbol, replaceExistingData } = job.data;
try {
let currentDate = parseISO(date as unknown as string);
@ -109,7 +109,7 @@ export class DataGatheringProcessor {
`Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format(
currentDate,
DATE_FORMAT
)}`,
)}${replaceExistingData ? ' (replace mode)' : ''}`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
);
@ -157,7 +157,17 @@ export class DataGatheringProcessor {
currentDate = addDays(currentDate, 1);
}
await this.marketDataService.updateMany({ data });
// If replaceExistingData is true, use atomic replace to prevent data loss
// on failure. Otherwise, use the normal upsert approach.
if (replaceExistingData) {
await this.marketDataService.replaceAllForSymbol({
dataSource,
symbol,
data
});
} else {
await this.marketDataService.updateMany({ data });
}
Logger.log(
`Historical market data gathering has been completed for ${symbol} (${dataSource}) at ${format(

16
apps/api/src/services/queues/data-gathering/data-gathering.service.ts

@ -2,7 +2,6 @@ import { DataProviderService } from '@ghostfolio/api/services/data-provider/data
import { DataEnhancerInterface } from '@ghostfolio/api/services/data-provider/interfaces/data-enhancer.interface';
import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service';
import { DataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces';
import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service';
import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service';
import { PropertyService } from '@ghostfolio/api/services/property/property.service';
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service';
@ -41,7 +40,6 @@ export class DataGatheringService {
private readonly dataGatheringQueue: Queue,
private readonly dataProviderService: DataProviderService,
private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly marketDataService: MarketDataService,
private readonly prismaService: PrismaService,
private readonly propertyService: PropertyService,
private readonly symbolProfileService: SymbolProfileService
@ -95,8 +93,6 @@ export class DataGatheringService {
}
public async gatherSymbol({ dataSource, date, symbol }: DataGatheringItem) {
await this.marketDataService.deleteMany({ dataSource, symbol });
const dataGatheringItems = (await this.getSymbolsMax())
.filter((dataGatheringItem) => {
return (
@ -109,9 +105,12 @@ export class DataGatheringService {
date: date ?? item.date
}));
// Add a flag to indicate this should replace all existing data
// The data will be deleted and replaced within a transaction in the processor
await this.gatherSymbols({
dataGatheringItems,
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH,
replaceExistingData: true
});
}
@ -274,10 +273,12 @@ export class DataGatheringService {
public async gatherSymbols({
dataGatheringItems,
priority
priority,
replaceExistingData = false
}: {
dataGatheringItems: DataGatheringItem[];
priority: number;
replaceExistingData?: boolean;
}) {
await this.addJobsToQueue(
dataGatheringItems.map(({ dataSource, date, symbol }) => {
@ -285,7 +286,8 @@ export class DataGatheringService {
data: {
dataSource,
date,
symbol
symbol,
replaceExistingData
},
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,
opts: {

Loading…
Cancel
Save