Browse Source

Address code review feedback

Apply style improvements including type simplification, renamed flag
  from replaceExistingData to force, and moved CHANGELOG entry.
pull/5858/head
Sven Günther 6 days ago
parent
commit
cfc873f69e
  1. 2
      CHANGELOG.md
  2. 2
      apps/api/src/services/interfaces/interfaces.ts
  3. 16
      apps/api/src/services/market-data/market-data.service.ts
  4. 6
      apps/api/src/services/queues/data-gathering/data-gathering.processor.ts
  5. 8
      apps/api/src/services/queues/data-gathering/data-gathering.service.ts

2
CHANGELOG.md

@ -15,11 +15,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- Improved the language localization for German (`de`) - Improved the language localization for German (`de`)
- Ensured atomic data replacememt for historical market data fetching
### Fixed ### Fixed
- Ensured the locale is available in the settings dialog to customize the rule thresholds of the _X-ray_ page - Ensured the locale is available in the settings dialog to customize the rule thresholds of the _X-ray_ page
- Ensured atomic data replacememt for historical market data fetching
## 2.211.0 - 2025-10-25 ## 2.211.0 - 2025-10-25

2
apps/api/src/services/interfaces/interfaces.ts

@ -20,5 +20,5 @@ export interface DataProviderResponse {
export interface DataGatheringItem extends AssetProfileIdentifier { export interface DataGatheringItem extends AssetProfileIdentifier {
date?: Date; date?: Date;
replaceExistingData?: boolean; force?: boolean;
} }

16
apps/api/src/services/market-data/market-data.service.ts

@ -206,20 +206,16 @@ export class MarketDataService {
return this.prismaService.$transaction(upsertPromises); return this.prismaService.$transaction(upsertPromises);
} }
/**
* Atomically replace all market data for a symbol.
* Deletes existing data and inserts new data within a single transaction
* to prevent data loss if the operation fails.
*/
public async replaceAllForSymbol({ public async replaceAllForSymbol({
dataSource, dataSource,
symbol, symbol,
data data
}: { }: AssetProfileIdentifier & { data: Prisma.MarketDataUpdateInput[] }) {
dataSource: DataSource; /**
symbol: string; * Atomically replace all market data for a symbol.
data: Prisma.MarketDataUpdateInput[]; * Deletes existing data and inserts new data within a single transaction
}): Promise<void> { * to prevent data loss if the operation fails.
*/
await this.prismaService.$transaction(async (prisma) => { await this.prismaService.$transaction(async (prisma) => {
await prisma.marketData.deleteMany({ await prisma.marketData.deleteMany({
where: { where: {

6
apps/api/src/services/queues/data-gathering/data-gathering.processor.ts

@ -100,7 +100,7 @@ export class DataGatheringProcessor {
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
}) })
public async gatherHistoricalMarketData(job: Job<DataGatheringItem>) { public async gatherHistoricalMarketData(job: Job<DataGatheringItem>) {
const { dataSource, date, symbol, replaceExistingData } = job.data; const { dataSource, date, force, symbol } = job.data;
try { try {
let currentDate = parseISO(date as unknown as string); let currentDate = parseISO(date as unknown as string);
@ -109,7 +109,7 @@ export class DataGatheringProcessor {
`Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format( `Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format(
currentDate, currentDate,
DATE_FORMAT DATE_FORMAT
)}${replaceExistingData ? ' (replace mode)' : ''}`, )}${force ? ' (replace mode)' : ''}`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
); );
@ -157,7 +157,7 @@ export class DataGatheringProcessor {
currentDate = addDays(currentDate, 1); currentDate = addDays(currentDate, 1);
} }
if (replaceExistingData) { if (force) {
await this.marketDataService.replaceAllForSymbol({ await this.marketDataService.replaceAllForSymbol({
dataSource, dataSource,
symbol, symbol,

8
apps/api/src/services/queues/data-gathering/data-gathering.service.ts

@ -108,7 +108,7 @@ export class DataGatheringService {
await this.gatherSymbols({ await this.gatherSymbols({
dataGatheringItems, dataGatheringItems,
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH, priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH,
replaceExistingData: true force: true
}); });
} }
@ -272,11 +272,11 @@ export class DataGatheringService {
public async gatherSymbols({ public async gatherSymbols({
dataGatheringItems, dataGatheringItems,
priority, priority,
replaceExistingData = false force = false
}: { }: {
dataGatheringItems: DataGatheringItem[]; dataGatheringItems: DataGatheringItem[];
priority: number; priority: number;
replaceExistingData?: boolean; force?: boolean;
}) { }) {
await this.addJobsToQueue( await this.addJobsToQueue(
dataGatheringItems.map(({ dataSource, date, symbol }) => { dataGatheringItems.map(({ dataSource, date, symbol }) => {
@ -285,7 +285,7 @@ export class DataGatheringService {
dataSource, dataSource,
date, date,
symbol, symbol,
replaceExistingData force
}, },
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,
opts: { opts: {

Loading…
Cancel
Save