Browse Source

Set up portfolio snapshot queue

pull/3725/head
Thomas Kaul 12 months ago
parent
commit
837a64feca
  1. 2
      apps/api/src/app/app.module.ts
  2. 4
      apps/api/src/app/portfolio/calculator/portfolio-calculator.factory.ts
  3. 35
      apps/api/src/app/portfolio/calculator/portfolio-calculator.ts
  4. 2
      apps/api/src/app/portfolio/portfolio.module.ts
  5. 13
      apps/api/src/services/data-gathering/data-gathering.processor.ts
  6. 8
      apps/api/src/services/data-gathering/data-gathering.service.ts
  7. 3
      apps/api/src/services/portfolio-snapshot/interfaces/portfolio-snapshot-queue-job.interface.ts
  8. 36
      apps/api/src/services/portfolio-snapshot/portfolio-snapshot.module.ts
  9. 42
      apps/api/src/services/portfolio-snapshot/portfolio-snapshot.processor.ts
  10. 33
      apps/api/src/services/portfolio-snapshot/portfolio-snapshot.service.ts
  11. 15
      libs/common/src/lib/config.ts

2
apps/api/src/app/app.module.ts

@ -4,6 +4,7 @@ import { CronService } from '@ghostfolio/api/services/cron.service';
import { DataGatheringModule } from '@ghostfolio/api/services/data-gathering/data-gathering.module';
import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data-provider.module';
import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module';
import { PortfolioSnapshotQueueModule } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.module';
import { PrismaModule } from '@ghostfolio/api/services/prisma/prisma.module';
import { PropertyModule } from '@ghostfolio/api/services/property/property.module';
import { TwitterBotModule } from '@ghostfolio/api/services/twitter-bot/twitter-bot.module';
@ -81,6 +82,7 @@ import { UserModule } from './user/user.module';
OrderModule,
PlatformModule,
PortfolioModule,
PortfolioSnapshotQueueModule,
PrismaModule,
PropertyModule,
RedisCacheModule,

4
apps/api/src/app/portfolio/calculator/portfolio-calculator.factory.ts

@ -3,6 +3,7 @@ import { CurrentRateService } from '@ghostfolio/api/app/portfolio/current-rate.s
import { RedisCacheService } from '@ghostfolio/api/app/redis-cache/redis-cache.service';
import { ConfigurationService } from '@ghostfolio/api/services/configuration/configuration.service';
import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service';
import { PortfolioSnapshotService } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.service';
import { Filter, HistoricalDataItem } from '@ghostfolio/common/interfaces';
import { Injectable } from '@nestjs/common';
@ -22,6 +23,7 @@ export class PortfolioCalculatorFactory {
private readonly configurationService: ConfigurationService,
private readonly currentRateService: CurrentRateService,
private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly portfolioService: PortfolioSnapshotService,
private readonly redisCacheService: RedisCacheService
) {}
@ -51,6 +53,7 @@ export class PortfolioCalculatorFactory {
configurationService: this.configurationService,
currentRateService: this.currentRateService,
exchangeRateDataService: this.exchangeRateDataService,
portfolioService: this.portfolioService,
redisCacheService: this.redisCacheService
});
case PerformanceCalculationType.TWR:
@ -63,6 +66,7 @@ export class PortfolioCalculatorFactory {
userId,
configurationService: this.configurationService,
exchangeRateDataService: this.exchangeRateDataService,
portfolioService: this.portfolioService,
redisCacheService: this.redisCacheService
});
default:

35
apps/api/src/app/portfolio/calculator/portfolio-calculator.ts

