|
|
@ -1,11 +1,13 @@ |
|
|
|
import { DataProviderService } from '@ghostfolio/api/services/data-provider/data-provider.service'; |
|
|
|
import { AssetProfileDelistedError } from '@ghostfolio/api/services/data-provider/errors/asset-profile-delisted.error'; |
|
|
|
import { IDataGatheringItem } from '@ghostfolio/api/services/interfaces/interfaces'; |
|
|
|
import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service'; |
|
|
|
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service'; |
|
|
|
import { |
|
|
|
DATA_GATHERING_QUEUE, |
|
|
|
DEFAULT_PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY, |
|
|
|
DEFAULT_PROCESSOR_GATHER_HISTORICAL_MARKET_DATA_CONCURRENCY, |
|
|
|
GATHER_ASSET_PROFILE_PROCESS, |
|
|
|
GATHER_ASSET_PROFILE_PROCESS_JOB_NAME, |
|
|
|
GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME |
|
|
|
} from '@ghostfolio/common/config'; |
|
|
|
import { DATE_FORMAT, getStartOfUtcDate } from '@ghostfolio/common/helper'; |
|
|
@ -33,7 +35,8 @@ export class DataGatheringProcessor { |
|
|
|
public constructor( |
|
|
|
private readonly dataGatheringService: DataGatheringService, |
|
|
|
private readonly dataProviderService: DataProviderService, |
|
|
|
private readonly marketDataService: MarketDataService |
|
|
|
private readonly marketDataService: MarketDataService, |
|
|
|
private readonly symbolProfileService: SymbolProfileService |
|
|
|
) {} |
|
|
|
|
|
|
|
@Process({ |
|
|
@ -42,28 +45,49 @@ export class DataGatheringProcessor { |
|
|
|
DEFAULT_PROCESSOR_GATHER_ASSET_PROFILE_CONCURRENCY.toString(), |
|
|
|
10 |
|
|
|
), |
|
|
|
name: GATHER_ASSET_PROFILE_PROCESS |
|
|
|
name: GATHER_ASSET_PROFILE_PROCESS_JOB_NAME |
|
|
|
}) |
|
|
|
public async gatherAssetProfile(job: Job<AssetProfileIdentifier>) { |
|
|
|
const { dataSource, symbol } = job.data; |
|
|
|
|
|
|
|
try { |
|
|
|
Logger.log( |
|
|
|
`Asset profile data gathering has been started for ${job.data.symbol} (${job.data.dataSource})`, |
|
|
|
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS})` |
|
|
|
`Asset profile data gathering has been started for ${symbol} (${dataSource})`, |
|
|
|
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS_JOB_NAME})` |
|
|
|
); |
|
|
|
|
|
|
|
await this.dataGatheringService.gatherAssetProfiles([job.data]); |
|
|
|
|
|
|
|
Logger.log( |
|
|
|
`Asset profile data gathering has been completed for ${job.data.symbol} (${job.data.dataSource})`, |
|
|
|
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS})` |
|
|
|
`Asset profile data gathering has been completed for ${symbol} (${dataSource})`, |
|
|
|
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS_JOB_NAME})` |
|
|
|
); |
|
|
|
} catch (error) { |
|
|
|
if (error instanceof AssetProfileDelistedError) { |
|
|
|
await this.symbolProfileService.updateSymbolProfile( |
|
|
|
{ |
|
|
|
dataSource, |
|
|
|
symbol |
|
|
|
}, |
|
|
|
{ |
|
|
|
isActive: false |
|
|
|
} |
|
|
|
); |
|
|
|
|
|
|
|
Logger.log( |
|
|
|
`Asset profile data gathering has been discarded for ${symbol} (${dataSource})`, |
|
|
|
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS_JOB_NAME})` |
|
|
|
); |
|
|
|
|
|
|
|
return job.discard(); |
|
|
|
} |
|
|
|
|
|
|
|
Logger.error( |
|
|
|
error, |
|
|
|
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS})` |
|
|
|
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS_JOB_NAME})` |
|
|
|
); |
|
|
|
|
|
|
|
throw new Error(error); |
|
|
|
throw error; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -76,8 +100,9 @@ export class DataGatheringProcessor { |
|
|
|
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME |
|
|
|
}) |
|
|
|
public async gatherHistoricalMarketData(job: Job<IDataGatheringItem>) { |
|
|
|
try { |
|
|
|
const { dataSource, date, symbol } = job.data; |
|
|
|
|
|
|
|
try { |
|
|
|
let currentDate = parseISO(date as unknown as string); |
|
|
|
|
|
|
|
Logger.log( |
|
|
@ -142,12 +167,31 @@ export class DataGatheringProcessor { |
|
|
|
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` |
|
|
|
); |
|
|
|
} catch (error) { |
|
|
|
if (error instanceof AssetProfileDelistedError) { |
|
|
|
await this.symbolProfileService.updateSymbolProfile( |
|
|
|
{ |
|
|
|
dataSource, |
|
|
|
symbol |
|
|
|
}, |
|
|
|
{ |
|
|
|
isActive: false |
|
|
|
} |
|
|
|
); |
|
|
|
|
|
|
|
Logger.log( |
|
|
|
`Historical market data gathering has been discarded for ${symbol} (${dataSource})`, |
|
|
|
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` |
|
|
|
); |
|
|
|
|
|
|
|
return job.discard(); |
|
|
|
} |
|
|
|
|
|
|
|
Logger.error( |
|
|
|
error, |
|
|
|
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS_JOB_NAME})` |
|
|
|
); |
|
|
|
|
|
|
|
throw new Error(error); |
|
|
|
throw error; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|