Browse Source

Migrate historical market data gathering to queue

pull/991/head
Thomas 3 years ago
parent
commit
00a2cca666
  1. 59
      apps/api/src/app/admin/admin.controller.ts
  2. 22
      apps/api/src/app/admin/queue/queue.controller.ts
  3. 21
      apps/api/src/app/admin/queue/queue.service.ts
  4. 25
      apps/api/src/app/order/order.service.ts
  5. 21
      apps/api/src/services/cron.service.ts
  6. 5
      apps/api/src/services/data-gathering.module.ts
  7. 110
      apps/api/src/services/data-gathering.processor.ts
  8. 130
      apps/api/src/services/data-gathering.service.ts
  9. 5
      apps/api/src/services/interfaces/interfaces.ts
  10. 9
      apps/client/src/app/components/admin-jobs/admin-jobs.component.ts
  11. 50
      apps/client/src/app/components/admin-jobs/admin-jobs.html
  12. 6
      apps/client/src/app/services/admin.service.ts
  13. 4
      libs/common/src/lib/config.ts
  14. 15
      libs/common/src/lib/interfaces/admin-jobs.interface.ts

59
apps/api/src/app/admin/admin.controller.ts

@ -3,6 +3,7 @@ import { MarketDataService } from '@ghostfolio/api/services/market-data.service'
import { PropertyDto } from '@ghostfolio/api/services/property/property.dto';
import {
DATA_GATHERING_QUEUE,
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
GATHER_ASSET_PROFILE_PROCESS
} from '@ghostfolio/common/config';
import {
@ -31,6 +32,7 @@ import { DataSource, MarketData } from '@prisma/client';
import { Queue } from 'bull';
import { isDate } from 'date-fns';
import { StatusCodes, getReasonPhrase } from 'http-status-codes';
import ms from 'ms';
import { AdminService } from './admin.service';
import { UpdateMarketDataDto } from './update-market-data.dto';
@ -82,10 +84,21 @@ export class AdminController {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, {
dataSource,
symbol
});
await this.dataGatheringQueue.add(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}
);
}
this.dataGatheringService.gatherMax();
@ -109,10 +122,21 @@ export class AdminController {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, {
dataSource,
symbol
});
await this.dataGatheringQueue.add(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}
);
}
}
@ -134,10 +158,21 @@ export class AdminController {
);
}
await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, {
dataSource,
symbol
});
await this.dataGatheringQueue.add(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}
);
}
@Post('gather/:dataSource/:symbol')

22
apps/api/src/app/admin/queue/queue.controller.ts

@ -3,9 +3,11 @@ import { hasPermission, permissions } from '@ghostfolio/common/permissions';
import type { RequestWithUser } from '@ghostfolio/common/types';
import {
Controller,
Delete,
Get,
HttpException,
Inject,
Param,
UseGuards
} from '@nestjs/common';
import { REQUEST } from '@nestjs/core';
@ -21,7 +23,7 @@ export class QueueController {
@Inject(REQUEST) private readonly request: RequestWithUser
) {}
@Get('jobs')
@Get('job')
@UseGuards(AuthGuard('jwt'))
public async getJobs(): Promise<AdminJobs> {
if (
@ -38,4 +40,22 @@ export class QueueController {
return this.queueService.getJobs({});
}
@Delete('job/:id')
@UseGuards(AuthGuard('jwt'))
public async deleteJob(@Param('id') id: string): Promise<void> {
if (
!hasPermission(
this.request.user.permissions,
permissions.accessAdminControl
)
) {
throw new HttpException(
getReasonPhrase(StatusCodes.FORBIDDEN),
StatusCodes.FORBIDDEN
);
}
return this.queueService.deleteJob(id);
}
}

21
apps/api/src/app/admin/queue/queue.service.ts

@ -11,6 +11,10 @@ export class QueueService {
private readonly dataGatheringQueue: Queue
) {}
public async deleteJob(aId: string) {
return (await this.dataGatheringQueue.getJob(aId))?.remove();
}
public async getJobs({
limit = 1000
}: {
@ -25,8 +29,23 @@ export class QueueService {
'waiting'
]);
const jobsWithState = await Promise.all(
jobs.slice(0, limit).map(async (job) => {
return {
attemptsMade: job.attemptsMade + 1,
data: job.data,
finishedOn: job.finishedOn,
id: job.id,
name: job.name,
stacktrace: job.stacktrace,
state: await job.getState(),
timestamp: job.timestamp
};
})
);
return {
jobs: jobs.slice(0, limit)
jobs: jobsWithState
};
}
}

