|
@ -1,6 +1,5 @@ |
|
|
import { DataProviderService } from '@ghostfolio/api/services/data-provider/data-provider.service'; |
|
|
import { DataProviderService } from '@ghostfolio/api/services/data-provider/data-provider.service'; |
|
|
import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; |
|
|
import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; |
|
|
import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service'; |
|
|
|
|
|
import { |
|
|
import { |
|
|
DATA_GATHERING_QUEUE, |
|
|
DATA_GATHERING_QUEUE, |
|
|
GATHER_ASSET_PROFILE_PROCESS, |
|
|
GATHER_ASSET_PROFILE_PROCESS, |
|
@ -11,6 +10,7 @@ import { UniqueAsset } from '@ghostfolio/common/interfaces'; |
|
|
import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; |
|
|
import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; |
|
|
import { Process, Processor } from '@nestjs/bull'; |
|
|
import { Process, Processor } from '@nestjs/bull'; |
|
|
import { Injectable, Logger } from '@nestjs/common'; |
|
|
import { Injectable, Logger } from '@nestjs/common'; |
|
|
|
|
|
import { Prisma } from '@prisma/client'; |
|
|
import { Job } from 'bull'; |
|
|
import { Job } from 'bull'; |
|
|
import { |
|
|
import { |
|
|
format, |
|
|
format, |
|
@ -32,7 +32,7 @@ export class DataGatheringProcessor { |
|
|
private readonly marketDataService: MarketDataService |
|
|
private readonly marketDataService: MarketDataService |
|
|
) {} |
|
|
) {} |
|
|
|
|
|
|
|
|
@Process(GATHER_ASSET_PROFILE_PROCESS) |
|
|
@Process({ concurrency: 1, name: GATHER_ASSET_PROFILE_PROCESS }) |
|
|
public async gatherAssetProfile(job: Job<UniqueAsset>) { |
|
|
public async gatherAssetProfile(job: Job<UniqueAsset>) { |
|
|
try { |
|
|
try { |
|
|
await this.dataGatheringService.gatherAssetProfiles([job.data]); |
|
|
await this.dataGatheringService.gatherAssetProfiles([job.data]); |
|
@ -46,18 +46,27 @@ export class DataGatheringProcessor { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@Process(GATHER_HISTORICAL_MARKET_DATA_PROCESS) |
|
|
@Process({ concurrency: 1, name: GATHER_HISTORICAL_MARKET_DATA_PROCESS }) |
|
|
public async gatherHistoricalMarketData(job: Job<IDataGatheringItem>) { |
|
|
public async gatherHistoricalMarketData(job: Job<IDataGatheringItem>) { |
|
|
try { |
|
|
try { |
|
|
const { dataSource, date, symbol } = job.data; |
|
|
const { dataSource, date, symbol } = job.data; |
|
|
|
|
|
let currentDate = parseISO(<string>(<unknown>date)); |
|
|
|
|
|
|
|
|
|
|
|
Logger.log( |
|
|
|
|
|
`Historical market data gathering has been started for ${symbol} (${dataSource}) at ${format( |
|
|
|
|
|
currentDate, |
|
|
|
|
|
DATE_FORMAT |
|
|
|
|
|
)}`,
|
|
|
|
|
|
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` |
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
const historicalData = await this.dataProviderService.getHistoricalRaw( |
|
|
const historicalData = await this.dataProviderService.getHistoricalRaw( |
|
|
[{ dataSource, symbol }], |
|
|
[{ dataSource, symbol }], |
|
|
parseISO(<string>(<unknown>date)), |
|
|
currentDate, |
|
|
new Date() |
|
|
new Date() |
|
|
); |
|
|
); |
|
|
|
|
|
|
|
|
let currentDate = parseISO(<string>(<unknown>date)); |
|
|
const data: Prisma.MarketDataUpdateInput[] = []; |
|
|
let lastMarketPrice: number; |
|
|
let lastMarketPrice: number; |
|
|
|
|
|
|
|
|
while ( |
|
|
while ( |
|
@ -83,21 +92,13 @@ export class DataGatheringProcessor { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (lastMarketPrice) { |
|
|
if (lastMarketPrice) { |
|
|
try { |
|
|
data.push({ |
|
|
await this.marketDataService.updateMarketData({ |
|
|
|
|
|
data: { |
|
|
|
|
|
marketPrice: lastMarketPrice, |
|
|
|
|
|
state: 'CLOSE' |
|
|
|
|
|
}, |
|
|
|
|
|
where: { |
|
|
|
|
|
dataSource_date_symbol: { |
|
|
|
|
|
dataSource, |
|
|
dataSource, |
|
|
symbol, |
|
|
symbol, |
|
|
date: getStartOfUtcDate(currentDate) |
|
|
date: getStartOfUtcDate(currentDate), |
|
|
} |
|
|
marketPrice: lastMarketPrice, |
|
|
} |
|
|
state: 'CLOSE' |
|
|
}); |
|
|
}); |
|
|
} catch {} |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Count month one up for iteration
|
|
|
// Count month one up for iteration
|
|
@ -111,8 +112,13 @@ export class DataGatheringProcessor { |
|
|
); |
|
|
); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
await this.marketDataService.updateMany({ data }); |
|
|
|
|
|
|
|
|
Logger.log( |
|
|
Logger.log( |
|
|
`Historical market data gathering has been completed for ${symbol} (${dataSource}).`, |
|
|
`Historical market data gathering has been completed for ${symbol} (${dataSource}) at ${format( |
|
|
|
|
|
currentDate, |
|
|
|
|
|
DATE_FORMAT |
|
|
|
|
|
)}`,
|
|
|
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` |
|
|
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})` |
|
|
); |
|
|
); |
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|