Sven Günther 2 days ago
committed by GitHub
parent
commit
1413944431
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 1
      apps/api/src/services/interfaces/interfaces.ts
  3. 37
      apps/api/src/services/market-data/market-data.service.ts
  4. 14
      apps/api/src/services/queues/data-gathering/data-gathering.processor.ts
  5. 14
      apps/api/src/services/queues/data-gathering/data-gathering.service.ts

1
CHANGELOG.md

@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Ensured the locale is available in the settings dialog to customize the rule thresholds of the _X-ray_ page
- Ensured atomic data replacememt for historical market data fetching
## 2.211.0 - 2025-10-25

1
apps/api/src/services/interfaces/interfaces.ts

@ -20,4 +20,5 @@ export interface DataProviderResponse {
export interface DataGatheringItem extends AssetProfileIdentifier {
date?: Date;
replaceExistingData?: boolean;
}

37
apps/api/src/services/market-data/market-data.service.ts

@ -205,4 +205,41 @@ export class MarketDataService {
return this.prismaService.$transaction(upsertPromises);
}
/**
* Atomically replace all market data for a symbol.
* Deletes existing data and inserts new data within a single transaction
* to prevent data loss if the operation fails.
*/
public async replaceAllForSymbol({
dataSource,
symbol,
data
}: {
dataSource: DataSource;
symbol: string;
data: Prisma.MarketDataUpdateInput[];
}): Promise<void> {
await this.prismaService.$transaction(async (prisma) => {
await prisma.marketData.deleteMany({
where: {
dataSource,
symbol
}
});
if (data.length > 0) {
await prisma.marketData.createMany({
data: data.map(({ dataSource, date, marketPrice, state }) => ({
dataSource: dataSource as DataSource,
date: date as Date,
marketPrice: marketPrice as number,
state: state as MarketDataState,
symbol: symbol as string
})),
skipDuplicates: true
});
}
});
}
}

14
apps/api/src/services/queues/data-gathering/data-gathering.processor.ts

@ -100,7 +100,7 @@ export class DataGatheringProcessor {
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
})
public async gatherHistoricalMarketData(job: Job<DataGatheringItem>) {
const { dataSource, date, symbol } = job.data;
const { dataSource, date, symbol, replaceExistingData } = job.data;
try {
let currentDate = parseISO(date as unknown as string);
@ -109,7 +109,7 @@ export class DataGatheringProcessor {
`Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format(
currentDate,
DATE_FORMAT
)}`,
)}${replaceExistingData ? ' (replace mode)' : ''}`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
);
@ -157,7 +157,15 @@ export class DataGatheringProcessor {
currentDate = addDays(currentDate, 1);
}
await this.marketDataService.updateMany({ data });
if (replaceExistingData) {
await this.marketDataService.replaceAllForSymbol({
dataSource,
symbol,
data
});
} else {
await this.marketDataService.updateMany({ data });
}
Logger.log(
`Historical market data gathering has been completed for ${symbol} (${dataSource}) at ${format(

14
apps/api/src/services/queues/data-gathering/data-gathering.service.ts

@ -2,7 +2,6 @@ import { DataProviderService } from '@ghostfolio/api/services/data-provider/data
import { DataEnhancerInterface } from '@ghostfolio/api/services/data-provider/interfaces/data-enhancer.interface';
import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service';
import { DataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces';
import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service';
import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service';
import { PropertyService } from '@ghostfolio/api/services/property/property.service';
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service';
@ -41,7 +40,6 @@ export class DataGatheringService {
private readonly dataGatheringQueue: Queue,
private readonly dataProviderService: DataProviderService,
private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly marketDataService: MarketDataService,
private readonly prismaService: PrismaService,
private readonly propertyService: PropertyService,
private readonly symbolProfileService: SymbolProfileService
@ -95,8 +93,6 @@ export class DataGatheringService {
}
public async gatherSymbol({ dataSource, date, symbol }: DataGatheringItem) {
await this.marketDataService.deleteMany({ dataSource, symbol });
const dataGatheringItems = (await this.getSymbolsMax())
.filter((dataGatheringItem) => {
return (
@ -111,7 +107,8 @@ export class DataGatheringService {
await this.gatherSymbols({
dataGatheringItems,
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH,
replaceExistingData: true
});
}
@ -274,10 +271,12 @@ export class DataGatheringService {
public async gatherSymbols({
dataGatheringItems,
priority
priority,
replaceExistingData = false
}: {
dataGatheringItems: DataGatheringItem[];
priority: number;
replaceExistingData?: boolean;
}) {
await this.addJobsToQueue(
dataGatheringItems.map(({ dataSource, date, symbol }) => {
@ -285,7 +284,8 @@ export class DataGatheringService {
data: {
dataSource,
date,
symbol
symbol,
replaceExistingData
},
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,
opts: {

Loading…
Cancel
Save