Browse Source

Set up statistics gathering queue

pull/6696/head
Thomas Kaul 2 months ago
parent
commit
969f810b53
  1. 7
      apps/api/src/app/admin/queue/queue.module.ts
  2. 24
      apps/api/src/app/admin/queue/queue.service.ts
  3. 15
      apps/api/src/app/info/info.service.ts
  4. 2
      apps/api/src/services/cron/cron.service.ts
  5. 135
      apps/api/src/services/queues/statistics-gathering/statistics-gathering.processor.ts
  6. 46
      apps/api/src/services/queues/statistics-gathering/statistics-gathering.service.ts
  7. 13
      libs/common/src/lib/config.ts

7
apps/api/src/app/admin/queue/queue.module.ts

@ -1,5 +1,6 @@
import { DataGatheringModule } from '@ghostfolio/api/services/queues/data-gathering/data-gathering.module'; import { DataGatheringModule } from '@ghostfolio/api/services/queues/data-gathering/data-gathering.module';
import { PortfolioSnapshotQueueModule } from '@ghostfolio/api/services/queues/portfolio-snapshot/portfolio-snapshot.module'; import { PortfolioSnapshotQueueModule } from '@ghostfolio/api/services/queues/portfolio-snapshot/portfolio-snapshot.module';
import { StatisticsGatheringModule } from '@ghostfolio/api/services/queues/statistics-gathering/statistics-gathering.module';
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
@ -8,7 +9,11 @@ import { QueueService } from './queue.service';
@Module({ @Module({
controllers: [QueueController], controllers: [QueueController],
imports: [DataGatheringModule, PortfolioSnapshotQueueModule], imports: [
DataGatheringModule,
PortfolioSnapshotQueueModule,
StatisticsGatheringModule
],
providers: [QueueService] providers: [QueueService]
}) })
export class QueueModule {} export class QueueModule {}

24
apps/api/src/app/admin/queue/queue.service.ts