25
apps/api/src/app/order/order.service.ts

@ -6,6 +6,7 @@ import { PrismaService } from '@ghostfolio/api/services/prisma.service';
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service';
import {
DATA_GATHERING_QUEUE,
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
GATHER_ASSET_PROFILE_PROCESS
} from '@ghostfolio/common/config';
import { Filter } from '@ghostfolio/common/interfaces';
@ -24,6 +25,7 @@ import Big from 'big.js';
import { Queue } from 'bull';
import { endOfToday, isAfter } from 'date-fns';
import { groupBy } from 'lodash';
import ms from 'ms';
import { v4 as uuidv4 } from 'uuid';
import { Activity } from './interfaces/activities.interface';
@ -120,10 +122,21 @@ export class OrderService {
data.SymbolProfile.connectOrCreate.create.symbol.toUpperCase();
}
await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, {
dataSource: data.SymbolProfile.connectOrCreate.create.dataSource,
symbol: data.SymbolProfile.connectOrCreate.create.symbol
});
await this.dataGatheringQueue.add(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource: data.SymbolProfile.connectOrCreate.create.dataSource,
symbol: data.SymbolProfile.connectOrCreate.create.symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}
);
const isDraft = isAfter(data.date as Date, endOfToday());
@ -138,8 +151,6 @@ export class OrderService {
]);
}
await this.cacheService.flush();
delete data.accountId;
delete data.assetClass;
delete data.assetSubClass;
@ -330,8 +341,6 @@ export class OrderService {
}
}
await this.cacheService.flush();
delete data.assetClass;
delete data.assetSubClass;
delete data.currency;

21
apps/api/src/services/cron.service.ts

@ -1,11 +1,13 @@
import {
DATA_GATHERING_QUEUE,
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
GATHER_ASSET_PROFILE_PROCESS
} from '@ghostfolio/common/config';
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { Queue } from 'bull';
import ms from 'ms';
import { DataGatheringService } from './data-gathering.service';
import { ExchangeRateDataService } from './exchange-rate-data.service';
@ -41,10 +43,21 @@ export class CronService {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringQueue.add(GATHER_ASSET_PROFILE_PROCESS, {
dataSource,
symbol
});
await this.dataGatheringQueue.add(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}
);
}
}
}

5
apps/api/src/services/data-gathering.module.ts

