Browse Source

perf: yield to event loop and optimize portfolio calculator to prevent Bull job stalls

- Add setImmediate yield in computeTransactionPoints, computeSnapshot, and getSymbolMetrics loops
- Make computeTransactionPoints async to allow yielding during heavy activity processing
- Add skipInitialize flag to prevent double initialization in portfolio snapshot processor
- Restore MAX_CHART_ITEMS downsampling to keep chart dates manageable
pull/6912/head
Andrea Bugeja 5 days ago
parent
commit
5271825760
  1. 2
      apps/api/src/app/portfolio/calculator/mwr/portfolio-calculator.ts
  2. 14
      apps/api/src/app/portfolio/calculator/portfolio-calculator.factory.ts
  3. 98
      apps/api/src/app/portfolio/calculator/portfolio-calculator.ts
  4. 25
      apps/api/src/app/portfolio/calculator/roai/portfolio-calculator.ts
  5. 2
      apps/api/src/app/portfolio/calculator/roi/portfolio-calculator.ts
  6. 2
      apps/api/src/app/portfolio/calculator/twr/portfolio-calculator.ts
  7. 1
      apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts

2
apps/api/src/app/portfolio/calculator/mwr/portfolio-calculator.ts

@ -23,7 +23,7 @@ export class MwrPortfolioCalculator extends PortfolioCalculator {
}; };
start: Date; start: Date;
step?: number; step?: number;
} & AssetProfileIdentifier): SymbolMetrics { } & AssetProfileIdentifier): Promise<SymbolMetrics> {
throw new Error('Method not implemented.'); throw new Error('Method not implemented.');
} }
} }

14
apps/api/src/app/portfolio/calculator/portfolio-calculator.factory.ts

@ -34,6 +34,7 @@ export class PortfolioCalculatorFactory {
calculationType, calculationType,
currency, currency,
filters = [], filters = [],
skipInitialize = false,
userId userId
}: { }: {
accountBalanceItems?: HistoricalDataItem[]; accountBalanceItems?: HistoricalDataItem[];
@ -41,6 +42,7 @@ export class PortfolioCalculatorFactory {
calculationType: PerformanceCalculationType; calculationType: PerformanceCalculationType;
currency: string; currency: string;
filters?: Filter[]; filters?: Filter[];
skipInitialize?: boolean;
userId: string; userId: string;
}): PortfolioCalculator { }): PortfolioCalculator {
switch (calculationType) { switch (calculationType) {
@ -55,7 +57,8 @@ export class PortfolioCalculatorFactory {
currentRateService: this.currentRateService, currentRateService: this.currentRateService,
exchangeRateDataService: this.exchangeRateDataService, exchangeRateDataService: this.exchangeRateDataService,
portfolioSnapshotService: this.portfolioSnapshotService, portfolioSnapshotService: this.portfolioSnapshotService,
redisCacheService: this.redisCacheService redisCacheService: this.redisCacheService,
skipInitialize
}); });
case PerformanceCalculationType.ROAI: case PerformanceCalculationType.ROAI:
@ -69,7 +72,8 @@ export class PortfolioCalculatorFactory {
currentRateService: this.currentRateService, currentRateService: this.currentRateService,
exchangeRateDataService: this.exchangeRateDataService, exchangeRateDataService: this.exchangeRateDataService,
portfolioSnapshotService: this.portfolioSnapshotService, portfolioSnapshotService: this.portfolioSnapshotService,
redisCacheService: this.redisCacheService redisCacheService: this.redisCacheService,
skipInitialize
}); });
case PerformanceCalculationType.ROI: case PerformanceCalculationType.ROI:
@ -83,7 +87,8 @@ export class PortfolioCalculatorFactory {
currentRateService: this.currentRateService, currentRateService: this.currentRateService,
exchangeRateDataService: this.exchangeRateDataService, exchangeRateDataService: this.exchangeRateDataService,
portfolioSnapshotService: this.portfolioSnapshotService, portfolioSnapshotService: this.portfolioSnapshotService,
redisCacheService: this.redisCacheService redisCacheService: this.redisCacheService,
skipInitialize
}); });
case PerformanceCalculationType.TWR: case PerformanceCalculationType.TWR:
@ -97,7 +102,8 @@ export class PortfolioCalculatorFactory {
currentRateService: this.currentRateService, currentRateService: this.currentRateService,
exchangeRateDataService: this.exchangeRateDataService, exchangeRateDataService: this.exchangeRateDataService,
portfolioSnapshotService: this.portfolioSnapshotService, portfolioSnapshotService: this.portfolioSnapshotService,
redisCacheService: this.redisCacheService redisCacheService: this.redisCacheService,
skipInitialize
}); });
default: default:

98
apps/api/src/app/portfolio/calculator/portfolio-calculator.ts

@ -59,6 +59,13 @@ import {
} from 'date-fns'; } from 'date-fns';
import { isNumber, sortBy, sum, uniqBy } from 'lodash'; import { isNumber, sortBy, sum, uniqBy } from 'lodash';
const yieldToEventLoop = async () => {
if (process.env.NODE_ENV === 'test') {
return;
}
await new Promise((resolve) => setImmediate(resolve));
};
export abstract class PortfolioCalculator { export abstract class PortfolioCalculator {
protected static readonly ENABLE_LOGGING = false; protected static readonly ENABLE_LOGGING = false;
@ -90,6 +97,7 @@ export abstract class PortfolioCalculator {
filters, filters,
portfolioSnapshotService, portfolioSnapshotService,
redisCacheService, redisCacheService,
skipInitialize = false,
userId userId
}: { }: {
accountBalanceItems: HistoricalDataItem[]; accountBalanceItems: HistoricalDataItem[];
@ -101,6 +109,7 @@ export abstract class PortfolioCalculator {
filters: Filter[]; filters: Filter[];
portfolioSnapshotService: PortfolioSnapshotService; portfolioSnapshotService: PortfolioSnapshotService;
redisCacheService: RedisCacheService; redisCacheService: RedisCacheService;
skipInitialize?: boolean;
userId: string; userId: string;
}) { }) {
this.accountBalanceItems = accountBalanceItems; this.accountBalanceItems = accountBalanceItems;
@ -166,9 +175,9 @@ export abstract class PortfolioCalculator {
this.endDate = endOfDay(endDate); this.endDate = endOfDay(endDate);
this.startDate = startOfDay(startDate); this.startDate = startOfDay(startDate);
this.computeTransactionPoints(); if (!skipInitialize) {
this.snapshotPromise = this.initialize();
this.snapshotPromise = this.initialize(); }
} }
protected abstract calculateOverallPerformance( protected abstract calculateOverallPerformance(
@ -177,6 +186,11 @@ export abstract class PortfolioCalculator {
@LogPerformance @LogPerformance
public async computeSnapshot(): Promise<PortfolioSnapshot> { public async computeSnapshot(): Promise<PortfolioSnapshot> {
console.log('[Trace] computeSnapshot started');
if (!this.transactionPoints) {
await this.computeTransactionPoints();
}
const lastTransactionPoint = this.transactionPoints.at(-1); const lastTransactionPoint = this.transactionPoints.at(-1);
const transactionPoints = this.transactionPoints?.filter(({ date }) => { const transactionPoints = this.transactionPoints?.filter(({ date }) => {
@ -234,6 +248,8 @@ export abstract class PortfolioCalculator {
} }
} }
Logger.log('Fetching exchange rates...', 'Trace');
const t1 = Date.now();
const exchangeRatesByCurrency = const exchangeRatesByCurrency =
await this.exchangeRateDataService.getExchangeRatesByCurrency({ await this.exchangeRateDataService.getExchangeRatesByCurrency({
currencies: Array.from(new Set(Object.values(currencies))), currencies: Array.from(new Set(Object.values(currencies))),
@ -242,6 +258,13 @@ export abstract class PortfolioCalculator {
targetCurrency: this.currency targetCurrency: this.currency
}); });
Logger.log(
'Exchange rates fetched in ' +
(Date.now() - t1) +
'ms. Fetching market data...',
'Trace'
);
const t2 = Date.now();
const { const {
dataProviderInfos, dataProviderInfos,
errors: currentRateErrors, errors: currentRateErrors,
@ -256,6 +279,13 @@ export abstract class PortfolioCalculator {
this.dataProviderInfos = dataProviderInfos; this.dataProviderInfos = dataProviderInfos;
Logger.log(
'Market data fetched in ' +
(Date.now() - t2) +
'ms. Processing symbols...',
'Trace'
);
const t3 = Date.now();
const marketSymbolMap: { const marketSymbolMap: {
[date: string]: { [symbol: string]: Big }; [date: string]: { [symbol: string]: Big };
} = {}; } = {};
@ -294,6 +324,13 @@ export abstract class PortfolioCalculator {
chartDateMap[accountBalanceItem.date] = true; chartDateMap[accountBalanceItem.date] = true;
} }
Logger.log(
'Symbols processed in ' +
(Date.now() - t3) +
'ms. Processing positions...',
'Trace'
);
console.log('t4', Date.now());
const chartDates = sortBy(Object.keys(chartDateMap), (chartDate) => { const chartDates = sortBy(Object.keys(chartDateMap), (chartDate) => {
return chartDate; return chartDate;
}); });
@ -338,7 +375,13 @@ export abstract class PortfolioCalculator {
}; };
} = {}; } = {};
for (const item of lastTransactionPoint.items) { Logger.log('Starting symbol metrics loop...', 'Trace');
console.log('t5', Date.now());
for (let i = 0; i < lastTransactionPoint.items.length; i++) {
if (i % 5 === 0) {
await yieldToEventLoop();
}
const item = lastTransactionPoint.items[i];
const marketPriceInBaseCurrency = ( const marketPriceInBaseCurrency = (
marketSymbolMap[endDateString]?.[item.symbol] ?? item.averagePrice marketSymbolMap[endDateString]?.[item.symbol] ?? item.averagePrice
).mul( ).mul(
@ -374,7 +417,7 @@ export abstract class PortfolioCalculator {
totalInvestment, totalInvestment,
totalInvestmentWithCurrencyEffect, totalInvestmentWithCurrencyEffect,
totalLiabilitiesInBaseCurrency totalLiabilitiesInBaseCurrency
} = this.getSymbolMetrics({ } = await this.getSymbolMetrics({
chartDateMap, chartDateMap,
marketSymbolMap, marketSymbolMap,
dataSource: item.dataSource, dataSource: item.dataSource,
@ -483,7 +526,11 @@ export abstract class PortfolioCalculator {
let lastKnownBalance = new Big(0); let lastKnownBalance = new Big(0);
for (const dateString of chartDates) { for (let c = 0; c < chartDates.length; c++) {
if (c % 100 === 0) {
await yieldToEventLoop();
}
const dateString = chartDates[c];
if (accountBalanceItemsMap[dateString] !== undefined) { if (accountBalanceItemsMap[dateString] !== undefined) {
// If there's an exact balance for this date, update lastKnownBalance // If there's an exact balance for this date, update lastKnownBalance
lastKnownBalance = accountBalanceItemsMap[dateString]; lastKnownBalance = accountBalanceItemsMap[dateString];
@ -831,7 +878,7 @@ export abstract class PortfolioCalculator {
[date: string]: { [symbol: string]: Big }; [date: string]: { [symbol: string]: Big };
}; };
start: Date; start: Date;
} & AssetProfileIdentifier): SymbolMetrics; } & AssetProfileIdentifier): Promise<SymbolMetrics>;
public getTransactionPoints() { public getTransactionPoints() {
return this.transactionPoints; return this.transactionPoints;
@ -924,23 +971,38 @@ export abstract class PortfolioCalculator {
} }
@LogPerformance @LogPerformance
private computeTransactionPoints() { protected async computeTransactionPoints() {
console.log(
'[Trace] computeTransactionPoints started, activities count: ' +
this.activities.length
);
this.transactionPoints = []; this.transactionPoints = [];
const symbols: { [symbol: string]: TransactionPointSymbol } = {}; const symbols: { [symbol: string]: TransactionPointSymbol } = {};
let lastDate: string = null; let lastDate: string = null;
let lastTransactionPoint: TransactionPoint = null; let lastTransactionPoint: TransactionPoint = null;
for (const { for (let i = 0; i < this.activities.length; i++) {
date, if (i % 500 === 0) {
fee, console.log(
feeInBaseCurrency, '[Trace] computeTransactionPoints progress: ' +
quantity, i +
SymbolProfile, '/' +
tags, this.activities.length
type, );
unitPrice await yieldToEventLoop();
} of this.activities) { }
const {
date,
fee,
feeInBaseCurrency,
quantity,
SymbolProfile,
tags,
type,
unitPrice
} = this.activities[i];
let currentTransactionPointItem: TransactionPointSymbol; let currentTransactionPointItem: TransactionPointSymbol;
const assetSubClass = SymbolProfile.assetSubClass; const assetSubClass = SymbolProfile.assetSubClass;

25
apps/api/src/app/portfolio/calculator/roai/portfolio-calculator.ts

@ -23,6 +23,13 @@ import {
} from 'date-fns'; } from 'date-fns';
import { cloneDeep, sortBy } from 'lodash'; import { cloneDeep, sortBy } from 'lodash';
const yieldToEventLoop = async () => {
if (process.env.NODE_ENV === 'test') {
return;
}
await new Promise((resolve) => setImmediate(resolve));
};
export class RoaiPortfolioCalculator extends PortfolioCalculator { export class RoaiPortfolioCalculator extends PortfolioCalculator {
private chartDates: string[]; private chartDates: string[];
@ -127,7 +134,7 @@ export class RoaiPortfolioCalculator extends PortfolioCalculator {
return PerformanceCalculationType.ROAI; return PerformanceCalculationType.ROAI;
} }
protected getSymbolMetrics({ protected async getSymbolMetrics({
chartDateMap, chartDateMap,
dataSource, dataSource,
end, end,
@ -143,7 +150,7 @@ export class RoaiPortfolioCalculator extends PortfolioCalculator {
[date: string]: { [symbol: string]: Big }; [date: string]: { [symbol: string]: Big };
}; };
start: Date; start: Date;
} & AssetProfileIdentifier): SymbolMetrics { } & AssetProfileIdentifier): Promise<SymbolMetrics> {
const currentExchangeRate = exchangeRates[format(new Date(), DATE_FORMAT)]; const currentExchangeRate = exchangeRates[format(new Date(), DATE_FORMAT)];
const currentValues: { [date: string]: Big } = {}; const currentValues: { [date: string]: Big } = {};
const currentValuesWithCurrencyEffect: { [date: string]: Big } = {}; const currentValuesWithCurrencyEffect: { [date: string]: Big } = {};
@ -345,7 +352,11 @@ export class RoaiPortfolioCalculator extends PortfolioCalculator {
this.chartDates = Object.keys(chartDateMap).sort(); this.chartDates = Object.keys(chartDateMap).sort();
} }
for (const dateString of this.chartDates) { for (let d = 0; d < this.chartDates.length; d++) {
if (d % 500 === 0) {
await yieldToEventLoop();
}
const dateString = this.chartDates[d];
if (dateString < startDateString) { if (dateString < startDateString) {
continue; continue;
} else if (dateString > endDateString) { } else if (dateString > endDateString) {
@ -408,6 +419,10 @@ export class RoaiPortfolioCalculator extends PortfolioCalculator {
let sumOfTimeWeightedInvestmentsWithCurrencyEffect = new Big(0); let sumOfTimeWeightedInvestmentsWithCurrencyEffect = new Big(0);
for (let i = 0; i < orders.length; i += 1) { for (let i = 0; i < orders.length; i += 1) {
if (i % 500 === 0) {
await yieldToEventLoop();
}
const order = orders[i]; const order = orders[i];
if (PortfolioCalculator.ENABLE_LOGGING) { if (PortfolioCalculator.ENABLE_LOGGING) {
@ -887,6 +902,10 @@ export class RoaiPortfolioCalculator extends PortfolioCalculator {
let dayCount = 0; let dayCount = 0;
for (let i = this.chartDates.length - 1; i >= 0; i -= 1) { for (let i = this.chartDates.length - 1; i >= 0; i -= 1) {
if (i % 500 === 0) {
await yieldToEventLoop();
}
const date = this.chartDates[i]; const date = this.chartDates[i];
if (date > rangeEndDateString) { if (date > rangeEndDateString) {

2
apps/api/src/app/portfolio/calculator/roi/portfolio-calculator.ts

@ -23,7 +23,7 @@ export class RoiPortfolioCalculator extends PortfolioCalculator {
}; };
start: Date; start: Date;
step?: number; step?: number;
} & AssetProfileIdentifier): SymbolMetrics { } & AssetProfileIdentifier): Promise<SymbolMetrics> {
throw new Error('Method not implemented.'); throw new Error('Method not implemented.');
} }
} }

2
apps/api/src/app/portfolio/calculator/twr/portfolio-calculator.ts

@ -23,7 +23,7 @@ export class TwrPortfolioCalculator extends PortfolioCalculator {
}; };
start: Date; start: Date;
step?: number; step?: number;
} & AssetProfileIdentifier): SymbolMetrics { } & AssetProfileIdentifier): Promise<SymbolMetrics> {
throw new Error('Method not implemented.'); throw new Error('Method not implemented.');
} }
} }

1
apps/api/src/services/queues/portfolio-snapshot/portfolio-snapshot.processor.ts

@ -67,6 +67,7 @@ export class PortfolioSnapshotProcessor {
calculationType: job.data.calculationType, calculationType: job.data.calculationType,
currency: job.data.userCurrency, currency: job.data.userCurrency,
filters: job.data.filters, filters: job.data.filters,
skipInitialize: true,
userId: job.data.userId userId: job.data.userId
}); });

Loading…
Cancel
Save