@ -1,7 +1,8 @@
import { import {
DATA_GATHERING_QUEUE, DATA_GATHERING_QUEUE,
PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE, PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE,
QUEUE_JOB_STATUS_LIST QUEUE_JOB_STATUS_LIST,
STATISTICS_GATHERING_QUEUE
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { AdminJobs } from '@ghostfolio/common/interfaces'; import { AdminJobs } from '@ghostfolio/common/interfaces';
@ -15,7 +16,9 @@ export class QueueService {
@InjectQueue(DATA_GATHERING_QUEUE) @InjectQueue(DATA_GATHERING_QUEUE)
private readonly dataGatheringQueue: Queue, private readonly dataGatheringQueue: Queue,
@InjectQueue(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE) @InjectQueue(PORTFOLIO_SNAPSHOT_COMPUTATION_QUEUE)
private readonly portfolioSnapshotQueue: Queue private readonly portfolioSnapshotQueue: Queue,
@InjectQueue(STATISTICS_GATHERING_QUEUE)
private readonly statisticsGatheringQueue: Queue
) {} ) {}
public async deleteJob(aId: string) { public async deleteJob(aId: string) {
@ -38,6 +41,7 @@ export class QueueService {
await this.dataGatheringQueue.clean(300, queueStatus); await this.dataGatheringQueue.clean(300, queueStatus);
await this.portfolioSnapshotQueue.clean(300, queueStatus); await this.portfolioSnapshotQueue.clean(300, queueStatus);
await this.statisticsGatheringQueue.clean(300, queueStatus);
} }
} }
@ -58,13 +62,19 @@ export class QueueService {
limit?: number; limit?: number;
status?: JobStatus[]; status?: JobStatus[];
}): Promise<AdminJobs> { }): Promise<AdminJobs> {
const [dataGatheringJobs, portfolioSnapshotJobs] = await Promise.all([ const [dataGatheringJobs, portfolioSnapshotJobs, statisticsGatheringJobs] =
this.dataGatheringQueue.getJobs(status), await Promise.all([
this.portfolioSnapshotQueue.getJobs(status) this.dataGatheringQueue.getJobs(status),
]); this.portfolioSnapshotQueue.getJobs(status),
this.statisticsGatheringQueue.getJobs(status)
]);
const jobsWithState = await Promise.all( const jobsWithState = await Promise.all(
[...dataGatheringJobs, ...portfolioSnapshotJobs] [
...dataGatheringJobs,
...portfolioSnapshotJobs,
...statisticsGatheringJobs
]
.filter((job) => { .filter((job) => {
return job; return job;
}) })

15
apps/api/src/app/info/info.service.ts

@ -24,6 +24,7 @@ import { permissions } from '@ghostfolio/common/permissions';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { JwtService } from '@nestjs/jwt'; import { JwtService } from '@nestjs/jwt';
import { subDays } from 'date-fns'; import { subDays } from 'date-fns';
import { isNil } from 'lodash';
@Injectable() @Injectable()
export class InfoService { export class InfoService {
@ -235,10 +236,16 @@ export class InfoService {
uptime: uptime ? Number.parseFloat(uptime) : undefined uptime: uptime ? Number.parseFloat(uptime) : undefined
}; };
await this.redisCacheService.set( if (
InfoService.CACHE_KEY_STATISTICS, Object.values(statistics).every((value) => {
JSON.stringify(statistics) return !isNil(value);
); })
) {
await this.redisCacheService.set(
InfoService.CACHE_KEY_STATISTICS,
JSON.stringify(statistics)
);
}
return statistics; return statistics;
} }

2
apps/api/src/services/cron/cron.service.ts

@ -34,7 +34,7 @@ export class CronService {
@Cron(CronExpression.EVERY_HOUR) @Cron(CronExpression.EVERY_HOUR)
public async runEveryHour() { public async runEveryHour() {
await this.statisticsGatheringService?.addJobToQueue(); await this.statisticsGatheringService?.addJobsToQueue();
} }
@Cron(CronService.EVERY_HOUR_AT_RANDOM_MINUTE) @Cron(CronService.EVERY_HOUR_AT_RANDOM_MINUTE)

135
apps/api/src/services/queues/statistics-gathering/statistics-gathering.processor.ts

@ -1,7 +1,10 @@
import { ConfigurationService } from '@ghostfolio/api/services/configuration/configuration.service'; import { ConfigurationService } from '@ghostfolio/api/services/configuration/configuration.service';
import { PropertyService } from '@ghostfolio/api/services/property/property.service'; import { PropertyService } from '@ghostfolio/api/services/property/property.service';
import { import {
GATHER_STATISTICS_PROCESS_JOB_NAME, GATHER_STATISTICS_DOCKER_HUB_PULLS_PROCESS_JOB_NAME,
GATHER_STATISTICS_GITHUB_CONTRIBUTORS_PROCESS_JOB_NAME,
GATHER_STATISTICS_GITHUB_STARGAZERS_PROCESS_JOB_NAME,
GATHER_STATISTICS_UPTIME_PROCESS_JOB_NAME,
HEADER_KEY_TOKEN, HEADER_KEY_TOKEN,
PROPERTY_BETTER_UPTIME_MONITOR_ID, PROPERTY_BETTER_UPTIME_MONITOR_ID,
PROPERTY_DOCKER_HUB_PULLS, PROPERTY_DOCKER_HUB_PULLS,
@ -28,46 +31,82 @@ export class StatisticsGatheringProcessor {
private readonly propertyService: PropertyService private readonly propertyService: PropertyService
) {} ) {}
@Process(GATHER_STATISTICS_PROCESS_JOB_NAME) @Process(GATHER_STATISTICS_DOCKER_HUB_PULLS_PROCESS_JOB_NAME)
public async gatherStatistics() { public async gatherDockerHubPullsStatistics() {
Logger.log( Logger.log(
'Statistics gathering has been started', 'Docker Hub pulls statistics gathering has been started',
'StatisticsGatheringProcessor' 'StatisticsGatheringProcessor'
); );
const [dockerHubPulls, gitHubContributors, gitHubStargazers, uptime] = const dockerHubPulls = await this.countDockerHubPulls();
await Promise.all([
this.countDockerHubPulls(), await this.propertyService.put({
this.countGitHubContributors(), key: PROPERTY_DOCKER_HUB_PULLS,
this.countGitHubStargazers(), value: String(dockerHubPulls)
this.getUptime() });
]);
Logger.log(
await Promise.all([ 'Docker Hub pulls statistics gathering has been completed',
dockerHubPulls !== undefined && 'StatisticsGatheringProcessor'
this.propertyService.put({ );
key: PROPERTY_DOCKER_HUB_PULLS, }
value: String(dockerHubPulls)
}), @Process(GATHER_STATISTICS_GITHUB_CONTRIBUTORS_PROCESS_JOB_NAME)
gitHubContributors !== undefined && public async gatherGitHubContributorsStatistics() {
this.propertyService.put({ Logger.log(
key: PROPERTY_GITHUB_CONTRIBUTORS, 'GitHub contributors statistics gathering has been started',
value: String(gitHubContributors) 'StatisticsGatheringProcessor'
}), );
gitHubStargazers !== undefined &&
this.propertyService.put({ const gitHubContributors = await this.countGitHubContributors();
key: PROPERTY_GITHUB_STARGAZERS,
value: String(gitHubStargazers) await this.propertyService.put({
}), key: PROPERTY_GITHUB_CONTRIBUTORS,
uptime !== undefined && value: String(gitHubContributors)
this.propertyService.put({ });
key: PROPERTY_UPTIME,
value: String(uptime) Logger.log(
}) 'GitHub contributors statistics gathering has been completed',
]); 'StatisticsGatheringProcessor'
);
}
@Process(GATHER_STATISTICS_GITHUB_STARGAZERS_PROCESS_JOB_NAME)
public async gatherGitHubStargazersStatistics() {
Logger.log(
'GitHub stargazers statistics gathering has been started',
'StatisticsGatheringProcessor'
);
const gitHubStargazers = await this.countGitHubStargazers();
await this.propertyService.put({
key: PROPERTY_GITHUB_STARGAZERS,
value: String(gitHubStargazers)
});
Logger.log(
'GitHub stargazers statistics gathering has been completed',
'StatisticsGatheringProcessor'
);
}
@Process(GATHER_STATISTICS_UPTIME_PROCESS_JOB_NAME)
public async gatherUptimeStatistics() {
Logger.log(
'Uptime statistics gathering has been started',
'StatisticsGatheringProcessor'
);
const uptime = await this.getUptime();
await this.propertyService.put({
key: PROPERTY_UPTIME,
value: String(uptime)
});
Logger.log( Logger.log(
'Statistics gathering has been completed', 'Uptime statistics gathering has been completed',
'StatisticsGatheringProcessor' 'StatisticsGatheringProcessor'
); );
} }
@ -88,7 +127,7 @@ export class StatisticsGatheringProcessor {
} catch (error) { } catch (error) {
Logger.error(error, 'StatisticsGatheringProcessor - DockerHub'); Logger.error(error, 'StatisticsGatheringProcessor - DockerHub');
return undefined; throw error;
} }
} }
@ -102,15 +141,29 @@ export class StatisticsGatheringProcessor {
const $ = cheerio.load(body); const $ = cheerio.load(body);
return extractNumberFromString({ console.log(
value: $( $(
'a[href="/ghostfolio/ghostfolio/graphs/contributors"] .Counter' 'a[href="/ghostfolio/ghostfolio/graphs/contributors"] .Counter'
).text() ).text()
);
const value = $(
'a[href="/ghostfolio/ghostfolio/graphs/contributors"] .Counter'
).text();
if (!value) {
throw new Error(
'Could not find the number of contributors in the page'
);
}
return extractNumberFromString({
value
}); });
} catch (error) { } catch (error) {
Logger.error(error, 'StatisticsGatheringProcessor - GitHub'); Logger.error(error, 'StatisticsGatheringProcessor - GitHub');
return undefined; throw error;
} }
} }
@ -130,7 +183,7 @@ export class StatisticsGatheringProcessor {
} catch (error) { } catch (error) {
Logger.error(error, 'StatisticsGatheringProcessor - GitHub'); Logger.error(error, 'StatisticsGatheringProcessor - GitHub');
return undefined; throw error;
} }
} }
@ -161,7 +214,7 @@ export class StatisticsGatheringProcessor {
} catch (error) { } catch (error) {
Logger.error(error, 'StatisticsGatheringProcessor - Better Stack'); Logger.error(error, 'StatisticsGatheringProcessor - Better Stack');
return undefined; throw error;
} }
} }
} }

