Browse Source

Add Datagathering process for missing values only

pull/5027/head
Dan 9 months ago
parent
commit
b0cfb2b6fd
  1. 17
      apps/api/src/app/admin/admin.controller.ts
  2. 159
      apps/api/src/services/queues/data-gathering/data-gathering.processor.ts
  3. 52
      apps/api/src/services/queues/data-gathering/data-gathering.service.ts
  4. 10
      apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.component.ts
  5. 13
      apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.html
  6. 16
      apps/client/src/app/services/admin.service.ts
  7. 12
      libs/common/src/lib/config.ts

17
apps/api/src/app/admin/admin.controller.ts

@ -158,7 +158,22 @@ export class AdminController {
@Param('dataSource') dataSource: DataSource, @Param('dataSource') dataSource: DataSource,
@Param('symbol') symbol: string @Param('symbol') symbol: string
): Promise<void> { ): Promise<void> {
this.dataGatheringService.gatherSymbol({ dataSource, symbol }); await this.dataGatheringService.gatherSymbol({ dataSource, symbol });
return;
}
@Post('gatherMissing/:dataSource/:symbol')
@UseGuards(AuthGuard('jwt'), HasPermissionGuard)
@HasPermission(permissions.accessAdminControl)
public async gatherSymbolMissingOnly(
@Param('dataSource') dataSource: DataSource,
@Param('symbol') symbol: string
): Promise<void> {
await this.dataGatheringService.gatherSymbolMissingOnly({
dataSource,
symbol
});
return; return;
} }

159
apps/api/src/services/queues/data-gathering/data-gathering.processor.ts

