Browse Source

Refactoring

pull/5858/head
Thomas Kaul 1 day ago
parent
commit
c2a58e35bf
  1. 101
      apps/api/src/services/market-data/market-data.service.ts
  2. 4
      apps/api/src/services/queues/data-gathering/data-gathering.processor.ts

101
apps/api/src/services/market-data/market-data.service.ts

@ -132,6 +132,64 @@ export class MarketDataService {
});
}
/**
* Atomically replace market data for a symbol within a date range.
* Deletes existing data in the range and inserts new data within a single
* transaction to prevent data loss if the operation fails.
*/
public async replaceForSymbol({
data,
dataSource,
symbol
}: AssetProfileIdentifier & { data: Prisma.MarketDataUpdateInput[] }) {
await this.prismaService.$transaction(async (prisma) => {
if (data.length > 0) {
// Find the earliest and latest dates in the incoming data
const dates = data.map(({ date }) => {
return date as Date;
});
const minDate = new Date(
Math.min(
...dates.map((date) => {
return date.getTime();
})
)
);
const maxDate = new Date(
Math.max(
...dates.map((date) => {
return date.getTime();
})
)
);
await prisma.marketData.deleteMany({
where: {
dataSource,
symbol,
date: {
gte: minDate,
lte: maxDate
}
}
});
await prisma.marketData.createMany({
data: data.map(({ date, marketPrice, state }) => ({
dataSource,
symbol,
date: date as Date,
marketPrice: marketPrice as number,
state: state as MarketDataState
})),
skipDuplicates: true
});
}
});
}
public async updateAssetProfileIdentifier(
oldAssetProfileIdentifier: AssetProfileIdentifier,
newAssetProfileIdentifier: AssetProfileIdentifier
@ -205,47 +263,4 @@ export class MarketDataService {
return this.prismaService.$transaction(upsertPromises);
}
public async replaceAllForSymbol({
data,
dataSource,
symbol
}: AssetProfileIdentifier & { data: Prisma.MarketDataUpdateInput[] }) {
/**
* Atomically replace market data for a symbol within a date range.
* Deletes existing data in the range and inserts new data within a single
* transaction to prevent data loss if the operation fails.
*/
await this.prismaService.$transaction(async (prisma) => {
if (data.length > 0) {
// Find the earliest and latest dates in the incoming data
const dates = data.map(({ date }) => date as Date);
const minDate = new Date(Math.min(...dates.map((d) => d.getTime())));
const maxDate = new Date(Math.max(...dates.map((d) => d.getTime())));
// Only delete data within the date range that will be replaced
await prisma.marketData.deleteMany({
where: {
dataSource,
symbol,
date: {
gte: minDate,
lte: maxDate
}
}
});
await prisma.marketData.createMany({
data: data.map(({ dataSource, date, marketPrice, state }) => ({
dataSource: dataSource as DataSource,
date: date as Date,
marketPrice: marketPrice as number,
state: state as MarketDataState,
symbol: symbol as string
})),
skipDuplicates: true
});
}
});
}
}

4
apps/api/src/services/queues/data-gathering/data-gathering.processor.ts

@ -109,7 +109,7 @@ export class DataGatheringProcessor {
`Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format(
currentDate,
DATE_FORMAT
)}${force ? ' (replace mode)' : ''}`,
)}${force ? ' (forced update)' : ''}`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
);
@ -158,7 +158,7 @@ export class DataGatheringProcessor {
}
if (force) {
await this.marketDataService.replaceAllForSymbol({
await this.marketDataService.replaceForSymbol({
data,
dataSource,
symbol

Loading…
Cancel
Save