Browse Source

fix: handle null exchange rates and optimize backfill loop to prevent event loop stalls

- Replace throw/catch pattern with direct if/else checks to avoid V8 stack trace overhead
- Add setImmediate yield every 500 iterations to unblock Bull heartbeat
- Downgrade missing rate logs from error to debug level
- Add dates array support for targeted queries instead of full date ranges
pull/6912/head
Andrea Bugeja 5 days ago
parent
commit
8643fb3995
  1. 37
      apps/api/src/app/portfolio/current-rate.service.ts
  2. 82
      apps/api/src/services/exchange-rate-data/exchange-rate-data.service.ts

37
apps/api/src/app/portfolio/current-rate.service.ts

@ -24,8 +24,6 @@ import { GetValuesParams } from './interfaces/get-values-params.interface';
@Injectable() @Injectable()
export class CurrentRateService { export class CurrentRateService {
private static readonly MARKET_DATA_PAGE_SIZE = 50000;
public constructor( public constructor(
private readonly activitiesService: ActivitiesService, private readonly activitiesService: ActivitiesService,
private readonly dataProviderService: DataProviderService, private readonly dataProviderService: DataProviderService,
@ -84,32 +82,27 @@ export class CurrentRateService {
return { dataSource, symbol }; return { dataSource, symbol };
}); });
const marketDataCount = await this.marketDataService.getRangeCount({ // Fetch each asset profile individually to use the composite index efficiently
assetProfileIdentifiers, // Process in batches of 10 to avoid overwhelming the database
dateQuery const batchSize = 10;
}); for (let i = 0; i < assetProfileIdentifiers.length; i += batchSize) {
const batch = assetProfileIdentifiers.slice(i, i + batchSize);
for ( const promises = batch.map(async (assetProfile) => {
let i = 0;
i < marketDataCount;
i += CurrentRateService.MARKET_DATA_PAGE_SIZE
) {
// Use page size to limit the number of records fetched at once
const data = await this.marketDataService.getRange({ const data = await this.marketDataService.getRange({
assetProfileIdentifiers, assetProfileIdentifiers: [assetProfile],
dateQuery, dateQuery
skip: i,
take: CurrentRateService.MARKET_DATA_PAGE_SIZE
}); });
return data.map(({ dataSource, date, marketPrice, symbol }) => ({
values.push(
...data.map(({ dataSource, date, marketPrice, symbol }) => ({
dataSource, dataSource,
date, date,
marketPrice, marketPrice,
symbol symbol
})) }));
); });
const results = await Promise.all(promises);
for (const result of results) {
values.push(...result);
}
} }
const response: GetValuesObject = { const response: GetValuesObject = {

82
apps/api/src/services/exchange-rate-data/exchange-rate-data.service.ts

@ -53,16 +53,18 @@ export class ExchangeRateDataService {
@LogPerformance @LogPerformance
public async getExchangeRatesByCurrency({ public async getExchangeRatesByCurrency({
currencies, currencies,
dates,
endDate = new Date(), endDate = new Date(),
startDate, startDate,
targetCurrency targetCurrency
}: { }: {
currencies: string[]; currencies: string[];
dates?: Date[];
endDate?: Date; endDate?: Date;
startDate: Date; startDate?: Date;
targetCurrency: string; targetCurrency: string;
}): Promise<ExchangeRatesByCurrency> { }): Promise<ExchangeRatesByCurrency> {
if (!startDate) { if (!startDate && !dates?.length) {
return {}; return {};
} }
@ -73,6 +75,8 @@ export class ExchangeRateDataService {
for (const currency of currencies) { for (const currency of currencies) {
exchangeRatesByCurrency[`${currency}${targetCurrency}`] = exchangeRatesByCurrency[`${currency}${targetCurrency}`] =
await this.getExchangeRates({ await this.getExchangeRates({
dates,
endDate,
startDate, startDate,
currencyFrom: currency, currencyFrom: currency,
currencyTo: targetCurrency currencyTo: targetCurrency
@ -90,11 +94,14 @@ export class ExchangeRateDataService {
lastDateString lastDateString
] ?? 1; ] ?? 1;
const loopStartDate =
startDate || dates?.reduce((min, d) => (d < min ? d : min), dates[0]);
// Start from the most recent date and fill in missing exchange rates // Start from the most recent date and fill in missing exchange rates
// using the latest available rate // using the latest available rate
for ( for (
let date = endDate; let date = endDate;
!isBefore(date, startDate); loopStartDate && !isBefore(date, loopStartDate);
date = subDays(resetHours(date), 1) date = subDays(resetHours(date), 1)
) { ) {
const dateString = format(date, DATE_FORMAT); const dateString = format(date, DATE_FORMAT);
@ -110,7 +117,7 @@ export class ExchangeRateDataService {
previousExchangeRate; previousExchangeRate;
if (currency === DEFAULT_CURRENCY && isBefore(date, new Date())) { if (currency === DEFAULT_CURRENCY && isBefore(date, new Date())) {
Logger.error( Logger.debug(
`No exchange rate has been found for ${currency}${targetCurrency} at ${dateString}`, `No exchange rate has been found for ${currency}${targetCurrency} at ${dateString}`,
'ExchangeRateDataService' 'ExchangeRateDataService'
); );
@ -355,19 +362,22 @@ export class ExchangeRateDataService {
private async getExchangeRates({ private async getExchangeRates({
currencyFrom, currencyFrom,
currencyTo, currencyTo,
dates,
endDate = new Date(), endDate = new Date(),
startDate startDate
}: { }: {
currencyFrom: string; currencyFrom: string;
currencyTo: string; currencyTo: string;
dates?: Date[];
endDate?: Date; endDate?: Date;
startDate: Date; startDate?: Date;
}) { }) {
const dates = eachDayOfInterval({ end: endDate, start: startDate }); const datesToProcess =
dates ?? eachDayOfInterval({ end: endDate, start: startDate });
const factors: { [dateString: string]: number } = {}; const factors: { [dateString: string]: number } = {};
if (currencyFrom === currencyTo) { if (currencyFrom === currencyTo) {
for (const date of dates) { for (const date of datesToProcess) {
factors[format(date, DATE_FORMAT)] = 1; factors[format(date, DATE_FORMAT)] = 1;
} }
@ -378,7 +388,7 @@ export class ExchangeRateDataService {
this.derivedCurrencyFactors[`${currencyFrom}${currencyTo}`]; this.derivedCurrencyFactors[`${currencyFrom}${currencyTo}`];
if (derivedCurrencyFactor) { if (derivedCurrencyFactor) {
for (const date of dates) { for (const date of datesToProcess) {
factors[format(date, DATE_FORMAT)] = derivedCurrencyFactor; factors[format(date, DATE_FORMAT)] = derivedCurrencyFactor;
} }
@ -395,7 +405,8 @@ export class ExchangeRateDataService {
symbol symbol
} }
], ],
dateQuery: { gte: startDate, lt: endDate } dateQuery:
dates?.length > 0 ? { in: dates } : { gte: startDate, lt: endDate }
}); });
if (marketData?.length > 0) { if (marketData?.length > 0) {
@ -414,7 +425,7 @@ export class ExchangeRateDataService {
try { try {
if (currencyFrom === DEFAULT_CURRENCY) { if (currencyFrom === DEFAULT_CURRENCY) {
for (const date of dates) { for (const date of datesToProcess) {
marketPriceBaseCurrencyFromCurrency[format(date, DATE_FORMAT)] = 1; marketPriceBaseCurrencyFromCurrency[format(date, DATE_FORMAT)] = 1;
} }
} else { } else {
@ -425,7 +436,10 @@ export class ExchangeRateDataService {
symbol: `${DEFAULT_CURRENCY}${currencyFrom}` symbol: `${DEFAULT_CURRENCY}${currencyFrom}`
} }
], ],
dateQuery: { gte: startDate, lt: endDate } dateQuery:
dates?.length > 0
? { in: dates }
: { gte: startDate, lt: endDate }
}); });
for (const { date, marketPrice } of marketData) { for (const { date, marketPrice } of marketData) {
@ -437,7 +451,7 @@ export class ExchangeRateDataService {
try { try {
if (currencyTo === DEFAULT_CURRENCY) { if (currencyTo === DEFAULT_CURRENCY) {
for (const date of dates) { for (const date of datesToProcess) {
marketPriceBaseCurrencyToCurrency[format(date, DATE_FORMAT)] = 1; marketPriceBaseCurrencyToCurrency[format(date, DATE_FORMAT)] = 1;
} }
} else { } else {
@ -448,10 +462,10 @@ export class ExchangeRateDataService {
symbol: `${DEFAULT_CURRENCY}${currencyTo}` symbol: `${DEFAULT_CURRENCY}${currencyTo}`
} }
], ],
dateQuery: { dateQuery:
gte: startDate, dates?.length > 0
lt: endDate ? { in: dates }
} : { gte: startDate, lt: endDate }
}); });
for (const { date, marketPrice } of marketData) { for (const { date, marketPrice } of marketData) {
@ -461,30 +475,32 @@ export class ExchangeRateDataService {
} }
} catch {} } catch {}
for (const date of dates) { for (let i = 0; i < datesToProcess.length; i++) {
try { if (i % 500 === 0 && process.env.NODE_ENV !== 'test') {
const factor = await new Promise((resolve) => setImmediate(resolve));
(1 / }
marketPriceBaseCurrencyFromCurrency[format(date, DATE_FORMAT)]) *
marketPriceBaseCurrencyToCurrency[format(date, DATE_FORMAT)];
if (isNaN(factor)) { const date = datesToProcess[i];
throw new Error('Exchange rate is not a number'); const dateString = format(date, DATE_FORMAT);
} else {
factors[format(date, DATE_FORMAT)] = factor; const priceFrom = marketPriceBaseCurrencyFromCurrency[dateString];
const priceTo = marketPriceBaseCurrencyToCurrency[dateString];
if (priceFrom && priceTo) {
const factor = (1 / priceFrom) * priceTo;
if (!isNaN(factor) && isFinite(factor)) {
factors[dateString] = factor;
continue;
} }
} catch { }
let errorMessage = `No exchange rate has been found for ${currencyFrom}${currencyTo} at ${format(
date, let errorMessage = `No exchange rate has been found for ${currencyFrom}${currencyTo} at ${dateString}. Please complement market data for ${DEFAULT_CURRENCY}${currencyFrom}`;
DATE_FORMAT
)}. Please complement market data for ${DEFAULT_CURRENCY}${currencyFrom}`;
if (DEFAULT_CURRENCY !== currencyTo) { if (DEFAULT_CURRENCY !== currencyTo) {
errorMessage = `${errorMessage} and ${DEFAULT_CURRENCY}${currencyTo}`; errorMessage = `${errorMessage} and ${DEFAULT_CURRENCY}${currencyTo}`;
} }
Logger.error(`${errorMessage}.`, 'ExchangeRateDataService'); Logger.debug(`${errorMessage}.`, 'ExchangeRateDataService');
}
} }
} }

Loading…
Cancel
Save