@ -1,20 +1,25 @@
import { DataProviderService } from '@ghostfolio/api/services/data-provider/data-provider.service'; import { DataProviderService } from '@ghostfolio/api/services/data-provider/data-provider.service';
import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; import {
IDataGatheringItem,
IDataProviderHistoricalResponse
} from '@ghostfolio/api/services/interfaces/interfaces';
import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service';
import { import {
DATA_GATHERING_QUEUE, DATA_GATHERING_QUEUE,
DEFAULT_PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE, DEFAULT_PROCESSOR_CONCURRENCY_GATHER_ASSET_PROFILE,
DEFAULT_PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA, DEFAULT_PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA,
GATHER_ASSET_PROFILE_PROCESS, GATHER_ASSET_PROFILE_PROCESS,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,
GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper'; import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper';
import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces'; import { AssetProfileIdentifier } from '@ghostfolio/common/interfaces';
import { Process, Processor } from '@nestjs/bull'; import { Process, Processor } from '@nestjs/bull';
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client'; import { DataSource, Prisma } from '@prisma/client';
import { Job } from 'bull'; import { Job } from 'bull';
import { isNumber } from 'class-validator';
import { import {
addDays, addDays,
format, format,
@ -22,7 +27,9 @@ import {
getMonth, getMonth,
getYear, getYear,
isBefore, isBefore,
parseISO parseISO,
eachDayOfInterval,
isEqual
} from 'date-fns'; } from 'date-fns';
import { DataGatheringService } from './data-gathering.service'; import { DataGatheringService } from './data-gathering.service';
@ -150,4 +157,148 @@ export class DataGatheringProcessor {
throw new Error(error); throw new Error(error);
} }
} }
@Process({
concurrency: parseInt(
process.env.PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA ??
DEFAULT_PROCESSOR_CONCURRENCY_GATHER_HISTORICAL_MARKET_DATA.toString(),
10
),
name: GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME
})
public async gatherMissingHistoricalMarketData(job: Job<IDataGatheringItem>) {
try {
const { dataSource, date, symbol } = job.data;
Logger.log(
`Historical market data gathering for missing values has been started for ${symbol} (${dataSource}) at ${format(
date,
DATE_FORMAT
)}`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
);
const entries = await this.marketDataService.marketDataItems({
where: {
AND: {
symbol: {
equals: symbol
},
dataSource: {
equals: dataSource
}
}
},
orderBy: {
date: 'asc'
},
take: 1
});
const firstEntry = entries[0];
const marketData = await this.marketDataService
.getRange({
assetProfileIdentifiers: [{ dataSource, symbol }],
dateQuery: {
gte: addDays(firstEntry.date, -10)
}
})
.then((md) => md.map((m) => m.date));
let dates = eachDayOfInterval(
{
start: firstEntry.date,
end: new Date()
},
{
step: 1
}
);
dates = dates.filter((d) => !marketData.some((md) => isEqual(md,d)));
const historicalData = await this.dataProviderService.getHistoricalRaw({
dataGatheringItems: [{ dataSource, symbol }],
from: firstEntry.date,
to: new Date()
});
const data: Prisma.MarketDataUpdateInput[] =
this.mapToMarketUpsertDataInputs(
dates,
historicalData,
symbol,
dataSource
);
await this.marketDataService.updateMany({ data });
Logger.log(
`Historical market data gathering for missing values has been completed for ${symbol} (${dataSource}) at ${format(
date,
DATE_FORMAT
)}`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
);
} catch (error) {
Logger.error(
error,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})`
);
throw new Error(error);
}
}
private mapToMarketUpsertDataInputs(
missingMarketData: Date[],
historicalData: Record<
string,
Record<string, IDataProviderHistoricalResponse>
>,
symbol: string,
dataSource: DataSource
): Prisma.MarketDataUpdateInput[] {
return missingMarketData.map((date) => {
if (
isNumber(
historicalData[symbol]?.[format(date, DATE_FORMAT)]?.marketPrice
)
) {
return {
date,
symbol,
dataSource,
marketPrice:
historicalData[symbol]?.[format(date, DATE_FORMAT)]?.marketPrice
};
} else {
let earlierDate = date;
let index = 0;
while (
!isNumber(
historicalData[symbol]?.[format(earlierDate, DATE_FORMAT)]
?.marketPrice
)
) {
earlierDate = addDays(earlierDate, -1);
index++;
if (index > 10) {
break;
}
}
if (
isNumber(
historicalData[symbol]?.[format(earlierDate, DATE_FORMAT)]
?.marketPrice
)
) {
return {
date,
symbol,
dataSource,
marketPrice:
historicalData[symbol]?.[format(earlierDate, DATE_FORMAT)]
?.marketPrice
};
}
}
});
}
} }

52
apps/api/src/services/queues/data-gathering/data-gathering.service.ts

@ -13,6 +13,8 @@ import {
DATA_GATHERING_QUEUE_PRIORITY_MEDIUM, DATA_GATHERING_QUEUE_PRIORITY_MEDIUM,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME, GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS, GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS,
GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,
GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS,
PROPERTY_BENCHMARKS PROPERTY_BENCHMARKS
} from '@ghostfolio/common/config'; } from '@ghostfolio/common/config';
import { import {
@ -28,7 +30,6 @@ import {
import { InjectQueue } from '@nestjs/bull'; import { InjectQueue } from '@nestjs/bull';
import { Inject, Injectable, Logger } from '@nestjs/common'; import { Inject, Injectable, Logger } from '@nestjs/common';
import { DataSource } from '@prisma/client'; import { DataSource } from '@prisma/client';
import AwaitLock from 'await-lock';
import { JobOptions, Queue } from 'bull'; import { JobOptions, Queue } from 'bull';
import { format, min, subDays, subYears } from 'date-fns'; import { format, min, subDays, subYears } from 'date-fns';
import { isEmpty } from 'lodash'; import { isEmpty } from 'lodash';
@ -48,8 +49,6 @@ export class DataGatheringService {
private readonly symbolProfileService: SymbolProfileService private readonly symbolProfileService: SymbolProfileService
) {} ) {}
lock = new AwaitLock();
public async addJobToQueue({ public async addJobToQueue({
data, data,
name, name,
@ -114,6 +113,24 @@ export class DataGatheringService {
}); });
} }
public async gatherSymbolMissingOnly({
dataSource,
symbol
}: AssetProfileIdentifier) {
const dataGatheringItems = (await this.getSymbolsMax()).filter(
(dataGatheringItem) => {
return (
dataGatheringItem.dataSource === dataSource &&
dataGatheringItem.symbol === symbol
);
}
);
await this.gatherMissingDataSymbols({
dataGatheringItems,
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
});
}
public async gatherSymbolForDate({ public async gatherSymbolForDate({
dataSource, dataSource,
date, date,
@ -296,6 +313,35 @@ export class DataGatheringService {
); );
} }
public async gatherMissingDataSymbols({
dataGatheringItems,
priority
}: {
dataGatheringItems: IDataGatheringItem[];
priority: number;
}) {
await this.addJobsToQueue(
dataGatheringItems.map(({ dataSource, date, symbol }) => {
return {
data: {
dataSource,
date,
symbol
},
name: GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME,
opts: {
...GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS,
priority,
jobId: `${getAssetProfileIdentifier({
dataSource,
symbol
})}-missing-${format(date, DATE_FORMAT)}`
}
};
})
);
}
public async getAllAssetProfileIdentifiers(): Promise< public async getAllAssetProfileIdentifiers(): Promise<
AssetProfileIdentifier[] AssetProfileIdentifier[]
> { > {

10
apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.component.ts

@ -223,6 +223,16 @@ export class AssetProfileDialog implements OnDestroy, OnInit {
.subscribe(() => {}); .subscribe(() => {});
} }
public onGatherSymbolMissingOnly({
dataSource,
symbol
}: AssetProfileIdentifier) {
this.adminService
.gatherSymbolMissingOnly({ dataSource, symbol })
.pipe(takeUntil(this.unsubscribeSubject))
.subscribe(() => {});
}
public onImportHistoricalData() { public onImportHistoricalData() {
try { try {
const marketData = csvToJson( const marketData = csvToJson(

13
apps/client/src/app/components/admin-market-data/asset-profile-dialog/asset-profile-dialog.html

@ -31,6 +31,19 @@
> >
<ng-container i18n>Gather Historical Data</ng-container> <ng-container i18n>Gather Historical Data</ng-container>
</button> </button>
<button
mat-menu-item
type="button"
[disabled]="assetProfileForm.dirty"
(click)="
onGatherSymbolMissingOnly({
dataSource: data.dataSource,
symbol: data.symbol
})
"
>
<ng-container i18n>Gather Missing Historical Data</ng-container>
</button>
<button <button
mat-menu-item mat-menu-item
type="button" type="button"

16
apps/client/src/app/services/admin.service.ts

@ -198,6 +198,22 @@ export class AdminService {
return this.http.post<MarketData | void>(url, {}); return this.http.post<MarketData | void>(url, {});
} }
public gatherSymbolMissingOnly({
dataSource,
date,
symbol
}: AssetProfileIdentifier & {
date?: Date;
}) {
let url = `/api/v1/admin/gatherMissing/${dataSource}/${symbol}`;
if (date) {
url = `${url}/${format(date, DATE_FORMAT)}`;
}
return this.http.post<MarketData | void>(url, {});
}
public fetchSymbolForDate({ public fetchSymbolForDate({
dataSource, dataSource,
dateString, dateString,

12
libs/common/src/lib/config.ts

@ -95,6 +95,18 @@ export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobOptions = {
removeOnComplete: true removeOnComplete: true
}; };
export const GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME =
'GATHER_MISSING_HISTORICAL_MARKET_DATA';
export const GATHER_MISSING_HISTORICAL_MARKET_DATA_PROCESS_JOB_OPTIONS: JobOptions =
{
attempts: 12,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
removeOnComplete: true
};
export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME = 'PORTFOLIO'; export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_NAME = 'PORTFOLIO';
export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_OPTIONS: JobOptions = { export const PORTFOLIO_SNAPSHOT_PROCESS_JOB_OPTIONS: JobOptions = {
removeOnComplete: true removeOnComplete: true

Loading…
Cancel
Save