Browse Source

feat: add Float32Array exchange rate cache to eliminate P2028 timeout

Introduce a lazy-loaded Float32Array cache in ExchangeRateDataService
indexed by days since epoch for O(1) exchange rate lookups.

- Lazy-loads currency history via findMany on first access
- Forward-fills gaps for weekends/holidays
- pendingLoads map prevents redundant DB queries under Promise.all concurrency
- Cache is invalidated on market-data.updated events
- Emit market-data.updated from MarketDataService.updateMany
pull/6901/head
Andrea Bugeja 1 week ago
parent
commit
2cfcd7c162
  1. 158
      apps/api/src/services/exchange-rate-data/exchange-rate-data.service.ts
  2. 25
      apps/api/src/services/market-data/market-data.service.ts

158
apps/api/src/services/exchange-rate-data/exchange-rate-data.service.ts

@ -16,6 +16,7 @@ import {
} from '@ghostfolio/common/helper'; } from '@ghostfolio/common/helper';
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { import {
eachDayOfInterval, eachDayOfInterval,
format, format,
@ -34,6 +35,8 @@ export class ExchangeRateDataService {
private currencyPairs: DataGatheringItem[] = []; private currencyPairs: DataGatheringItem[] = [];
private derivedCurrencyFactors: { [currencyPair: string]: number } = {}; private derivedCurrencyFactors: { [currencyPair: string]: number } = {};
private exchangeRates: { [currencyPair: string]: number } = {}; private exchangeRates: { [currencyPair: string]: number } = {};
private exchangeRateCache = new Map<string, Float32Array>();
private pendingLoads = new Map<string, Promise<Float32Array>>();
public constructor( public constructor(
private readonly dataProviderService: DataProviderService, private readonly dataProviderService: DataProviderService,
@ -284,56 +287,34 @@ export class ExchangeRateDataService {
} else if (derivedCurrencyFactor) { } else if (derivedCurrencyFactor) {
factor = derivedCurrencyFactor; factor = derivedCurrencyFactor;
} else { } else {
const dataSource = const marketPrice = await this.getRateFromCache(
this.dataProviderService.getDataSourceForExchangeRates(); `${aFromCurrency}${aToCurrency}`,
const symbol = `${aFromCurrency}${aToCurrency}`; aDate
);
const marketData = await this.marketDataService.get({
dataSource,
symbol,
date: aDate
});
if (marketData?.marketPrice) { if (marketPrice !== undefined) {
factor = marketData?.marketPrice; factor = marketPrice;
} else { } else {
// Calculate indirectly via base currency
let marketPriceBaseCurrencyFromCurrency: number;
let marketPriceBaseCurrencyToCurrency: number;
try { try {
if (aFromCurrency === DEFAULT_CURRENCY) { let baseFromPrice = 1;
marketPriceBaseCurrencyFromCurrency = 1; let baseToPrice = 1;
} else {
marketPriceBaseCurrencyFromCurrency = ( if (aFromCurrency !== DEFAULT_CURRENCY) {
await this.marketDataService.get({ baseFromPrice = await this.getRateFromCache(
dataSource, `${DEFAULT_CURRENCY}${aFromCurrency}`,
date: aDate, aDate
symbol: `${DEFAULT_CURRENCY}${aFromCurrency}` );
})
)?.marketPrice;
} }
} catch {}
try { if (aToCurrency !== DEFAULT_CURRENCY) {
if (aToCurrency === DEFAULT_CURRENCY) { baseToPrice = await this.getRateFromCache(
marketPriceBaseCurrencyToCurrency = 1; `${DEFAULT_CURRENCY}${aToCurrency}`,
} else { aDate
marketPriceBaseCurrencyToCurrency = ( );
await this.marketDataService.get({
dataSource,
date: aDate,
symbol: `${DEFAULT_CURRENCY}${aToCurrency}`
})
)?.marketPrice;
} }
} catch {}
// Calculate the opposite direction factor = (1 / baseFromPrice) * baseToPrice;
factor = } catch {}
(1 / marketPriceBaseCurrencyFromCurrency) *
marketPriceBaseCurrencyToCurrency;
} }
} }
@ -352,6 +333,97 @@ export class ExchangeRateDataService {
return undefined; return undefined;
} }
@OnEvent('market-data.updated')
public onMarketDataUpdated(event: { symbol: string }) {
this.exchangeRateCache.delete(event.symbol);
this.pendingLoads.delete(event.symbol);
}
private getDaysSinceEpoch(aDate: Date) {
return Math.floor(aDate.getTime() / 86400000);
}
private async loadCache(aSymbol: string): Promise<Float32Array> {
const dataSource = this.dataProviderService.getDataSourceForExchangeRates();
const marketData = await this.prismaService.marketData.findMany({
where: { dataSource, symbol: aSymbol },
orderBy: { date: 'asc' },
select: { date: true, marketPrice: true }
});
const todayDays = this.getDaysSinceEpoch(new Date());
const array = new Float32Array(todayDays + 1);
if (marketData.length > 0) {
let lastRate = marketData[0].marketPrice;
let currentIndex = this.getDaysSinceEpoch(marketData[0].date);
for (const data of marketData) {
const dataIndex = this.getDaysSinceEpoch(data.date);
while (currentIndex < dataIndex && currentIndex <= todayDays) {
array[currentIndex++] = lastRate;
}
lastRate = data.marketPrice;
array[dataIndex] = lastRate;
currentIndex = dataIndex + 1;
}
while (currentIndex <= todayDays) {
array[currentIndex++] = lastRate;
}
}
return array;
}
private async getRateFromCache(
aSymbol: string,
aDate: Date
): Promise<number | undefined> {
let cache = this.exchangeRateCache.get(aSymbol);
if (!cache) {
if (this.pendingLoads.has(aSymbol)) {
await this.pendingLoads.get(aSymbol);
cache = this.exchangeRateCache.get(aSymbol);
if (!cache) {
cache = await this.loadAndCommit(aSymbol);
}
} else {
cache = await this.loadAndCommit(aSymbol);
}
}
const days = Math.min(this.getDaysSinceEpoch(aDate), cache.length - 1);
if (days >= 0) {
const rate = cache[days];
return rate === 0 ? undefined : rate;
}
return undefined;
}
private async loadAndCommit(aSymbol: string): Promise<Float32Array> {
const loadPromise = this.loadCache(aSymbol);
this.pendingLoads.set(aSymbol, loadPromise);
try {
const cache = await loadPromise;
if (this.pendingLoads.get(aSymbol) === loadPromise) {
this.exchangeRateCache.set(aSymbol, cache);
}
return this.exchangeRateCache.get(aSymbol) ?? cache;
} finally {
if (this.pendingLoads.get(aSymbol) === loadPromise) {
this.pendingLoads.delete(aSymbol);
}
}
}
private async getExchangeRates({ private async getExchangeRates({
currencyFrom, currencyFrom,
currencyTo, currencyTo,

25
apps/api/src/services/market-data/market-data.service.ts

@ -6,6 +6,7 @@ import { resetHours } from '@ghostfolio/common/helper';
import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces'; import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { import {
DataSource, DataSource,
MarketData, MarketData,
@ -15,7 +16,10 @@ import {
@Injectable() @Injectable()
export class MarketDataService { export class MarketDataService {
public constructor(private readonly prismaService: PrismaService) {} public constructor(
private readonly eventEmitter: EventEmitter2,
private readonly prismaService: PrismaService
) {}
public async deleteMany({ dataSource, symbol }: AssetProfileIdentifier) { public async deleteMany({ dataSource, symbol }: AssetProfileIdentifier) {
return this.prismaService.marketData.deleteMany({ return this.prismaService.marketData.deleteMany({
@ -185,6 +189,8 @@ export class MarketDataService {
}); });
} }
}); });
this.eventEmitter.emit('market-data.updated', { symbol });
} }
public async updateAssetProfileIdentifier( public async updateAssetProfileIdentifier(
@ -211,7 +217,7 @@ export class MarketDataService {
}): Promise<MarketData> { }): Promise<MarketData> {
const { data, where } = params; const { data, where } = params;
return this.prismaService.marketData.upsert({ const result = await this.prismaService.marketData.upsert({
where, where,
create: { create: {
dataSource: where.dataSource_date_symbol.dataSource, dataSource: where.dataSource_date_symbol.dataSource,
@ -222,6 +228,12 @@ export class MarketDataService {
}, },
update: { marketPrice: data.marketPrice, state: data.state } update: { marketPrice: data.marketPrice, state: data.state }
}); });
this.eventEmitter.emit('market-data.updated', {
symbol: where.dataSource_date_symbol.symbol
});
return result;
} }
/** /**
@ -258,6 +270,13 @@ export class MarketDataService {
} }
); );
return this.prismaService.$transaction(upsertPromises); const result = await this.prismaService.$transaction(upsertPromises);
const symbols = [...new Set(data.map((d) => d.symbol as string))];
for (const symbol of symbols) {
this.eventEmitter.emit('market-data.updated', { symbol });
}
return result;
} }
} }

Loading…
Cancel
Save