Browse Source

perf: batch and chunk database queries to prevent Prisma P2029 limits

- Split large market data queries into batches of 10 to use composite indexes efficiently
- Chunk activities queries to avoid exceeding Prisma parameter limits
pull/6912/head
Andrea Bugeja 5 days ago
parent
commit
ddb05e3f4e
  1. 369
      apps/api/src/app/activities/activities.service.ts
  2. 20
      apps/api/src/services/market-data/market-data.service.ts

369
apps/api/src/app/activities/activities.service.ts

@ -11,12 +11,16 @@ import { DataGatheringService } from '@ghostfolio/api/services/queues/data-gathe
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service';
import {
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
DEFAULT_CURRENCY,
GATHER_ASSET_PROFILE_PROCESS_JOB_NAME,
GATHER_ASSET_PROFILE_PROCESS_JOB_OPTIONS,
ghostfolioPrefix,
TAG_ID_EXCLUDE_FROM_ANALYSIS
ghostfolioPrefix
} from '@ghostfolio/common/config';
import { getAssetProfileIdentifier } from '@ghostfolio/common/helper';
import {
DATE_FORMAT,
getAssetProfileIdentifier,
resetHours
} from '@ghostfolio/common/helper';
import {
ActivitiesResponse,
Activity,
@ -39,8 +43,8 @@ import {
} from '@prisma/client';
import { Big } from 'big.js';
import { isUUID } from 'class-validator';
import { endOfToday, isAfter } from 'date-fns';
import { groupBy, uniqBy } from 'lodash';
import { endOfToday, format, isAfter, subDays } from 'date-fns';
import { uniqBy } from 'lodash';
import { randomUUID } from 'node:crypto';
@Injectable()
@ -470,7 +474,7 @@ export class ActivitiesService {
sortColumn,
sortDirection = 'asc',
startDate,
take = Number.MAX_SAFE_INTEGER,
take,
types,
userCurrency,
userId,
@ -489,166 +493,112 @@ export class ActivitiesService {
userId: string;
withExcludedAccountsAndActivities?: boolean;
}): Promise<ActivitiesResponse> {
let orderBy: Prisma.Enumerable<Prisma.OrderOrderByWithRelationInput> = [
{ date: 'asc' }
];
const where: Prisma.OrderWhereInput = { userId };
if (endDate || startDate) {
where.AND = [];
if (endDate) {
where.AND.push({ date: { lte: endDate } });
}
const where: Prisma.OrderWhereInput = {
...(includeDrafts === false ? { isDraft: false } : {}),
...(types?.length > 0 ? { type: { in: types } } : {}),
...(userId ? { userId } : {})
};
if (startDate) {
where.AND.push({ date: { gt: startDate } });
}
where.date = { gte: resetHours(startDate) };
}
const {
ACCOUNT: filtersByAccount,
ASSET_CLASS: filtersByAssetClass,
TAG: filtersByTag
} = groupBy(filters, ({ type }) => {
return type;
});
const filterByDataSource = filters?.find(({ type }) => {
return type === 'DATA_SOURCE';
})?.id;
const filterBySymbol = filters?.find(({ type }) => {
return type === 'SYMBOL';
})?.id;
const searchQuery = filters?.find(({ type }) => {
return type === 'SEARCH_QUERY';
})?.id;
if (filtersByAccount?.length > 0) {
where.accountId = {
in: filtersByAccount.map(({ id }) => {
return id;
})
if (endDate) {
where.date = {
...(where.date as Prisma.DateTimeFilter),
lte: resetHours(endDate)
};
}
if (includeDrafts === false) {
where.isDraft = false;
if (withExcludedAccountsAndActivities === false) {
where.account = { isExcluded: false };
}
if (filtersByAssetClass?.length > 0) {
where.SymbolProfile = {
OR: [
{
AND: [
{
OR: filtersByAssetClass.map(({ id }) => {
return { assetClass: AssetClass[id] };
})
},
{
OR: [
{ SymbolProfileOverrides: { is: null } },
{ SymbolProfileOverrides: { assetClass: null } }
]
}
]
},
{
SymbolProfileOverrides: {
OR: filtersByAssetClass.map(({ id }) => {
return { assetClass: AssetClass[id] };
})
}
}
]
};
if (filters?.length > 0) {
where.OR = [];
const accountIds = filters
.filter(({ type }) => type === 'ACCOUNT')
.map(({ id }) => id);
if (accountIds.length > 0) {
where.OR.push({ accountId: { in: accountIds } });
}
if (filterByDataSource && filterBySymbol) {
if (where.SymbolProfile) {
where.SymbolProfile = {
AND: [
where.SymbolProfile,
{
AND: [
{ dataSource: filterByDataSource as DataSource },
{ symbol: filterBySymbol }
]
const assetClasses = filters
.filter(({ type }) => type === 'ASSET_CLASS')
.map(({ id }) => id) as AssetClass[];
if (assetClasses.length > 0) {
where.OR.push({ SymbolProfile: { assetClass: { in: assetClasses } } });
}
]
};
} else {
where.SymbolProfile = {
AND: [
{ dataSource: filterByDataSource as DataSource },
{ symbol: filterBySymbol }
]
};
const tags = filters
.filter(({ type }) => type === 'TAG')
.map(({ id }) => id);
if (tags.length > 0) {
where.OR.push({ tags: { some: { id: { in: tags } } } });
}
}
if (searchQuery) {
const searchQueryWhereInput: Prisma.SymbolProfileWhereInput[] = [
{ id: { mode: 'insensitive', startsWith: searchQuery } },
{ isin: { mode: 'insensitive', startsWith: searchQuery } },
{ name: { mode: 'insensitive', startsWith: searchQuery } },
{ symbol: { mode: 'insensitive', startsWith: searchQuery } }
let orderBy: Prisma.OrderOrderByWithRelationInput[] = [
{ date: sortDirection }
];
if (where.SymbolProfile) {
where.SymbolProfile = {
AND: [
where.SymbolProfile,
{
OR: searchQueryWhereInput
}
]
};
if (sortColumn) {
orderBy = [];
if (
['currency', 'fee', 'quantity', 'type', 'unitPrice'].includes(
sortColumn
)
) {
orderBy.push({ [sortColumn]: sortDirection });
} else {
where.SymbolProfile = {
OR: searchQueryWhereInput
};
if (sortColumn === 'SymbolProfile.name') {
orderBy.push({ SymbolProfile: { name: sortDirection } });
} else if (sortColumn === 'account.name') {
orderBy.push({ account: { name: sortDirection } });
}
}
if (filtersByTag?.length > 0) {
where.tags = {
some: {
OR: filtersByTag.map(({ id }) => {
return { id };
})
}
};
}
if (sortColumn) {
orderBy = [{ [sortColumn]: sortDirection }];
}
const count = await this.prismaService.order.count({ where });
if (types?.length > 0) {
where.type = { in: types };
}
let orders: OrderWithAccount[] = [];
if (withExcludedAccountsAndActivities === false) {
where.OR = [
{ account: null },
{ account: { NOT: { isExcluded: true } } }
];
// If take is undefined and count is extremely large, batch fetch to prevent Prisma P2029 limits
const BATCH_SIZE = 5000;
if (take === undefined && count > BATCH_SIZE) {
let currentSkip = skip || 0;
const totalToFetch = count - currentSkip;
where.tags = {
...where.tags,
none: {
id: TAG_ID_EXCLUDE_FROM_ANALYSIS
while (orders.length < totalToFetch) {
const batch = await this.orders({
skip: currentSkip,
take: BATCH_SIZE,
where,
include: {
account: {
include: {
platform: true
}
};
},
// eslint-disable-next-line @typescript-eslint/naming-convention
SymbolProfile: true,
tags: true
},
orderBy: [...orderBy, { id: sortDirection }]
});
if (batch.length === 0) {
break;
}
const [orders, count] = await Promise.all([
this.orders({
orders = orders.concat(batch);
currentSkip += batch.length;
}
} else {
orders = await this.orders({
skip,
take,
where,
@ -663,9 +613,8 @@ export class ActivitiesService {
tags: true
},
orderBy: [...orderBy, { id: sortDirection }]
}),
this.prismaService.order.count({ where })
]);
});
}
const assetProfileIdentifiers = uniqBy(
orders.map(({ SymbolProfile }) => {
@ -686,8 +635,122 @@ export class ActivitiesService {
assetProfileIdentifiers
);
const activities = await Promise.all(
orders.map(async (order) => {
let exchangeRatesToUser: any = {};
let exchangeRatesToDefault: any = {};
if (orders.length > 0) {
let minDate = orders[0].date;
let maxDate = orders[0].date;
const uniqueCurrencies = new Set<string>();
uniqueCurrencies.add(userCurrency);
uniqueCurrencies.add(DEFAULT_CURRENCY);
const uniqueDatesSet = new Set<number>();
for (const order of orders) {
if (order.date < minDate) {
minDate = order.date;
}
if (order.date > maxDate) {
maxDate = order.date;
}
uniqueDatesSet.add(resetHours(order.date).getTime());
if (order.currency) {
uniqueCurrencies.add(order.currency);
}
if (order.SymbolProfile?.currency) {
uniqueCurrencies.add(order.SymbolProfile.currency);
}
}
const currenciesList = Array.from(uniqueCurrencies).filter(Boolean);
const uniqueDates = Array.from(uniqueDatesSet).map(
(time) => new Date(time)
);
const startDatePreload = subDays(resetHours(minDate), 1);
const endDatePreload = resetHours(maxDate);
const [ratesUser, ratesDefault] = await Promise.all([
this.exchangeRateDataService.getExchangeRatesByCurrency({
currencies: currenciesList,
dates: uniqueDates,
endDate: endDatePreload,
startDate: startDatePreload,
targetCurrency: userCurrency
}),
this.exchangeRateDataService.getExchangeRatesByCurrency({
currencies: currenciesList,
dates: uniqueDates,
endDate: endDatePreload,
startDate: startDatePreload,
targetCurrency: DEFAULT_CURRENCY
})
]);
exchangeRatesToUser = ratesUser;
exchangeRatesToDefault = ratesDefault;
}
const getPreloadedRate = (
from: string,
to: string,
dateStr: string
): number | undefined => {
if (from === to) {
return 1;
}
if (to === userCurrency) {
return exchangeRatesToUser[`${from}${userCurrency}`]?.[dateStr];
}
if (to === DEFAULT_CURRENCY) {
return exchangeRatesToDefault[`${from}${DEFAULT_CURRENCY}`]?.[dateStr];
}
if (from === DEFAULT_CURRENCY) {
const rateToDefault =
exchangeRatesToDefault[`${to}${DEFAULT_CURRENCY}`]?.[dateStr];
return rateToDefault ? 1 / rateToDefault : undefined;
}
const rateFromToDefault =
exchangeRatesToDefault[`${from}${DEFAULT_CURRENCY}`]?.[dateStr];
const rateToToDefault =
exchangeRatesToDefault[`${to}${DEFAULT_CURRENCY}`]?.[dateStr];
if (rateFromToDefault !== undefined && rateToToDefault) {
return rateFromToDefault / rateToToDefault;
}
return undefined;
};
const convertValue = async (
val: number,
from: string,
to: string,
date: Date
): Promise<number> => {
if (val === 0) {
return 0;
}
const dateStr = format(date, DATE_FORMAT);
const rate = getPreloadedRate(from, to, dateStr);
if (rate !== undefined && !isNaN(rate)) {
return rate * val;
}
return this.exchangeRateDataService.toCurrencyAtDate(val, from, to, date);
};
const activities = [];
const chunkSize = 500;
for (let i = 0; i < orders.length; i += chunkSize) {
const chunk = orders.slice(i, i + chunkSize);
const processedChunk = await Promise.all(
chunk.map(async (order) => {
const assetProfile = assetProfiles.find(({ dataSource, symbol }) => {
return (
dataSource === order.SymbolProfile.dataSource &&
@ -703,25 +766,25 @@ export class ActivitiesService {
unitPriceInAssetProfileCurrency,
valueInBaseCurrency
] = await Promise.all([
this.exchangeRateDataService.toCurrencyAtDate(
convertValue(
order.fee,
order.currency ?? order.SymbolProfile.currency,
order.SymbolProfile.currency,
order.date
),
this.exchangeRateDataService.toCurrencyAtDate(
convertValue(
order.fee,
order.currency ?? order.SymbolProfile.currency,
userCurrency,
order.date
),
this.exchangeRateDataService.toCurrencyAtDate(
convertValue(
order.unitPrice,
order.currency ?? order.SymbolProfile.currency,
order.SymbolProfile.currency,
order.date
),
this.exchangeRateDataService.toCurrencyAtDate(
convertValue(
value,
order.currency ?? order.SymbolProfile.currency,
userCurrency,
@ -731,16 +794,20 @@ export class ActivitiesService {
return {
...order,
feeInAssetProfileCurrency,
feeInBaseCurrency,
unitPriceInAssetProfileCurrency,
feeInAssetProfileCurrency: feeInAssetProfileCurrency ?? order.fee,
feeInBaseCurrency: feeInBaseCurrency ?? order.fee,
unitPriceInAssetProfileCurrency:
unitPriceInAssetProfileCurrency ?? order.unitPrice,
value,
valueInBaseCurrency,
valueInBaseCurrency: valueInBaseCurrency ?? value,
SymbolProfile: assetProfile
};
})
);
activities.push(...processedChunk);
}
return { activities, count };
}

20
apps/api/src/services/market-data/market-data.service.ts

@ -92,26 +92,6 @@ export class MarketDataService {
});
}
public async getRangeCount({
assetProfileIdentifiers,
dateQuery
}: {
assetProfileIdentifiers: AssetProfileIdentifier[];
dateQuery: DateQuery;
}): Promise<number> {
return this.prismaService.marketData.count({
where: {
date: dateQuery,
OR: assetProfileIdentifiers.map(({ dataSource, symbol }) => {
return {
dataSource,
symbol
};
})
}
});
}
public async marketDataItems(params: {
select?: Prisma.MarketDataSelectScalar;
skip?: number;

Loading…
Cancel
Save