diff --git a/apps/api/src/app/app.module.ts b/apps/api/src/app/app.module.ts index ca19d63bc..7cacd93cf 100644 --- a/apps/api/src/app/app.module.ts +++ b/apps/api/src/app/app.module.ts @@ -4,6 +4,7 @@ import { CronService } from '@ghostfolio/api/services/cron.service'; import { DataGatheringModule } from '@ghostfolio/api/services/data-gathering/data-gathering.module'; import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data-provider.module'; import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module'; +import { PortfolioSnapshotQueueModule } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.module'; import { PrismaModule } from '@ghostfolio/api/services/prisma/prisma.module'; import { PropertyModule } from '@ghostfolio/api/services/property/property.module'; import { TwitterBotModule } from '@ghostfolio/api/services/twitter-bot/twitter-bot.module'; @@ -81,6 +82,7 @@ import { UserModule } from './user/user.module'; OrderModule, PlatformModule, PortfolioModule, + PortfolioSnapshotQueueModule, PrismaModule, PropertyModule, RedisCacheModule, diff --git a/apps/api/src/app/portfolio/calculator/portfolio-calculator.factory.ts b/apps/api/src/app/portfolio/calculator/portfolio-calculator.factory.ts index b531ffc9d..01e5fded2 100644 --- a/apps/api/src/app/portfolio/calculator/portfolio-calculator.factory.ts +++ b/apps/api/src/app/portfolio/calculator/portfolio-calculator.factory.ts @@ -3,6 +3,7 @@ import { CurrentRateService } from '@ghostfolio/api/app/portfolio/current-rate.s import { RedisCacheService } from '@ghostfolio/api/app/redis-cache/redis-cache.service'; import { ConfigurationService } from '@ghostfolio/api/services/configuration/configuration.service'; import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service'; +import { PortfolioSnapshotService } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.service'; import { Filter, HistoricalDataItem } from '@ghostfolio/common/interfaces'; import { Injectable } from '@nestjs/common'; @@ -22,6 +23,7 @@ export class PortfolioCalculatorFactory { private readonly configurationService: ConfigurationService, private readonly currentRateService: CurrentRateService, private readonly exchangeRateDataService: ExchangeRateDataService, + private readonly portfolioService: PortfolioSnapshotService, private readonly redisCacheService: RedisCacheService ) {} @@ -51,6 +53,7 @@ export class PortfolioCalculatorFactory { configurationService: this.configurationService, currentRateService: this.currentRateService, exchangeRateDataService: this.exchangeRateDataService, + portfolioService: this.portfolioService, redisCacheService: this.redisCacheService }); case PerformanceCalculationType.TWR: @@ -63,6 +66,7 @@ export class PortfolioCalculatorFactory { userId, configurationService: this.configurationService, exchangeRateDataService: this.exchangeRateDataService, + portfolioService: this.portfolioService, redisCacheService: this.redisCacheService }); default: diff --git a/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts b/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts index 2938bd734..73cf2ae8c 100644 --- a/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts +++ b/apps/api/src/app/portfolio/calculator/portfolio-calculator.ts @@ -10,8 +10,13 @@ import { LogPerformance } from '@ghostfolio/api/interceptors/performance-logging import { ConfigurationService } from '@ghostfolio/api/services/configuration/configuration.service'; import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service'; import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; +import { PortfolioSnapshotService } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.service'; import { getIntervalFromDateRange } from '@ghostfolio/common/calculation-helper'; import { CACHE_TTL_INFINITE } from '@ghostfolio/common/config'; +import { + PORTFOLIO_PROCESS_JOB_NAME, + PORTFOLIO_PROCESS_JOB_OPTIONS +} from '@ghostfolio/common/config'; import { DATE_FORMAT, getSum, @@ -59,6 +64,7 @@ export abstract class PortfolioCalculator { private endDate: Date; private exchangeRateDataService: ExchangeRateDataService; private filters: Filter[]; + private portfolioService: PortfolioSnapshotService; private redisCacheService: RedisCacheService; private snapshot: PortfolioSnapshot; private snapshotPromise: Promise; @@ -74,6 +80,7 @@ export abstract class PortfolioCalculator { currentRateService, exchangeRateDataService, filters, + portfolioService, redisCacheService, userId }: { @@ -84,6 +91,7 @@ export abstract class PortfolioCalculator { currentRateService: CurrentRateService; exchangeRateDataService: ExchangeRateDataService; filters: Filter[]; + portfolioService: PortfolioSnapshotService; redisCacheService: RedisCacheService; userId: string; }) { @@ -132,6 +140,7 @@ export abstract class PortfolioCalculator { return a.date?.localeCompare(b.date); }); + this.portfolioService = portfolioService; this.redisCacheService = redisCacheService; this.userId = userId; @@ -1069,10 +1078,36 @@ export abstract class PortfolioCalculator { if (isCachedPortfolioSnapshotExpired) { // Compute in the background + this.portfolioService.addJobToQueue({ + data: { + userId: this.userId + }, + name: PORTFOLIO_PROCESS_JOB_NAME, + opts: { + ...PORTFOLIO_PROCESS_JOB_OPTIONS + // jobId + // priority + } + }); this.computeAndCacheSnapshot(); } } else { // Wait for computation + // TODO + const job = await this.portfolioService.addJobToQueue({ + data: { + userId: this.userId + }, + name: PORTFOLIO_PROCESS_JOB_NAME, + opts: { + ...PORTFOLIO_PROCESS_JOB_OPTIONS + // jobId + // priority + } + }); + + await job.finished(); + this.snapshot = await this.computeAndCacheSnapshot(); } } diff --git a/apps/api/src/app/portfolio/portfolio.module.ts b/apps/api/src/app/portfolio/portfolio.module.ts index ad81e9e15..00514e94e 100644 --- a/apps/api/src/app/portfolio/portfolio.module.ts +++ b/apps/api/src/app/portfolio/portfolio.module.ts @@ -15,6 +15,7 @@ import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data- import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module'; import { ImpersonationModule } from '@ghostfolio/api/services/impersonation/impersonation.module'; import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module'; +import { PortfolioSnapshotQueueModule } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.module'; import { PrismaModule } from '@ghostfolio/api/services/prisma/prisma.module'; import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module'; @@ -40,6 +41,7 @@ import { RulesService } from './rules.service'; MarketDataModule, OrderModule, PerformanceLoggingModule, + PortfolioSnapshotQueueModule, PrismaModule, RedactValuesInResponseModule, RedisCacheModule, diff --git a/apps/api/src/services/data-gathering/data-gathering.processor.ts b/apps/api/src/services/data-gathering/data-gathering.processor.ts index d8a6a7644..62f52d45b 100644 --- a/apps/api/src/services/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/data-gathering/data-gathering.processor.ts @@ -4,7 +4,7 @@ import { MarketDataService } from '@ghostfolio/api/services/market-data/market-d import { DATA_GATHERING_QUEUE, GATHER_ASSET_PROFILE_PROCESS, - GATHER_HISTORICAL_MARKET_DATA_PROCESS + GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME } from '@ghostfolio/common/config'; import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper'; import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces'; @@ -58,7 +58,10 @@ export class DataGatheringProcessor { } } - @Process({ concurrency: 1, name: GATHER_HISTORICAL_MARKET_DATA_PROCESS }) + @Process({ + concurrency: 1, + name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME + }) public async gatherHistoricalMarketData(job: Job) { try { const { dataSource, date, symbol } = job.data; @@ -69,7 +72,7 @@ export class DataGatheringProcessor { currentDate, DATE_FORMAT )}`, - `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` ); const historicalData = await this.dataProviderService.getHistoricalRaw({ @@ -123,12 +126,12 @@ export class DataGatheringProcessor { currentDate, DATE_FORMAT )}`, - `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` ); } catch (error) { Logger.error( error, - `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` + `DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` ); throw new Error(error); diff --git a/apps/api/src/services/data-gathering/data-gathering.service.ts b/apps/api/src/services/data-gathering/data-gathering.service.ts index 8b8c65a21..72b8ac716 100644 --- a/apps/api/src/services/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/data-gathering/data-gathering.service.ts @@ -11,8 +11,8 @@ import { DATA_GATHERING_QUEUE_PRIORITY_HIGH, DATA_GATHERING_QUEUE_PRIORITY_LOW, DATA_GATHERING_QUEUE_PRIORITY_MEDIUM, - GATHER_HISTORICAL_MARKET_DATA_PROCESS, - GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS, + GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, + GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS, PROPERTY_BENCHMARKS } from '@ghostfolio/common/config'; import { @@ -279,9 +279,9 @@ export class DataGatheringService { date, symbol }, - name: GATHER_HISTORICAL_MARKET_DATA_PROCESS, + name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, opts: { - ...GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS, + ...GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS, priority, jobId: `${getAssetProfileIdentifier({ dataSource, diff --git a/apps/api/src/services/portfolio-snapshot/interfaces/portfolio-snapshot-queue-job.interface.ts b/apps/api/src/services/portfolio-snapshot/interfaces/portfolio-snapshot-queue-job.interface.ts new file mode 100644 index 000000000..10e6d133d --- /dev/null +++ b/apps/api/src/services/portfolio-snapshot/interfaces/portfolio-snapshot-queue-job.interface.ts @@ -0,0 +1,3 @@ +export interface IPortfolioSnapshotQueueJob { + userId: string; +} diff --git a/apps/api/src/services/portfolio-snapshot/portfolio-snapshot.module.ts b/apps/api/src/services/portfolio-snapshot/portfolio-snapshot.module.ts new file mode 100644 index 000000000..818048d47 --- /dev/null +++ b/apps/api/src/services/portfolio-snapshot/portfolio-snapshot.module.ts @@ -0,0 +1,36 @@ +import { ConfigurationModule } from '@ghostfolio/api/services/configuration/configuration.module'; +import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data-provider.module'; +import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module'; +import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module'; +import { PortfolioSnapshotService } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.service'; +import { PrismaModule } from '@ghostfolio/api/services/prisma/prisma.module'; +import { PropertyModule } from '@ghostfolio/api/services/property/property.module'; +import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module'; +import { PORTFOLIO_SNAPSHOT_QUEUE } from '@ghostfolio/common/config'; + +import { BullModule } from '@nestjs/bull'; +import { Module } from '@nestjs/common'; + +import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor'; + +@Module({ + imports: [ + BullModule.registerQueue({ + // limiter: { + // duration: ms('4 seconds'), + // max: 1 + // }, + name: PORTFOLIO_SNAPSHOT_QUEUE + }), + ConfigurationModule, + DataProviderModule, + ExchangeRateDataModule, + MarketDataModule, + PrismaModule, + PropertyModule, + SymbolProfileModule + ], + providers: [PortfolioSnapshotProcessor, PortfolioSnapshotService], + exports: [BullModule, PortfolioSnapshotService] +}) +export class PortfolioSnapshotQueueModule {} diff --git a/apps/api/src/services/portfolio-snapshot/portfolio-snapshot.processor.ts b/apps/api/src/services/portfolio-snapshot/portfolio-snapshot.processor.ts new file mode 100644 index 000000000..d7eb1db60 --- /dev/null +++ b/apps/api/src/services/portfolio-snapshot/portfolio-snapshot.processor.ts @@ -0,0 +1,42 @@ +import { + PORTFOLIO_PROCESS_JOB_NAME, + PORTFOLIO_SNAPSHOT_QUEUE +} from '@ghostfolio/common/config'; + +import { Process, Processor } from '@nestjs/bull'; +import { Injectable, Logger } from '@nestjs/common'; +import { Job } from 'bull'; +import ms from 'ms'; +import { setTimeout } from 'timers/promises'; + +import { IPortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; + +@Injectable() +@Processor(PORTFOLIO_SNAPSHOT_QUEUE) +export class PortfolioSnapshotProcessor { + public constructor() {} + + @Process({ concurrency: 1, name: PORTFOLIO_PROCESS_JOB_NAME }) + public async calculatePortfolioSnapshot( + job: Job + ) { + try { + Logger.log( + `Portfolio snapshot calculation of user ${job.data.userId} has been started`, + `PortfolioProcessor (${PORTFOLIO_PROCESS_JOB_NAME})` + ); + + // TODO: Do something + await setTimeout(ms('1 second')); + + Logger.log( + `Portfolio snapshot calculation of user ${job.data.userId} has been completed`, + `PortfolioProcessor (${PORTFOLIO_PROCESS_JOB_NAME})` + ); + } catch (error) { + Logger.error(error, `PortfolioProcessor (${PORTFOLIO_PROCESS_JOB_NAME})`); + + throw new Error(error); + } + } +} diff --git a/apps/api/src/services/portfolio-snapshot/portfolio-snapshot.service.ts b/apps/api/src/services/portfolio-snapshot/portfolio-snapshot.service.ts new file mode 100644 index 000000000..ed06267c4 --- /dev/null +++ b/apps/api/src/services/portfolio-snapshot/portfolio-snapshot.service.ts @@ -0,0 +1,33 @@ +import { PORTFOLIO_SNAPSHOT_QUEUE } from '@ghostfolio/common/config'; + +import { InjectQueue } from '@nestjs/bull'; +import { Injectable } from '@nestjs/common'; +import { JobOptions, Queue } from 'bull'; + +import { IPortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface'; + +@Injectable() +export class PortfolioSnapshotService { + public constructor( + @InjectQueue(PORTFOLIO_SNAPSHOT_QUEUE) + private readonly portfolioSnapshotQueue: Queue + ) {} + + public async addJobToQueue({ + data, + name, + opts + }: { + data: IPortfolioSnapshotQueueJob; + name: string; + opts?: JobOptions; + }) { + return this.portfolioSnapshotQueue.add(name, data, opts); + } + + // public async addJobsToQueue( + // jobs: { data: IPortfolioSnapshotQueueJob; name: string; opts?: JobOptions }[] + // ) { + // return this.portfolioSnapshotQueue.addBulk(jobs); + // } +} diff --git a/libs/common/src/lib/config.ts b/libs/common/src/lib/config.ts index 00e756810..6161c2634 100644 --- a/libs/common/src/lib/config.ts +++ b/libs/common/src/lib/config.ts @@ -40,6 +40,8 @@ export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round( DATA_GATHERING_QUEUE_PRIORITY_LOW / 2 ); +export const PORTFOLIO_SNAPSHOT_QUEUE = 'PORTFOLIO_SNAPSHOT_QUEUE'; + export const DEFAULT_CURRENCY = 'USD'; export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy'; export const DEFAULT_LANGUAGE_CODE = 'en'; @@ -76,9 +78,9 @@ export const GATHER_ASSET_PROFILE_PROCESS_OPTIONS: JobOptions = { }, removeOnComplete: true }; -export const GATHER_HISTORICAL_MARKET_DATA_PROCESS = +export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME = 'GATHER_HISTORICAL_MARKET_DATA'; -export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS: JobOptions = { +export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobOptions = { attempts: 12, backoff: { delay: ms('1 minute'), @@ -86,6 +88,15 @@ export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS: JobOptions = { }, removeOnComplete: true }; +export const PORTFOLIO_PROCESS_JOB_NAME = 'PORTFOLIO'; +export const PORTFOLIO_PROCESS_JOB_OPTIONS: JobOptions = { + // attempts: 12, + // backoff: { + // delay: ms('1 minute'), + // type: 'exponential' + // }, + removeOnComplete: true +}; export const HEADER_KEY_IMPERSONATION = 'Impersonation-Id'; export const HEADER_KEY_TIMEZONE = 'Timezone';