From c5654b084b28440fe0b29a2c670d9f097cfc0ef4 Mon Sep 17 00:00:00 2001 From: Thomas Kaul <4159106+dtslvr@users.noreply.github.com> Date: Sat, 21 Sep 2024 18:14:28 +0200 Subject: [PATCH] Expose concurrency of data gathering processor * PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE * PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA --- .../configuration/configuration.service.ts | 8 ++++++++ .../data-gathering/data-gathering.processor.ts | 17 +++++++++++++++-- libs/common/src/lib/config.ts | 2 ++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/apps/api/src/services/configuration/configuration.service.ts b/apps/api/src/services/configuration/configuration.service.ts index 0f9246107..cca393a2a 100644 --- a/apps/api/src/services/configuration/configuration.service.ts +++ b/apps/api/src/services/configuration/configuration.service.ts @@ -1,6 +1,8 @@ import { Environment } from '@ghostfolio/api/services/interfaces/environment.interface'; import { CACHE_TTL_NO_CACHE, + DEFAULT_PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE, + DEFAULT_PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA, DEFAULT_PROCESSOR_CONCURRENCY_PORTFOLIO_SNAPSHOT, DEFAULT_ROOT_URL } from '@ghostfolio/common/config'; @@ -48,6 +50,12 @@ export class ConfigurationService { MAX_ACTIVITIES_TO_IMPORT: num({ default: Number.MAX_SAFE_INTEGER }), MAX_CHART_ITEMS: num({ default: 365 }), PORT: port({ default: 3333 }), + PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE: num({ + default: DEFAULT_PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE + }), + PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA: num({ + default: DEFAULT_PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA + }), PROCESSOR_CONCURRENCY_PORTFOLIO_SNAPSHOT: num({ default: DEFAULT_PROCESSOR_CONCURRENCY_PORTFOLIO_SNAPSHOT }), diff --git a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts index 62f52d45b..2745aa288 100644 --- a/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts +++ b/apps/api/src/services/queues/data-gathering/data-gathering.processor.ts @@ -3,6 +3,8 @@ import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfac import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; import { DATA_GATHERING_QUEUE, + DEFAULT_PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE, + DEFAULT_PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA, GATHER_ASSET_PROFILE_PROCESS, GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME } from '@ghostfolio/common/config'; @@ -34,7 +36,14 @@ export class DataGatheringProcessor { private readonly marketDataService: MarketDataService ) {} - @Process({ concurrency: 1, name: GATHER_ASSET_PROFILE_PROCESS }) + @Process({ + concurrency: parseInt( + process.env.PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE ?? + DEFAULT_PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE.toString(), + 10 + ), + name: GATHER_ASSET_PROFILE_PROCESS + }) public async gatherAssetProfile(job: Job) { try { Logger.log( @@ -59,7 +68,11 @@ export class DataGatheringProcessor { } @Process({ - concurrency: 1, + concurrency: parseInt( + process.env.PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA ?? + DEFAULT_PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA.toString(), + 10 + ), name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME }) public async gatherHistoricalMarketData(job: Job) { diff --git a/libs/common/src/lib/config.ts b/libs/common/src/lib/config.ts index 6fc5650cc..19ec965fa 100644 --- a/libs/common/src/lib/config.ts +++ b/libs/common/src/lib/config.ts @@ -48,6 +48,8 @@ export const DEFAULT_CURRENCY = 'USD'; export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy'; export const DEFAULT_LANGUAGE_CODE = 'en'; export const DEFAULT_PAGE_SIZE = 50; +export const DEFAULT_PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE = 1; +export const DEFAULT_PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA = 1; export const DEFAULT_PROCESSOR_CONCURRENCY_PORTFOLIO_SNAPSHOT = 1; export const DEFAULT_ROOT_URL = 'https://localhost:4200';