diff --git a/apps/api/src/app/activities/activities.service.ts b/apps/api/src/app/activities/activities.service.ts index 821185e11..298b8ab20 100644 --- a/apps/api/src/app/activities/activities.service.ts +++ b/apps/api/src/app/activities/activities.service.ts @@ -469,7 +469,7 @@ export class ActivitiesService { sortColumn, sortDirection = 'asc', startDate, - take = Number.MAX_SAFE_INTEGER, + take, types, userCurrency, userId, diff --git a/apps/api/src/app/portfolio/current-rate.service.spec.ts b/apps/api/src/app/portfolio/current-rate.service.spec.ts index 5f2358679..d80ee8e9a 100644 --- a/apps/api/src/app/portfolio/current-rate.service.spec.ts +++ b/apps/api/src/app/portfolio/current-rate.service.spec.ts @@ -111,7 +111,7 @@ describe('CurrentRateService', () => { null ); - marketDataService = new MarketDataService(null); + marketDataService = new MarketDataService(null, null); currentRateService = new CurrentRateService( null, diff --git a/apps/api/src/events/market-data-updated.event.ts b/apps/api/src/events/market-data-updated.event.ts new file mode 100644 index 000000000..140a7564e --- /dev/null +++ b/apps/api/src/events/market-data-updated.event.ts @@ -0,0 +1 @@ +export const MARKET_DATA_UPDATED = 'market-data.updated'; diff --git a/apps/api/src/services/exchange-rate-data/exchange-rate-data.service.ts b/apps/api/src/services/exchange-rate-data/exchange-rate-data.service.ts index 024bdf4e1..450b4abff 100644 --- a/apps/api/src/services/exchange-rate-data/exchange-rate-data.service.ts +++ b/apps/api/src/services/exchange-rate-data/exchange-rate-data.service.ts @@ -16,6 +16,7 @@ import { } from '@ghostfolio/common/helper'; import { Injectable, Logger } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; import { eachDayOfInterval, format, @@ -26,6 +27,7 @@ import { import { isNumber } from 'lodash'; import ms from 'ms'; +import { MARKET_DATA_UPDATED } from '../../events/market-data-updated.event'; import { ExchangeRatesByCurrency } from './interfaces/exchange-rate-data.interface'; @Injectable() @@ -34,6 +36,8 @@ export class ExchangeRateDataService { private currencyPairs: DataGatheringItem[] = []; private derivedCurrencyFactors: { [currencyPair: string]: number } = {}; private exchangeRates: { [currencyPair: string]: number } = {}; + private exchangeRateCache = new Map(); + private pendingLoads = new Map>(); public constructor( private readonly dataProviderService: DataProviderService, @@ -284,56 +288,56 @@ export class ExchangeRateDataService { } else if (derivedCurrencyFactor) { factor = derivedCurrencyFactor; } else { - const dataSource = - this.dataProviderService.getDataSourceForExchangeRates(); - const symbol = `${aFromCurrency}${aToCurrency}`; - - const marketData = await this.marketDataService.get({ - dataSource, - symbol, - date: aDate - }); + const marketPrice = await this.getRateFromCache( + `${aFromCurrency}${aToCurrency}`, + aDate + ); - if (marketData?.marketPrice) { - factor = marketData?.marketPrice; + if (marketPrice !== undefined) { + factor = marketPrice; } else { - // Calculate indirectly via base currency - - let marketPriceBaseCurrencyFromCurrency: number; - let marketPriceBaseCurrencyToCurrency: number; - try { - if (aFromCurrency === DEFAULT_CURRENCY) { - marketPriceBaseCurrencyFromCurrency = 1; - } else { - marketPriceBaseCurrencyFromCurrency = ( - await this.marketDataService.get({ - dataSource, - date: aDate, - symbol: `${DEFAULT_CURRENCY}${aFromCurrency}` - }) - )?.marketPrice; + let baseFromPrice: number | undefined = + aFromCurrency === DEFAULT_CURRENCY ? 1 : undefined; + let baseToPrice: number | undefined = + aToCurrency === DEFAULT_CURRENCY ? 1 : undefined; + + if (aFromCurrency !== DEFAULT_CURRENCY) { + baseFromPrice = await this.getRateFromCache( + `${DEFAULT_CURRENCY}${aFromCurrency}`, + aDate + ); + + if (baseFromPrice === undefined) { + const crossPrice = await this.getRateFromCache( + `${aFromCurrency}${DEFAULT_CURRENCY}`, + aDate + ); + if (crossPrice !== undefined) { + baseFromPrice = 1 / crossPrice; + } + } } - } catch {} - try { - if (aToCurrency === DEFAULT_CURRENCY) { - marketPriceBaseCurrencyToCurrency = 1; - } else { - marketPriceBaseCurrencyToCurrency = ( - await this.marketDataService.get({ - dataSource, - date: aDate, - symbol: `${DEFAULT_CURRENCY}${aToCurrency}` - }) - )?.marketPrice; + if (aToCurrency !== DEFAULT_CURRENCY) { + baseToPrice = await this.getRateFromCache( + `${DEFAULT_CURRENCY}${aToCurrency}`, + aDate + ); + + if (baseToPrice === undefined) { + const crossPrice = await this.getRateFromCache( + `${aToCurrency}${DEFAULT_CURRENCY}`, + aDate + ); + if (crossPrice !== undefined) { + baseToPrice = 1 / crossPrice; + } + } } - } catch {} - // Calculate the opposite direction - factor = - (1 / marketPriceBaseCurrencyFromCurrency) * - marketPriceBaseCurrencyToCurrency; + factor = (1 / baseFromPrice) * baseToPrice; + } catch {} } } @@ -352,6 +356,98 @@ export class ExchangeRateDataService { return undefined; } + @OnEvent(MARKET_DATA_UPDATED) + public onMarketDataUpdated(event: { symbol: string }) { + this.exchangeRateCache.delete(event.symbol); + this.pendingLoads.delete(event.symbol); + } + + private getDaysSinceEpoch(aDate: Date) { + return Math.floor(aDate.getTime() / 86400000) + 25569; + } + + private async loadCache(aSymbol: string): Promise { + const dataSource = this.dataProviderService.getDataSourceForExchangeRates(); + const marketData = await this.prismaService.marketData.findMany({ + where: { dataSource, symbol: aSymbol }, + orderBy: { date: 'asc' }, + select: { date: true, marketPrice: true } + }); + + const todayDays = this.getDaysSinceEpoch(new Date()); + const maxDataDays = + marketData.length > 0 + ? this.getDaysSinceEpoch(marketData[marketData.length - 1].date) + : 0; + + const array = new Float64Array(Math.max(todayDays, maxDataDays) + 1); + + if (marketData.length > 0) { + let lastRate = marketData[0].marketPrice; + let currentIndex = this.getDaysSinceEpoch(marketData[0].date); + + for (const data of marketData) { + const dataIndex = this.getDaysSinceEpoch(data.date); + while (currentIndex < dataIndex && currentIndex <= todayDays) { + array[currentIndex++] = lastRate; + } + lastRate = data.marketPrice; + array[dataIndex] = lastRate; + currentIndex = dataIndex + 1; + } + + while (currentIndex <= todayDays) { + array[currentIndex++] = lastRate; + } + } + + return array; + } + + private async getRateFromCache( + aSymbol: string, + aDate: Date + ): Promise { + let cache = this.exchangeRateCache.get(aSymbol); + + while (!cache) { + if (this.pendingLoads.has(aSymbol)) { + await this.pendingLoads.get(aSymbol); + cache = this.exchangeRateCache.get(aSymbol); + } else { + cache = await this.loadAndCommit(aSymbol); + } + } + + const days = Math.min(this.getDaysSinceEpoch(aDate), cache.length - 1); + + if (days >= 0) { + const rate = cache[days]; + return rate === 0 ? undefined : rate; + } + + return undefined; + } + + private async loadAndCommit(aSymbol: string): Promise { + const loadPromise = this.loadCache(aSymbol); + this.pendingLoads.set(aSymbol, loadPromise); + + try { + const cache = await loadPromise; + + if (this.pendingLoads.get(aSymbol) === loadPromise) { + this.exchangeRateCache.set(aSymbol, cache); + } + + return this.exchangeRateCache.get(aSymbol) ?? cache; + } finally { + if (this.pendingLoads.get(aSymbol) === loadPromise) { + this.pendingLoads.delete(aSymbol); + } + } + } + private async getExchangeRates({ currencyFrom, currencyTo, diff --git a/apps/api/src/services/market-data/market-data.service.ts b/apps/api/src/services/market-data/market-data.service.ts index 87b08e1bd..ce523a646 100644 --- a/apps/api/src/services/market-data/market-data.service.ts +++ b/apps/api/src/services/market-data/market-data.service.ts @@ -6,6 +6,7 @@ import { resetHours } from '@ghostfolio/common/helper'; import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces'; import { Injectable } from '@nestjs/common'; +import { EventEmitter2 } from '@nestjs/event-emitter'; import { DataSource, MarketData, @@ -13,17 +14,26 @@ import { Prisma } from '@prisma/client'; +import { MARKET_DATA_UPDATED } from '../../events/market-data-updated.event'; + @Injectable() export class MarketDataService { - public constructor(private readonly prismaService: PrismaService) {} + public constructor( + private readonly eventEmitter: EventEmitter2, + private readonly prismaService: PrismaService + ) {} public async deleteMany({ dataSource, symbol }: AssetProfileIdentifier) { - return this.prismaService.marketData.deleteMany({ + const result = await this.prismaService.marketData.deleteMany({ where: { dataSource, symbol } }); + + this.eventEmitter.emit(MARKET_DATA_UPDATED, { symbol }); + + return result; } public async get({ @@ -185,13 +195,15 @@ export class MarketDataService { }); } }); + + this.eventEmitter.emit(MARKET_DATA_UPDATED, { symbol }); } public async updateAssetProfileIdentifier( oldAssetProfileIdentifier: AssetProfileIdentifier, newAssetProfileIdentifier: AssetProfileIdentifier ) { - return this.prismaService.marketData.updateMany({ + const result = await this.prismaService.marketData.updateMany({ data: { dataSource: newAssetProfileIdentifier.dataSource, symbol: newAssetProfileIdentifier.symbol @@ -201,6 +213,15 @@ export class MarketDataService { symbol: oldAssetProfileIdentifier.symbol } }); + + this.eventEmitter.emit(MARKET_DATA_UPDATED, { + symbol: oldAssetProfileIdentifier.symbol + }); + this.eventEmitter.emit(MARKET_DATA_UPDATED, { + symbol: newAssetProfileIdentifier.symbol + }); + + return result; } public async updateMarketData(params: { @@ -211,7 +232,7 @@ export class MarketDataService { }): Promise { const { data, where } = params; - return this.prismaService.marketData.upsert({ + const result = await this.prismaService.marketData.upsert({ where, create: { dataSource: where.dataSource_date_symbol.dataSource, @@ -222,6 +243,12 @@ export class MarketDataService { }, update: { marketPrice: data.marketPrice, state: data.state } }); + + this.eventEmitter.emit(MARKET_DATA_UPDATED, { + symbol: where.dataSource_date_symbol.symbol + }); + + return result; } /** @@ -258,6 +285,13 @@ export class MarketDataService { } ); - return this.prismaService.$transaction(upsertPromises); + const result = await this.prismaService.$transaction(upsertPromises); + + const symbols = [...new Set(data.map((d) => d.symbol as string))]; + for (const symbol of symbols) { + this.eventEmitter.emit(MARKET_DATA_UPDATED, { symbol }); + } + + return result; } }