@ -10,8 +10,13 @@ import { LogPerformance } from '@ghostfolio/api/interceptors/performance-logging
import { ConfigurationService } from '@ghostfolio/api/services/configuration/configuration.service';
import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service';
import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces';
import { PortfolioSnapshotService } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.service';
import { getIntervalFromDateRange } from '@ghostfolio/common/calculation-helper';
import { CACHE_TTL_INFINITE } from '@ghostfolio/common/config';
import {
PORTFOLIO_PROCESS_JOB_NAME,
PORTFOLIO_PROCESS_JOB_OPTIONS
} from '@ghostfolio/common/config';
import {
DATE_FORMAT,
getSum,
@ -59,6 +64,7 @@ export abstract class PortfolioCalculator {
private endDate: Date;
private exchangeRateDataService: ExchangeRateDataService;
private filters: Filter[];
private portfolioService: PortfolioSnapshotService;
private redisCacheService: RedisCacheService;
private snapshot: PortfolioSnapshot;
private snapshotPromise: Promise<void>;
@ -74,6 +80,7 @@ export abstract class PortfolioCalculator {
currentRateService,
exchangeRateDataService,
filters,
portfolioService,
redisCacheService,
userId
}: {
@ -84,6 +91,7 @@ export abstract class PortfolioCalculator {
currentRateService: CurrentRateService;
exchangeRateDataService: ExchangeRateDataService;
filters: Filter[];
portfolioService: PortfolioSnapshotService;
redisCacheService: RedisCacheService;
userId: string;
}) {
@ -132,6 +140,7 @@ export abstract class PortfolioCalculator {
return a.date?.localeCompare(b.date);
});
this.portfolioService = portfolioService;
this.redisCacheService = redisCacheService;
this.userId = userId;
@ -1069,10 +1078,36 @@ export abstract class PortfolioCalculator {
if (isCachedPortfolioSnapshotExpired) {
// Compute in the background
this.portfolioService.addJobToQueue({
data: {
userId: this.userId
},
name: PORTFOLIO_PROCESS_JOB_NAME,
opts: {
...PORTFOLIO_PROCESS_JOB_OPTIONS
// jobId
// priority
}
});
this.computeAndCacheSnapshot();
}
} else {
// Wait for computation
// TODO
const job = await this.portfolioService.addJobToQueue({
data: {
userId: this.userId
},
name: PORTFOLIO_PROCESS_JOB_NAME,
opts: {
...PORTFOLIO_PROCESS_JOB_OPTIONS
// jobId
// priority
}
});
await job.finished();
this.snapshot = await this.computeAndCacheSnapshot();
}
}

2
apps/api/src/app/portfolio/portfolio.module.ts

@ -15,6 +15,7 @@ import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data-
import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module';
import { ImpersonationModule } from '@ghostfolio/api/services/impersonation/impersonation.module';
import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module';
import { PortfolioSnapshotQueueModule } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.module';
import { PrismaModule } from '@ghostfolio/api/services/prisma/prisma.module';
import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module';
@ -40,6 +41,7 @@ import { RulesService } from './rules.service';
MarketDataModule,
OrderModule,
PerformanceLoggingModule,
PortfolioSnapshotQueueModule,
PrismaModule,
RedactValuesInResponseModule,
RedisCacheModule,

13
apps/api/src/services/data-gathering/data-gathering.processor.ts

@ -4,7 +4,7 @@ import { MarketDataService } from '@ghostfolio/api/services/market-data/market-d
import {
DATA_GATHERING_QUEUE,
GATHER_ASSET_PROFILE_PROCESS,
GATHER_HISTORICAL_MARKET_DATA_PROCESS
GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
} from '@ghostfolio/common/config';
import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper';
import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces';
@ -58,7 +58,10 @@ export class DataGatheringProcessor {
}
}
@Process({ concurrency: 1, name: GATHER_HISTORICAL_MARKET_DATA_PROCESS })
@Process({
concurrency: 1,
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
})
public async gatherHistoricalMarketData(job: Job<IDataGatheringItem>) {
try {
const { dataSource, date, symbol } = job.data;
@ -69,7 +72,7 @@ export class DataGatheringProcessor {
currentDate,
DATE_FORMAT
)}`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})`
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
);
const historicalData = await this.dataProviderService.getHistoricalRaw({
@ -123,12 +126,12 @@ export class DataGatheringProcessor {
currentDate,
DATE_FORMAT
)}`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})`
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
);
} catch (error) {
Logger.error(
error,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})`
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
);
throw new Error(error);

8
apps/api/src/services/data-gathering/data-gathering.service.ts

@ -11,8 +11,8 @@ import {
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
DATA_GATHERING_QUEUE_PRIORITY_LOW,
DATA_GATHERING_QUEUE_PRIORITY_MEDIUM,
GATHER_HISTORICAL_MARKET_DATA_PROCESS,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS,
PROPERTY_BENCHMARKS
} from '@ghostfolio/common/config';
import {
@ -279,9 +279,9 @@ export class DataGatheringService {
date,
symbol
},
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS,
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,
opts: {
...GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS,
...GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS,
priority,
jobId: `${getAssetProfileIdentifier({
dataSource,

3
apps/api/src/services/portfolio-snapshot/interfaces/portfolio-snapshot-queue-job.interface.ts

@ -0,0 +1,3 @@
export interface IPortfolioSnapshotQueueJob {
userId: string;
}

36
apps/api/src/services/portfolio-snapshot/portfolio-snapshot.module.ts

@ -0,0 +1,36 @@
import { ConfigurationModule } from '@ghostfolio/api/services/configuration/configuration.module';
import { DataProviderModule } from '@ghostfolio/api/services/data-provider/data-provider.module';
import { ExchangeRateDataModule } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.module';
import { MarketDataModule } from '@ghostfolio/api/services/market-data/market-data.module';
import { PortfolioSnapshotService } from '@ghostfolio/api/services/portfolio-snapshot/portfolio-snapshot.service';
import { PrismaModule } from '@ghostfolio/api/services/prisma/prisma.module';
import { PropertyModule } from '@ghostfolio/api/services/property/property.module';
import { SymbolProfileModule } from '@ghostfolio/api/services/symbol-profile/symbol-profile.module';
import { PORTFOLIO_SNAPSHOT_QUEUE } from '@ghostfolio/common/config';
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { PortfolioSnapshotProcessor } from './portfolio-snapshot.processor';
@Module({
imports: [
BullModule.registerQueue({
// limiter: {
// duration: ms('4 seconds'),
// max: 1
// },
name: PORTFOLIO_SNAPSHOT_QUEUE
}),
ConfigurationModule,
DataProviderModule,
ExchangeRateDataModule,
MarketDataModule,
PrismaModule,
PropertyModule,
SymbolProfileModule
],
providers: [PortfolioSnapshotProcessor, PortfolioSnapshotService],
exports: [BullModule, PortfolioSnapshotService]
})
export class PortfolioSnapshotQueueModule {}

42
apps/api/src/services/portfolio-snapshot/portfolio-snapshot.processor.ts

@ -0,0 +1,42 @@
import {
PORTFOLIO_PROCESS_JOB_NAME,
PORTFOLIO_SNAPSHOT_QUEUE
} from '@ghostfolio/common/config';
import { Process, Processor } from '@nestjs/bull';
import { Injectable, Logger } from '@nestjs/common';
import { Job } from 'bull';
import ms from 'ms';
import { setTimeout } from 'timers/promises';
import { IPortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
@Injectable()
@Processor(PORTFOLIO_SNAPSHOT_QUEUE)
export class PortfolioSnapshotProcessor {
public constructor() {}
@Process({ concurrency: 1, name: PORTFOLIO_PROCESS_JOB_NAME })
public async calculatePortfolioSnapshot(
job: Job<IPortfolioSnapshotQueueJob>
) {
try {
Logger.log(
`Portfolio snapshot calculation of user ${job.data.userId} has been started`,
`PortfolioProcessor (${PORTFOLIO_PROCESS_JOB_NAME})`
);
// TODO: Do something
await setTimeout(ms('1 second'));
Logger.log(
`Portfolio snapshot calculation of user ${job.data.userId} has been completed`,
`PortfolioProcessor (${PORTFOLIO_PROCESS_JOB_NAME})`
);
} catch (error) {
Logger.error(error, `PortfolioProcessor (${PORTFOLIO_PROCESS_JOB_NAME})`);
throw new Error(error);
}
}
}

33
apps/api/src/services/portfolio-snapshot/portfolio-snapshot.service.ts

@ -0,0 +1,33 @@
import { PORTFOLIO_SNAPSHOT_QUEUE } from '@ghostfolio/common/config';
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { JobOptions, Queue } from 'bull';
import { IPortfolioSnapshotQueueJob } from './interfaces/portfolio-snapshot-queue-job.interface';
@Injectable()
export class PortfolioSnapshotService {
public constructor(
@InjectQueue(PORTFOLIO_SNAPSHOT_QUEUE)
private readonly portfolioSnapshotQueue: Queue
) {}
public async addJobToQueue({
data,
name,
opts
}: {
data: IPortfolioSnapshotQueueJob;
name: string;
opts?: JobOptions;
}) {
return this.portfolioSnapshotQueue.add(name, data, opts);
}
// public async addJobsToQueue(
// jobs: { data: IPortfolioSnapshotQueueJob; name: string; opts?: JobOptions }[]
// ) {
// return this.portfolioSnapshotQueue.addBulk(jobs);
// }
}

15
libs/common/src/lib/config.ts

@ -40,6 +40,8 @@ export const DATA_GATHERING_QUEUE_PRIORITY_MEDIUM = Math.round(
DATA_GATHERING_QUEUE_PRIORITY_LOW / 2
);
export const PORTFOLIO_SNAPSHOT_QUEUE = 'PORTFOLIO_SNAPSHOT_QUEUE';
export const DEFAULT_CURRENCY = 'USD';
export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy';
export const DEFAULT_LANGUAGE_CODE = 'en';
@ -76,9 +78,9 @@ export const GATHER_ASSET_PROFILE_PROCESS_OPTIONS: JobOptions = {
},
removeOnComplete: true
};
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS =
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME =
'GATHER_HISTORICAL_MARKET_DATA';
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS: JobOptions = {
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobOptions = {
attempts: 12,
backoff: {
delay: ms('1 minute'),
@ -86,6 +88,15 @@ export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS: JobOptions = {
},
removeOnComplete: true
};
export const PORTFOLIO_PROCESS_JOB_NAME = 'PORTFOLIO';
export const PORTFOLIO_PROCESS_JOB_OPTIONS: JobOptions = {
// attempts: 12,
// backoff: {
// delay: ms('1 minute'),
// type: 'exponential'
// },
removeOnComplete: true
};
export const HEADER_KEY_IMPERSONATION = 'Impersonation-Id';
export const HEADER_KEY_TIMEZONE = 'Timezone';

Loading…
Cancel
Save