46
apps/api/src/services/queues/statistics-gathering/statistics-gathering.service.ts

@ -1,6 +1,9 @@
import { import {
GATHER_STATISTICS_PROCESS_JOB_NAME, GATHER_STATISTICS_DOCKER_HUB_PULLS_PROCESS_JOB_NAME,
GATHER_STATISTICS_GITHUB_CONTRIBUTORS_PROCESS_JOB_NAME,
GATHER_STATISTICS_GITHUB_STARGAZERS_PROCESS_JOB_NAME,
GATHER_STATISTICS_PROCESS_JOB_OPTIONS, GATHER_STATISTICS_PROCESS_JOB_OPTIONS,
GATHER_STATISTICS_UPTIME_PROCESS_JOB_NAME,
STATISTICS_GATHERING_QUEUE STATISTICS_GATHERING_QUEUE
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
@ -15,11 +18,40 @@ export class StatisticsGatheringService {
private readonly statisticsGatheringQueue: Queue private readonly statisticsGatheringQueue: Queue
) {} ) {}
public async addJobToQueue() { public async addJobsToQueue() {
return this.statisticsGatheringQueue.add( return Promise.all([
GATHER_STATISTICS_PROCESS_JOB_NAME, this.statisticsGatheringQueue.add(
{}, GATHER_STATISTICS_DOCKER_HUB_PULLS_PROCESS_JOB_NAME,
GATHER_STATISTICS_PROCESS_JOB_OPTIONS {},
); {
...GATHER_STATISTICS_PROCESS_JOB_OPTIONS,
jobId: GATHER_STATISTICS_DOCKER_HUB_PULLS_PROCESS_JOB_NAME
}
),
this.statisticsGatheringQueue.add(
GATHER_STATISTICS_GITHUB_CONTRIBUTORS_PROCESS_JOB_NAME,
{},
{
...GATHER_STATISTICS_PROCESS_JOB_OPTIONS,
jobId: GATHER_STATISTICS_GITHUB_CONTRIBUTORS_PROCESS_JOB_NAME
}
),
this.statisticsGatheringQueue.add(
GATHER_STATISTICS_GITHUB_STARGAZERS_PROCESS_JOB_NAME,
{},
{
...GATHER_STATISTICS_PROCESS_JOB_OPTIONS,
jobId: GATHER_STATISTICS_GITHUB_STARGAZERS_PROCESS_JOB_NAME
}
),
this.statisticsGatheringQueue.add(
GATHER_STATISTICS_UPTIME_PROCESS_JOB_NAME,
{},
{
...GATHER_STATISTICS_PROCESS_JOB_OPTIONS,
jobId: GATHER_STATISTICS_UPTIME_PROCESS_JOB_NAME
}
)
]);
} }
} }