@ -6,6 +6,7 @@ import { PrismaModule } from '@ghostfolio/api/services/prisma.module';
import { DATA_GATHERING_QUEUE } from '@ghostfolio/common/config';
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import ms from 'ms';
import { DataGatheringProcessor } from './data-gathering.processor';
import { ExchangeRateDataModule } from './exchange-rate-data.module';
@ -14,6 +15,10 @@ import { SymbolProfileModule } from './symbol-profile.module';
@Module({
imports: [
BullModule.registerQueue({
limiter: {
duration: ms('1 second'),
max: 1
},
name: DATA_GATHERING_QUEUE
}),
ConfigurationModule,

110
apps/api/src/services/data-gathering.processor.ts

@ -1,19 +1,34 @@
import {
DATA_GATHERING_QUEUE,
GATHER_ASSET_PROFILE_PROCESS
GATHER_ASSET_PROFILE_PROCESS,
GATHER_HISTORICAL_MARKET_DATA_PROCESS
} from '@ghostfolio/common/config';
import { DATE_FORMAT } from '@ghostfolio/common/helper';
import { UniqueAsset } from '@ghostfolio/common/interfaces';
import { Process, Processor } from '@nestjs/bull';
import { Injectable, Logger } from '@nestjs/common';
import { Job } from 'bull';
import {
isBefore,
getYear,
getMonth,
getDate,
format,
parseISO
} from 'date-fns';
import { DataGatheringService } from './data-gathering.service';
import { DataProviderService } from './data-provider/data-provider.service';
import { IDataGatheringItem } from './interfaces/interfaces';
import { PrismaService } from './prisma.service';
@Injectable()
@Processor(DATA_GATHERING_QUEUE)
export class DataGatheringProcessor {
public constructor(
private readonly dataGatheringService: DataGatheringService
private readonly dataGatheringService: DataGatheringService,
private readonly dataProviderService: DataProviderService,
private readonly prismaService: PrismaService
) {}
@Process(GATHER_ASSET_PROFILE_PROCESS)
@ -21,7 +36,96 @@ export class DataGatheringProcessor {
try {
await this.dataGatheringService.gatherAssetProfiles([job.data]);
} catch (error) {
Logger.error(error, 'DataGatheringProcessor');
Logger.error(
error,
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS})`
);
throw new Error(error);
}
}
@Process(GATHER_HISTORICAL_MARKET_DATA_PROCESS)
public async gatherHistoricalMarketData(job: Job<IDataGatheringItem>) {
try {
const { dataSource, date, symbol } = job.data;
const historicalData = await this.dataProviderService.getHistoricalRaw(
[{ dataSource, symbol }],
parseISO(<string>(<unknown>date)),
new Date()
);
let currentDate = parseISO(<string>(<unknown>date));
let lastMarketPrice: number;
while (
isBefore(
currentDate,
new Date(
Date.UTC(
getYear(new Date()),
getMonth(new Date()),
getDate(new Date()),
0
)
)
)
) {
if (
historicalData[symbol]?.[format(currentDate, DATE_FORMAT)]
?.marketPrice
) {
lastMarketPrice =
historicalData[symbol]?.[format(currentDate, DATE_FORMAT)]
?.marketPrice;
}
if (lastMarketPrice) {
try {
await this.prismaService.marketData.create({
data: {
dataSource,
symbol,
date: new Date(
Date.UTC(
getYear(currentDate),
getMonth(currentDate),
getDate(currentDate),
0
)
),
marketPrice: lastMarketPrice
}
});
} catch {}
} else {
Logger.warn(
`Failed to gather data for symbol ${symbol} from ${dataSource} at ${format(
currentDate,
DATE_FORMAT
)}.`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})`
);
}
// Count month one up for iteration
currentDate = new Date(
Date.UTC(
getYear(currentDate),
getMonth(currentDate),
getDate(currentDate) + 1,
0
)
);
}
} catch (error) {
Logger.error(
error,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})`
);
throw new Error(error);
}
}
}

130
apps/api/src/services/data-gathering.service.ts

@ -1,21 +1,19 @@
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service';
import {
DATA_GATHERING_QUEUE,
DATA_GATHERING_QUEUE_PRIORITY_LOW,
GATHER_HISTORICAL_MARKET_DATA_PROCESS,
PROPERTY_LAST_DATA_GATHERING,
PROPERTY_LOCKED_DATA_GATHERING
} from '@ghostfolio/common/config';
import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper';
import { UniqueAsset } from '@ghostfolio/common/interfaces';
import { InjectQueue } from '@nestjs/bull';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { DataSource } from '@prisma/client';
import {
differenceInHours,
format,
getDate,
getMonth,
getYear,
isBefore,
subDays
} from 'date-fns';
import { Queue } from 'bull';
import { differenceInHours, format, subDays } from 'date-fns';
import ms from 'ms';
import { DataProviderService } from './data-provider/data-provider.service';
import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface';
@ -30,6 +28,8 @@ export class DataGatheringService {
public constructor(
@Inject('DataEnhancers')
private readonly dataEnhancers: DataEnhancerInterface[],
@InjectQueue(DATA_GATHERING_QUEUE)
private readonly dataGatheringQueue: Queue,
private readonly dataProviderService: DataProviderService,
private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly prismaService: PrismaService,
@ -50,10 +50,10 @@ export class DataGatheringService {
}
});
const symbols = await this.getSymbols7D();
const dataGatheringItems = await this.getSymbols7D();
try {
await this.gatherSymbols(symbols);
await this.gatherSymbols(dataGatheringItems);
await this.prismaService.property.upsert({
create: {
@ -334,107 +334,27 @@ export class DataGatheringService {
}
public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) {
let hasError = false;
let symbolCounter = 0;
for (const { dataSource, date, symbol } of aSymbolsWithStartDate) {
if (dataSource === 'MANUAL') {
continue;
}
this.dataGatheringProgress = symbolCounter / aSymbolsWithStartDate.length;
try {
const historicalData = await this.dataProviderService.getHistoricalRaw(
[{ dataSource, symbol }],
await this.dataGatheringQueue.add(
GATHER_HISTORICAL_MARKET_DATA_PROCESS,
{
dataSource,
date,
new Date()
);
let currentDate = date;
let lastMarketPrice: number;
while (
isBefore(
currentDate,
new Date(
Date.UTC(
getYear(new Date()),
getMonth(new Date()),
getDate(new Date()),
0
)
)
)
) {
if (
historicalData[symbol]?.[format(currentDate, DATE_FORMAT)]
?.marketPrice
) {
lastMarketPrice =
historicalData[symbol]?.[format(currentDate, DATE_FORMAT)]
?.marketPrice;
}
if (lastMarketPrice) {
try {
await this.prismaService.marketData.create({
data: {
dataSource,
symbol,
date: new Date(
Date.UTC(
getYear(currentDate),
getMonth(currentDate),
getDate(currentDate),
0
)
),
marketPrice: lastMarketPrice
}
});
} catch {}
} else {
Logger.warn(
`Failed to gather data for symbol ${symbol} from ${dataSource} at ${format(
currentDate,
DATE_FORMAT
)}.`,
'DataGatheringService'
);
}
// Count month one up for iteration
currentDate = new Date(
Date.UTC(
getYear(currentDate),
getMonth(currentDate),
getDate(currentDate) + 1,
0
)
);
symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_LOW
}
} catch (error) {
hasError = true;
Logger.error(error, 'DataGatheringService');
}
if (symbolCounter > 0 && symbolCounter % 100 === 0) {
Logger.log(
`Data gathering progress: ${(
this.dataGatheringProgress * 100
).toFixed(2)}%`,
'DataGatheringService'
);
}
symbolCounter += 1;
}
await this.exchangeRateDataService.initialize();
if (hasError) {
throw '';
);
}
}

5
apps/api/src/services/interfaces/interfaces.ts

@ -1,3 +1,4 @@
import { UniqueAsset } from '@ghostfolio/common/interfaces';
import { MarketState } from '@ghostfolio/common/types';
import {
Account,
@ -32,8 +33,6 @@ export interface IDataProviderResponse {
marketState: MarketState;
}
export interface IDataGatheringItem {
dataSource: DataSource;
export interface IDataGatheringItem extends UniqueAsset {
date?: Date;
symbol: string;
}

9
apps/client/src/app/components/admin-jobs/admin-jobs.component.ts

@ -53,6 +53,15 @@ export class AdminJobsComponent implements OnDestroy, OnInit {
this.fetchJobs();
}
public onDeleteJob(aId: string) {
this.adminService
.deleteJob(aId)
.pipe(takeUntil(this.unsubscribeSubject))
.subscribe(() => {
this.fetchJobs();
});
}
public onViewStacktrace(aStacktrace: AdminJobs['jobs'][0]['stacktrace']) {
alert(JSON.stringify(aStacktrace, null, ' '));
}

50
apps/client/src/app/components/admin-jobs/admin-jobs.html

@ -4,10 +4,11 @@
<table class="gf-table w-100">
<thead>
<tr class="mat-header-row">
<th class="mat-header-cell px-1 py-2" i18n>#</th>
<th class="mat-header-cell px-1 py-2 text-right" i18n>#</th>
<th class="mat-header-cell px-1 py-2" i18n>Type</th>
<th class="mat-header-cell px-1 py-2" i18n>Data Source</th>
<th class="mat-header-cell px-1 py-2" i18n>Symbol</th>
<th class="mat-header-cell px-1 py-2 text-right" i18n>Attempts</th>
<th class="mat-header-cell px-1 py-2" i18n>Created</th>
<th class="mat-header-cell px-1 py-2" i18n>Finished</th>
<th class="mat-header-cell px-1 py-2" i18n>Status</th>
@ -17,10 +18,28 @@
<tbody>
<ng-container *ngFor="let job of jobs">
<tr class="mat-row">
<td class="mat-cell px-1 py-2">{{ job.id }}</td>
<td class="mat-cell px-1 py-2">{{ job.name }}</td>
<td class="mat-cell px-1 py-2 text-right">{{ job.id }}</td>
<td class="mat-cell px-1 py-2">
<span class="align-items-center d-flex">
<ion-icon
class="mr-1"
name="arrow-down-circle-outline"
></ion-icon>
<ng-container *ngIf="job.name === 'GATHER_ASSET_PROFILE'">
<span i18n>Asset Profile</span>
</ng-container>
<ng-container
*ngIf="job.name === 'GATHER_HISTORICAL_MARKET_DATA'"
>
<span i18n>Historical Market Data</span>
</ng-container>
</span>
</td>
<td class="mat-cell px-1 py-2">{{ job.data?.dataSource }}</td>
<td class="mat-cell px-1 py-2">{{ job.data?.symbol }}</td>
<td class="mat-cell px-1 py-2 text-right">
{{ job.attemptsMade }}
</td>
<td class="mat-cell px-1 py-2">
{{ job.timestamp | date: defaultDateTimeFormat }}
</td>
@ -29,21 +48,19 @@
</td>
<td class="mat-cell px-1 py-2">
<ion-icon
*ngIf="job.finishedOn"
*ngIf="job.state === 'completed'"
class="text-success"
name="checkmark-circle-outline"
></ion-icon>
<ng-container *ngIf="!job.finishedOn">
<ion-icon
*ngIf="job.stacktrace?.length >= 1"
class="text-danger"
name="alert-circle-outline"
></ion-icon>
<ion-icon
*ngIf="job.stacktrace?.length < 1"
name="time-outline"
></ion-icon>
</ng-container>
<ion-icon
*ngIf="job.state === 'delayed'"
name="time-outline"
></ion-icon>
<ion-icon
*ngIf="job.state === 'failed'"
class="text-danger"
name="alert-circle-outline"
></ion-icon>
</td>
<td class="mat-cell px-1 py-2">
<button
@ -63,6 +80,9 @@
>
View Stacktrace
</button>
<button i18n mat-menu-item (click)="onDeleteJob(job.id)">
Delete Job
</button>
</mat-menu>
</td>
</tr>

6
apps/client/src/app/services/admin.service.ts

@ -18,6 +18,10 @@ import { Observable, map } from 'rxjs';
export class AdminService {
public constructor(private http: HttpClient) {}
public deleteJob(aId: string) {
return this.http.delete<void>(`/api/v1/admin/queue/job/${aId}`);
}
public deleteProfileData({ dataSource, symbol }: UniqueAsset) {
return this.http.delete<void>(
`/api/v1/admin/profile-data/${dataSource}/${symbol}`
@ -44,7 +48,7 @@ export class AdminService {
}
public fetchJobs() {
return this.http.get<AdminJobs>(`/api/v1/admin/queue/jobs`);
return this.http.get<AdminJobs>(`/api/v1/admin/queue/job`);
}
public gatherMax() {

4
libs/common/src/lib/config.ts

@ -43,10 +43,14 @@ export const warnColorRgb = {
export const ASSET_SUB_CLASS_EMERGENCY_FUND = 'EMERGENCY_FUND';
export const DATA_GATHERING_QUEUE = 'DATA_GATHERING_QUEUE';
export const DATA_GATHERING_QUEUE_PRIORITY_LOW = Number.MAX_SAFE_INTEGER;
export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1;
export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy';
export const GATHER_ASSET_PROFILE_PROCESS = 'GATHER_ASSET_PROFILE';
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS =
'GATHER_HISTORICAL_MARKET_DATA';
export const PROPERTY_BENCHMARKS = 'BENCHMARKS';
export const PROPERTY_COUPONS = 'COUPONS';

15
libs/common/src/lib/interfaces/admin-jobs.interface.ts

@ -1,5 +1,16 @@
import { Job } from 'bull';
import { Job, JobStatus } from 'bull';
export interface AdminJobs {
jobs: Job<any>[];
jobs: (Pick<
Job<any>,
| 'attemptsMade'
| 'data'
| 'finishedOn'
| 'id'
| 'name'
| 'stacktrace'
| 'timestamp'
> & {
state: JobStatus | 'stuck';
})[];
}

Loading…
Cancel
Save