|
|
@ -2,9 +2,7 @@ import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.se |
|
|
|
import { |
|
|
|
DATA_GATHERING_QUEUE, |
|
|
|
DATA_GATHERING_QUEUE_PRIORITY_LOW, |
|
|
|
GATHER_HISTORICAL_MARKET_DATA_PROCESS, |
|
|
|
PROPERTY_LAST_DATA_GATHERING, |
|
|
|
PROPERTY_LOCKED_DATA_GATHERING |
|
|
|
GATHER_HISTORICAL_MARKET_DATA_PROCESS |
|
|
|
} from '@ghostfolio/common/config'; |
|
|
|
import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper'; |
|
|
|
import { UniqueAsset } from '@ghostfolio/common/interfaces'; |
|
|
@ -12,7 +10,7 @@ import { InjectQueue } from '@nestjs/bull'; |
|
|
|
import { Inject, Injectable, Logger } from '@nestjs/common'; |
|
|
|
import { DataSource } from '@prisma/client'; |
|
|
|
import { Queue } from 'bull'; |
|
|
|
import { differenceInHours, format, subDays } from 'date-fns'; |
|
|
|
import { format, subDays } from 'date-fns'; |
|
|
|
import ms from 'ms'; |
|
|
|
|
|
|
|
import { DataProviderService } from './data-provider/data-provider.service'; |
|
|
@ -37,155 +35,23 @@ export class DataGatheringService { |
|
|
|
) {} |
|
|
|
|
|
|
|
public async gather7Days() { |
|
|
|
const isDataGatheringNeeded = await this.isDataGatheringNeeded(); |
|
|
|
|
|
|
|
if (isDataGatheringNeeded) { |
|
|
|
Logger.log('7d data gathering has been started.', 'DataGatheringService'); |
|
|
|
console.time('data-gathering-7d'); |
|
|
|
|
|
|
|
await this.prismaService.property.create({ |
|
|
|
data: { |
|
|
|
key: PROPERTY_LOCKED_DATA_GATHERING, |
|
|
|
value: new Date().toISOString() |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
const dataGatheringItems = await this.getSymbols7D(); |
|
|
|
|
|
|
|
try { |
|
|
|
await this.gatherSymbols(dataGatheringItems); |
|
|
|
|
|
|
|
await this.prismaService.property.upsert({ |
|
|
|
create: { |
|
|
|
key: PROPERTY_LAST_DATA_GATHERING, |
|
|
|
value: new Date().toISOString() |
|
|
|
}, |
|
|
|
update: { value: new Date().toISOString() }, |
|
|
|
where: { key: PROPERTY_LAST_DATA_GATHERING } |
|
|
|
}); |
|
|
|
} catch (error) { |
|
|
|
Logger.error(error, 'DataGatheringService'); |
|
|
|
} |
|
|
|
|
|
|
|
await this.prismaService.property.delete({ |
|
|
|
where: { |
|
|
|
key: PROPERTY_LOCKED_DATA_GATHERING |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
Logger.log( |
|
|
|
'7d data gathering has been completed.', |
|
|
|
'DataGatheringService' |
|
|
|
); |
|
|
|
console.timeEnd('data-gathering-7d'); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public async gatherMax() { |
|
|
|
const isDataGatheringLocked = await this.prismaService.property.findUnique({ |
|
|
|
where: { key: PROPERTY_LOCKED_DATA_GATHERING } |
|
|
|
}); |
|
|
|
|
|
|
|
if (!isDataGatheringLocked) { |
|
|
|
Logger.log( |
|
|
|
'Max data gathering has been started.', |
|
|
|
'DataGatheringService' |
|
|
|
); |
|
|
|
console.time('data-gathering-max'); |
|
|
|
|
|
|
|
await this.prismaService.property.create({ |
|
|
|
data: { |
|
|
|
key: PROPERTY_LOCKED_DATA_GATHERING, |
|
|
|
value: new Date().toISOString() |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
const symbols = await this.getSymbolsMax(); |
|
|
|
|
|
|
|
try { |
|
|
|
await this.gatherSymbols(symbols); |
|
|
|
|
|
|
|
await this.prismaService.property.upsert({ |
|
|
|
create: { |
|
|
|
key: PROPERTY_LAST_DATA_GATHERING, |
|
|
|
value: new Date().toISOString() |
|
|
|
}, |
|
|
|
update: { value: new Date().toISOString() }, |
|
|
|
where: { key: PROPERTY_LAST_DATA_GATHERING } |
|
|
|
}); |
|
|
|
} catch (error) { |
|
|
|
Logger.error(error, 'DataGatheringService'); |
|
|
|
} |
|
|
|
|
|
|
|
await this.prismaService.property.delete({ |
|
|
|
where: { |
|
|
|
key: PROPERTY_LOCKED_DATA_GATHERING |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
Logger.log( |
|
|
|
'Max data gathering has been completed.', |
|
|
|
'DataGatheringService' |
|
|
|
); |
|
|
|
console.timeEnd('data-gathering-max'); |
|
|
|
} |
|
|
|
const dataGatheringItems = await this.getSymbolsMax(); |
|
|
|
await this.gatherSymbols(dataGatheringItems); |
|
|
|
} |
|
|
|
|
|
|
|
public async gatherSymbol({ dataSource, symbol }: UniqueAsset) { |
|
|
|
const isDataGatheringLocked = await this.prismaService.property.findUnique({ |
|
|
|
where: { key: PROPERTY_LOCKED_DATA_GATHERING } |
|
|
|
}); |
|
|
|
|
|
|
|
if (!isDataGatheringLocked) { |
|
|
|
Logger.log( |
|
|
|
`Symbol data gathering for ${symbol} has been started.`, |
|
|
|
'DataGatheringService' |
|
|
|
); |
|
|
|
console.time('data-gathering-symbol'); |
|
|
|
|
|
|
|
await this.prismaService.property.create({ |
|
|
|
data: { |
|
|
|
key: PROPERTY_LOCKED_DATA_GATHERING, |
|
|
|
value: new Date().toISOString() |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
const symbols = (await this.getSymbolsMax()).filter( |
|
|
|
(dataGatheringItem) => { |
|
|
|
const symbols = (await this.getSymbolsMax()).filter((dataGatheringItem) => { |
|
|
|
return ( |
|
|
|
dataGatheringItem.dataSource === dataSource && |
|
|
|
dataGatheringItem.symbol === symbol |
|
|
|
); |
|
|
|
} |
|
|
|
); |
|
|
|
|
|
|
|
try { |
|
|
|
await this.gatherSymbols(symbols); |
|
|
|
|
|
|
|
await this.prismaService.property.upsert({ |
|
|
|
create: { |
|
|
|
key: PROPERTY_LAST_DATA_GATHERING, |
|
|
|
value: new Date().toISOString() |
|
|
|
}, |
|
|
|
update: { value: new Date().toISOString() }, |
|
|
|
where: { key: PROPERTY_LAST_DATA_GATHERING } |
|
|
|
}); |
|
|
|
} catch (error) { |
|
|
|
Logger.error(error, 'DataGatheringService'); |
|
|
|
} |
|
|
|
|
|
|
|
await this.prismaService.property.delete({ |
|
|
|
where: { |
|
|
|
key: PROPERTY_LOCKED_DATA_GATHERING |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
Logger.log( |
|
|
|
`Symbol data gathering for ${symbol} has been completed.`, |
|
|
|
'DataGatheringService' |
|
|
|
); |
|
|
|
console.timeEnd('data-gathering-symbol'); |
|
|
|
} |
|
|
|
await this.gatherSymbols(symbols); |
|
|
|
} |
|
|
|
|
|
|
|
public async gatherSymbolForDate({ |
|
|
@ -358,34 +224,6 @@ export class DataGatheringService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public async getDataGatheringProgress() { |
|
|
|
const isInProgress = await this.getIsInProgress(); |
|
|
|
|
|
|
|
if (isInProgress) { |
|
|
|
return this.dataGatheringProgress; |
|
|
|
} |
|
|
|
|
|
|
|
return undefined; |
|
|
|
} |
|
|
|
|
|
|
|
public async getIsInProgress() { |
|
|
|
return await this.prismaService.property.findUnique({ |
|
|
|
where: { key: PROPERTY_LOCKED_DATA_GATHERING } |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
public async getLastDataGathering() { |
|
|
|
const lastDataGathering = await this.prismaService.property.findUnique({ |
|
|
|
where: { key: PROPERTY_LAST_DATA_GATHERING } |
|
|
|
}); |
|
|
|
|
|
|
|
if (lastDataGathering?.value) { |
|
|
|
return new Date(lastDataGathering.value); |
|
|
|
} |
|
|
|
|
|
|
|
return undefined; |
|
|
|
} |
|
|
|
|
|
|
|
public async getSymbolsMax(): Promise<IDataGatheringItem[]> { |
|
|
|
const startDate = |
|
|
|
( |
|
|
@ -454,19 +292,6 @@ export class DataGatheringService { |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
public async reset() { |
|
|
|
Logger.log('Data gathering has been reset.', 'DataGatheringService'); |
|
|
|
|
|
|
|
await this.prismaService.property.deleteMany({ |
|
|
|
where: { |
|
|
|
OR: [ |
|
|
|
{ key: PROPERTY_LAST_DATA_GATHERING }, |
|
|
|
{ key: PROPERTY_LOCKED_DATA_GATHERING } |
|
|
|
] |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
private async getSymbols7D(): Promise<IDataGatheringItem[]> { |
|
|
|
const startDate = subDays(resetHours(new Date()), 7); |
|
|
|
|
|
|
@ -529,16 +354,4 @@ export class DataGatheringService { |
|
|
|
|
|
|
|
return [...currencyPairsToGather, ...symbolProfilesToGather]; |
|
|
|
} |
|
|
|
|
|
|
|
private async isDataGatheringNeeded() { |
|
|
|
const lastDataGathering = await this.getLastDataGathering(); |
|
|
|
|
|
|
|
const isDataGatheringLocked = await this.prismaService.property.findUnique({ |
|
|
|
where: { key: PROPERTY_LOCKED_DATA_GATHERING } |
|
|
|
}); |
|
|
|
|
|
|
|
const diffInHours = differenceInHours(new Date(), lastDataGathering); |
|
|
|
|
|
|
|
return (diffInHours >= 1 || !lastDataGathering) && !isDataGatheringLocked; |
|
|
|
} |
|
|
|
} |
|
|
|