Browse Source

Merge a9e14fc508 into 5689326b12

pull/6901/merge
Andrea Bugeja 21 hours ago
committed by GitHub
parent
commit
ec7a420c01
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 2
      apps/api/src/app/activities/activities.service.ts
  2. 2
      apps/api/src/app/portfolio/current-rate.service.spec.ts
  3. 1
      apps/api/src/events/market-data-updated.event.ts
  4. 182
      apps/api/src/services/exchange-rate-data/exchange-rate-data.service.ts
  5. 44
      apps/api/src/services/market-data/market-data.service.ts

2
apps/api/src/app/activities/activities.service.ts

@ -469,7 +469,7 @@ export class ActivitiesService {
sortColumn,
sortDirection = 'asc',
startDate,
take = Number.MAX_SAFE_INTEGER,
take,
types,
userCurrency,
userId,

2
apps/api/src/app/portfolio/current-rate.service.spec.ts

@ -111,7 +111,7 @@ describe('CurrentRateService', () => {
null
);
marketDataService = new MarketDataService(null);
marketDataService = new MarketDataService(null, null);
currentRateService = new CurrentRateService(
null,

1
apps/api/src/events/market-data-updated.event.ts

@ -0,0 +1 @@
export const MARKET_DATA_UPDATED = 'market-data.updated';

182
apps/api/src/services/exchange-rate-data/exchange-rate-data.service.ts

@ -16,6 +16,7 @@ import {
} from '@ghostfolio/common/helper';
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import {
eachDayOfInterval,
format,
@ -26,6 +27,7 @@ import {
import { isNumber } from 'lodash';
import ms from 'ms';
import { MARKET_DATA_UPDATED } from '../../events/market-data-updated.event';
import { ExchangeRatesByCurrency } from './interfaces/exchange-rate-data.interface';
@Injectable()
@ -34,6 +36,8 @@ export class ExchangeRateDataService {
private currencyPairs: DataGatheringItem[] = [];
private derivedCurrencyFactors: { [currencyPair: string]: number } = {};
private exchangeRates: { [currencyPair: string]: number } = {};
private exchangeRateCache = new Map<string, Float64Array>();
private pendingLoads = new Map<string, Promise<Float64Array>>();
public constructor(
private readonly dataProviderService: DataProviderService,
@ -284,56 +288,56 @@ export class ExchangeRateDataService {
} else if (derivedCurrencyFactor) {
factor = derivedCurrencyFactor;
} else {
const dataSource =
this.dataProviderService.getDataSourceForExchangeRates();
const symbol = `${aFromCurrency}${aToCurrency}`;
const marketData = await this.marketDataService.get({
dataSource,
symbol,
date: aDate
});
const marketPrice = await this.getRateFromCache(
`${aFromCurrency}${aToCurrency}`,
aDate
);
if (marketData?.marketPrice) {
factor = marketData?.marketPrice;
if (marketPrice !== undefined) {
factor = marketPrice;
} else {
// Calculate indirectly via base currency
let marketPriceBaseCurrencyFromCurrency: number;
let marketPriceBaseCurrencyToCurrency: number;
try {
if (aFromCurrency === DEFAULT_CURRENCY) {
marketPriceBaseCurrencyFromCurrency = 1;
} else {
marketPriceBaseCurrencyFromCurrency = (
await this.marketDataService.get({
dataSource,
date: aDate,
symbol: `${DEFAULT_CURRENCY}${aFromCurrency}`
})
)?.marketPrice;
let baseFromPrice: number | undefined =
aFromCurrency === DEFAULT_CURRENCY ? 1 : undefined;
let baseToPrice: number | undefined =
aToCurrency === DEFAULT_CURRENCY ? 1 : undefined;
if (aFromCurrency !== DEFAULT_CURRENCY) {
baseFromPrice = await this.getRateFromCache(
`${DEFAULT_CURRENCY}${aFromCurrency}`,
aDate
);
if (baseFromPrice === undefined) {
const crossPrice = await this.getRateFromCache(
`${aFromCurrency}${DEFAULT_CURRENCY}`,
aDate
);
if (crossPrice !== undefined) {
baseFromPrice = 1 / crossPrice;
}
}
}
} catch {}
try {
if (aToCurrency === DEFAULT_CURRENCY) {
marketPriceBaseCurrencyToCurrency = 1;
} else {
marketPriceBaseCurrencyToCurrency = (
await this.marketDataService.get({
dataSource,
date: aDate,
symbol: `${DEFAULT_CURRENCY}${aToCurrency}`
})
)?.marketPrice;
if (aToCurrency !== DEFAULT_CURRENCY) {
baseToPrice = await this.getRateFromCache(
`${DEFAULT_CURRENCY}${aToCurrency}`,
aDate
);
if (baseToPrice === undefined) {
const crossPrice = await this.getRateFromCache(
`${aToCurrency}${DEFAULT_CURRENCY}`,
aDate
);
if (crossPrice !== undefined) {
baseToPrice = 1 / crossPrice;
}
}
}
} catch {}
// Calculate the opposite direction
factor =
(1 / marketPriceBaseCurrencyFromCurrency) *
marketPriceBaseCurrencyToCurrency;
factor = (1 / baseFromPrice) * baseToPrice;
} catch {}
}
}
@ -352,6 +356,98 @@ export class ExchangeRateDataService {
return undefined;
}
@OnEvent(MARKET_DATA_UPDATED)
public onMarketDataUpdated(event: { symbol: string }) {
this.exchangeRateCache.delete(event.symbol);
this.pendingLoads.delete(event.symbol);
}
private getDaysSinceEpoch(aDate: Date) {
return Math.floor(aDate.getTime() / 86400000) + 25569;
}
private async loadCache(aSymbol: string): Promise<Float64Array> {
const dataSource = this.dataProviderService.getDataSourceForExchangeRates();
const marketData = await this.prismaService.marketData.findMany({
where: { dataSource, symbol: aSymbol },
orderBy: { date: 'asc' },
select: { date: true, marketPrice: true }
});
const todayDays = this.getDaysSinceEpoch(new Date());
const maxDataDays =
marketData.length > 0
? this.getDaysSinceEpoch(marketData[marketData.length - 1].date)
: 0;
const array = new Float64Array(Math.max(todayDays, maxDataDays) + 1);
if (marketData.length > 0) {
let lastRate = marketData[0].marketPrice;
let currentIndex = this.getDaysSinceEpoch(marketData[0].date);
for (const data of marketData) {
const dataIndex = this.getDaysSinceEpoch(data.date);
while (currentIndex < dataIndex && currentIndex <= todayDays) {
array[currentIndex++] = lastRate;
}
lastRate = data.marketPrice;
array[dataIndex] = lastRate;
currentIndex = dataIndex + 1;
}
while (currentIndex <= todayDays) {
array[currentIndex++] = lastRate;
}
}
return array;
}
private async getRateFromCache(
aSymbol: string,
aDate: Date
): Promise<number | undefined> {
let cache = this.exchangeRateCache.get(aSymbol);
while (!cache) {
if (this.pendingLoads.has(aSymbol)) {
await this.pendingLoads.get(aSymbol);
cache = this.exchangeRateCache.get(aSymbol);
} else {
cache = await this.loadAndCommit(aSymbol);
}
}
const days = Math.min(this.getDaysSinceEpoch(aDate), cache.length - 1);
if (days >= 0) {
const rate = cache[days];
return rate === 0 ? undefined : rate;
}
return undefined;
}
private async loadAndCommit(aSymbol: string): Promise<Float64Array> {
const loadPromise = this.loadCache(aSymbol);
this.pendingLoads.set(aSymbol, loadPromise);
try {
const cache = await loadPromise;
if (this.pendingLoads.get(aSymbol) === loadPromise) {
this.exchangeRateCache.set(aSymbol, cache);
}
return this.exchangeRateCache.get(aSymbol) ?? cache;
} finally {
if (this.pendingLoads.get(aSymbol) === loadPromise) {
this.pendingLoads.delete(aSymbol);
}
}
}
private async getExchangeRates({
currencyFrom,
currencyTo,

44
apps/api/src/services/market-data/market-data.service.ts

@ -6,6 +6,7 @@ import { resetHours } from '@ghostfolio/common/helper';
import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces';
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import {
DataSource,
MarketData,
@ -13,17 +14,26 @@ import {
Prisma
} from '@prisma/client';
import { MARKET_DATA_UPDATED } from '../../events/market-data-updated.event';
@Injectable()
export class MarketDataService {
public constructor(private readonly prismaService: PrismaService) {}
public constructor(
private readonly eventEmitter: EventEmitter2,
private readonly prismaService: PrismaService
) {}
public async deleteMany({ dataSource, symbol }: AssetProfileIdentifier) {
return this.prismaService.marketData.deleteMany({
const result = await this.prismaService.marketData.deleteMany({
where: {
dataSource,
symbol
}
});
this.eventEmitter.emit(MARKET_DATA_UPDATED, { symbol });
return result;
}
public async get({
@ -185,13 +195,15 @@ export class MarketDataService {
});
}
});
this.eventEmitter.emit(MARKET_DATA_UPDATED, { symbol });
}
public async updateAssetProfileIdentifier(
oldAssetProfileIdentifier: AssetProfileIdentifier,
newAssetProfileIdentifier: AssetProfileIdentifier
) {
return this.prismaService.marketData.updateMany({
const result = await this.prismaService.marketData.updateMany({
data: {
dataSource: newAssetProfileIdentifier.dataSource,
symbol: newAssetProfileIdentifier.symbol
@ -201,6 +213,15 @@ export class MarketDataService {
symbol: oldAssetProfileIdentifier.symbol
}
});
this.eventEmitter.emit(MARKET_DATA_UPDATED, {
symbol: oldAssetProfileIdentifier.symbol
});
this.eventEmitter.emit(MARKET_DATA_UPDATED, {
symbol: newAssetProfileIdentifier.symbol
});
return result;
}
public async updateMarketData(params: {
@ -211,7 +232,7 @@ export class MarketDataService {
}): Promise<MarketData> {
const { data, where } = params;
return this.prismaService.marketData.upsert({
const result = await this.prismaService.marketData.upsert({
where,
create: {
dataSource: where.dataSource_date_symbol.dataSource,
@ -222,6 +243,12 @@ export class MarketDataService {
},
update: { marketPrice: data.marketPrice, state: data.state }
});
this.eventEmitter.emit(MARKET_DATA_UPDATED, {
symbol: where.dataSource_date_symbol.symbol
});
return result;
}
/**
@ -258,6 +285,13 @@ export class MarketDataService {
}
);
return this.prismaService.$transaction(upsertPromises);
const result = await this.prismaService.$transaction(upsertPromises);
const symbols = [...new Set(data.map((d) => d.symbol as string))];
for (const symbol of symbols) {
this.eventEmitter.emit(MARKET_DATA_UPDATED, { symbol });
}
return result;
}
}

Loading…
Cancel
Save