13
libs/common/src/lib/config.ts

@ -185,7 +185,6 @@ export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobOptions = {
removeOnComplete: true removeOnComplete: true
}; };
export const GATHER_STATISTICS_PROCESS_JOB_NAME = 'GATHER_STATISTICS';
export const GATHER_STATISTICS_PROCESS_JOB_OPTIONS: JobOptions = { export const GATHER_STATISTICS_PROCESS_JOB_OPTIONS: JobOptions = {
attempts: 5, attempts: 5,
backoff: { backoff: {
@ -195,6 +194,18 @@ export const GATHER_STATISTICS_PROCESS_JOB_OPTIONS: JobOptions = {
removeOnComplete: true removeOnComplete: true
}; };
export const GATHER_STATISTICS_DOCKER_HUB_PULLS_PROCESS_JOB_NAME =
'GATHER_STATISTICS_DOCKER_HUB_PULLS';
export const GATHER_STATISTICS_GITHUB_CONTRIBUTORS_PROCESS_JOB_NAME =
'GATHER_STATISTICS_GITHUB_CONTRIBUTORS';
export const GATHER_STATISTICS_GITHUB_STARGAZERS_PROCESS_JOB_NAME =
'GATHER_STATISTICS_GITHUB_STARGAZERS';
export const GATHER_STATISTICS_UPTIME_PROCESS_JOB_NAME =
'GATHER_STATISTICS_UPTIME';
export const INVESTMENT_ACTIVITY_TYPES = [ export const INVESTMENT_ACTIVITY_TYPES = [
Type.BUY, Type.BUY,
Type.DIVIDEND, Type.DIVIDEND,

Loading…
Cancel
Save