Sven Günther 4 days ago
committed by GitHub
parent
commit
f97a5c9f33
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 1
      apps/api/src/services/interfaces/interfaces.ts
  3. 55
      apps/api/src/services/market-data/market-data.service.ts
  4. 14
      apps/api/src/services/queues/data-gathering/data-gathering.processor.ts
  5. 8
      apps/api/src/services/queues/data-gathering/data-gathering.service.ts

1
CHANGELOG.md

@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- Improved the icon of the _View Holding_ menu item in the activities table - Improved the icon of the _View Holding_ menu item in the activities table
- Ensured atomic data replacement during historical market data gathering
- Refreshed the cryptocurrencies list - Refreshed the cryptocurrencies list
- Upgraded `ng-extract-i18n-merge` from version `3.0.0` to `3.1.0` - Upgraded `ng-extract-i18n-merge` from version `3.0.0` to `3.1.0`

1
apps/api/src/services/interfaces/interfaces.ts

@ -20,4 +20,5 @@ export interface DataProviderResponse {
export interface DataGatheringItem extends AssetProfileIdentifier { export interface DataGatheringItem extends AssetProfileIdentifier {
date?: Date; date?: Date;
force?: boolean;
} }

55
apps/api/src/services/market-data/market-data.service.ts

@ -132,6 +132,61 @@ export class MarketDataService {
}); });
} }
/**
* Atomically replace market data for a symbol within a date range.
* Deletes existing data in the range and inserts new data within a single
* transaction to prevent data loss if the operation fails.
*/
public async replaceForSymbol({
data,
dataSource,
symbol
}: AssetProfileIdentifier & { data: Prisma.MarketDataUpdateInput[] }) {
await this.prismaService.$transaction(async (prisma) => {
if (data.length > 0) {
let minTime = Infinity;
let maxTime = -Infinity;
for (const item of data) {
const time = (item.date as Date).getTime();
if (time < minTime) {
minTime = time;
}
if (time > maxTime) {
maxTime = time;
}
}
const minDate = new Date(minTime);
const maxDate = new Date(maxTime);
await prisma.marketData.deleteMany({
where: {
dataSource,
symbol,
date: {
gte: minDate,
lte: maxDate
}
}
});
await prisma.marketData.createMany({
data: data.map(({ date, marketPrice, state }) => ({
dataSource,
symbol,
date: date as Date,
marketPrice: marketPrice as number,
state: state as MarketDataState
})),
skipDuplicates: true
});
}
});
}
public async updateAssetProfileIdentifier( public async updateAssetProfileIdentifier(
oldAssetProfileIdentifier: AssetProfileIdentifier, oldAssetProfileIdentifier: AssetProfileIdentifier,
newAssetProfileIdentifier: AssetProfileIdentifier newAssetProfileIdentifier: AssetProfileIdentifier

14
apps/api/src/services/queues/data-gathering/data-gathering.processor.ts

@ -100,7 +100,7 @@ export class DataGatheringProcessor {
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
}) })
public async gatherHistoricalMarketData(job: Job<DataGatheringItem>) { public async gatherHistoricalMarketData(job: Job<DataGatheringItem>) {
const { dataSource, date, symbol } = job.data; const { dataSource, date, force, symbol } = job.data;
try { try {
let currentDate = parseISO(date as unknown as string); let currentDate = parseISO(date as unknown as string);
@ -109,7 +109,7 @@ export class DataGatheringProcessor {
`Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format( `Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format(
currentDate, currentDate,
DATE_FORMAT DATE_FORMAT
)}`, )}${force ? ' (forced update)' : ''}`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
); );
@ -157,7 +157,15 @@ export class DataGatheringProcessor {
currentDate = addDays(currentDate, 1); currentDate = addDays(currentDate, 1);
} }
await this.marketDataService.updateMany({ data }); if (force) {
await this.marketDataService.replaceForSymbol({
data,
dataSource,
symbol
});
} else {
await this.marketDataService.updateMany({ data });
}
Logger.log( Logger.log(
`Historical market data gathering has been completed for ${symbol} (${dataSource}) at ${format( `Historical market data gathering has been completed for ${symbol} (${dataSource}) at ${format(

8
apps/api/src/services/queues/data-gathering/data-gathering.service.ts

@ -2,7 +2,6 @@ import { DataProviderService } from '@ghostfolio/api/services/data-provider/data
import { DataEnhancerInterface } from '@ghostfolio/api/services/data-provider/interfaces/data-enhancer.interface'; import { DataEnhancerInterface } from '@ghostfolio/api/services/data-provider/interfaces/data-enhancer.interface';
import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service'; import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service';
import { DataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; import { DataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces';
import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service';
import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service'; import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service';
import { PropertyService } from '@ghostfolio/api/services/property/property.service'; import { PropertyService } from '@ghostfolio/api/services/property/property.service';
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service'; import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service';
@ -41,7 +40,6 @@ export class DataGatheringService {
private readonly dataGatheringQueue: Queue, private readonly dataGatheringQueue: Queue,
private readonly dataProviderService: DataProviderService, private readonly dataProviderService: DataProviderService,
private readonly exchangeRateDataService: ExchangeRateDataService, private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly marketDataService: MarketDataService,
private readonly prismaService: PrismaService, private readonly prismaService: PrismaService,
private readonly propertyService: PropertyService, private readonly propertyService: PropertyService,
private readonly symbolProfileService: SymbolProfileService private readonly symbolProfileService: SymbolProfileService
@ -95,8 +93,6 @@ export class DataGatheringService {
} }
public async gatherSymbol({ dataSource, date, symbol }: DataGatheringItem) { public async gatherSymbol({ dataSource, date, symbol }: DataGatheringItem) {
await this.marketDataService.deleteMany({ dataSource, symbol });
const dataGatheringItems = (await this.getSymbolsMax()) const dataGatheringItems = (await this.getSymbolsMax())
.filter((dataGatheringItem) => { .filter((dataGatheringItem) => {
return ( return (
@ -111,6 +107,7 @@ export class DataGatheringService {
await this.gatherSymbols({ await this.gatherSymbols({
dataGatheringItems, dataGatheringItems,
force: true,
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}); });
} }
@ -274,9 +271,11 @@ export class DataGatheringService {
public async gatherSymbols({ public async gatherSymbols({
dataGatheringItems, dataGatheringItems,
force = false,
priority priority
}: { }: {
dataGatheringItems: DataGatheringItem[]; dataGatheringItems: DataGatheringItem[];
force?: boolean;
priority: number; priority: number;
}) { }) {
await this.addJobsToQueue( await this.addJobsToQueue(
@ -285,6 +284,7 @@ export class DataGatheringService {
data: { data: {
dataSource, dataSource,
date, date,
force,
symbol symbol
}, },
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,

Loading…
Cancel
Save