|
@ -2,8 +2,7 @@ import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.se |
|
|
import { |
|
|
import { |
|
|
DATA_GATHERING_QUEUE, |
|
|
DATA_GATHERING_QUEUE, |
|
|
GATHER_HISTORICAL_MARKET_DATA_PROCESS, |
|
|
GATHER_HISTORICAL_MARKET_DATA_PROCESS, |
|
|
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS, |
|
|
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS |
|
|
QUEUE_JOB_STATUS_LIST |
|
|
|
|
|
} from '@ghostfolio/common/config'; |
|
|
} from '@ghostfolio/common/config'; |
|
|
import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper'; |
|
|
import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper'; |
|
|
import { UniqueAsset } from '@ghostfolio/common/interfaces'; |
|
|
import { UniqueAsset } from '@ghostfolio/common/interfaces'; |
|
@ -12,6 +11,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; |
|
|
import { DataSource } from '@prisma/client'; |
|
|
import { DataSource } from '@prisma/client'; |
|
|
import { JobOptions, Queue } from 'bull'; |
|
|
import { JobOptions, Queue } from 'bull'; |
|
|
import { format, min, subDays, subYears } from 'date-fns'; |
|
|
import { format, min, subDays, subYears } from 'date-fns'; |
|
|
|
|
|
import { isEmpty } from 'lodash'; |
|
|
|
|
|
|
|
|
import { DataProviderService } from './data-provider/data-provider.service'; |
|
|
import { DataProviderService } from './data-provider/data-provider.service'; |
|
|
import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface'; |
|
|
import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface'; |
|
@ -34,17 +34,22 @@ export class DataGatheringService { |
|
|
private readonly symbolProfileService: SymbolProfileService |
|
|
private readonly symbolProfileService: SymbolProfileService |
|
|
) {} |
|
|
) {} |
|
|
|
|
|
|
|
|
public async addJobToQueue(name: string, data: any, options?: JobOptions) { |
|
|
public async addJobToQueue({ |
|
|
const hasJob = await this.hasJob(name, data); |
|
|
data, |
|
|
|
|
|
name, |
|
|
if (hasJob) { |
|
|
opts |
|
|
Logger.log( |
|
|
}: { |
|
|
`Job ${name} with data ${JSON.stringify(data)} already exists.`, |
|
|
data: any; |
|
|
'DataGatheringService' |
|
|
name: string; |
|
|
); |
|
|
opts?: JobOptions; |
|
|
} else { |
|
|
}) { |
|
|
return this.dataGatheringQueue.add(name, data, options); |
|
|
return this.dataGatheringQueue.add(name, data, opts); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public async addJobsToQueue( |
|
|
|
|
|
jobs: { data: any; name: string; opts?: JobOptions }[] |
|
|
|
|
|
) { |
|
|
|
|
|
return this.dataGatheringQueue.addBulk(jobs); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async gather7Days() { |
|
|
public async gather7Days() { |
|
@ -209,59 +214,22 @@ export class DataGatheringService { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) { |
|
|
public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) { |
|
|
for (const { dataSource, date, symbol } of aSymbolsWithStartDate) { |
|
|
await this.addJobsToQueue( |
|
|
await this.addJobToQueue( |
|
|
aSymbolsWithStartDate.map(({ dataSource, date, symbol }) => { |
|
|
GATHER_HISTORICAL_MARKET_DATA_PROCESS, |
|
|
return { |
|
|
{ |
|
|
data: { |
|
|
dataSource, |
|
|
dataSource, |
|
|
date, |
|
|
date, |
|
|
symbol |
|
|
symbol |
|
|
}, |
|
|
}, |
|
|
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS |
|
|
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS, |
|
|
); |
|
|
opts: { |
|
|
} |
|
|
...GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS, |
|
|
|
|
|
jobId: `${dataSource}-${symbol}-${format(date, DATE_FORMAT)}` |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async getSymbolsMax(): Promise<IDataGatheringItem[]> { |
|
|
|
|
|
const startDate = |
|
|
|
|
|
( |
|
|
|
|
|
await this.prismaService.order.findFirst({ |
|
|
|
|
|
orderBy: [{ date: 'asc' }] |
|
|
|
|
|
}) |
|
|
|
|
|
)?.date ?? new Date(); |
|
|
|
|
|
|
|
|
|
|
|
const currencyPairsToGather = this.exchangeRateDataService |
|
|
|
|
|
.getCurrencyPairs() |
|
|
|
|
|
.map(({ dataSource, symbol }) => { |
|
|
|
|
|
return { |
|
|
|
|
|
dataSource, |
|
|
|
|
|
symbol, |
|
|
|
|
|
date: min([startDate, subYears(new Date(), 10)]) |
|
|
|
|
|
}; |
|
|
}; |
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
const symbolProfilesToGather = ( |
|
|
|
|
|
await this.prismaService.symbolProfile.findMany({ |
|
|
|
|
|
orderBy: [{ symbol: 'asc' }], |
|
|
|
|
|
select: { |
|
|
|
|
|
dataSource: true, |
|
|
|
|
|
Order: { |
|
|
|
|
|
orderBy: [{ date: 'asc' }], |
|
|
|
|
|
select: { date: true }, |
|
|
|
|
|
take: 1 |
|
|
|
|
|
}, |
|
|
|
|
|
scraperConfiguration: true, |
|
|
|
|
|
symbol: true |
|
|
|
|
|
} |
|
|
|
|
|
}) |
|
|
}) |
|
|
).map((symbolProfile) => { |
|
|
); |
|
|
return { |
|
|
|
|
|
...symbolProfile, |
|
|
|
|
|
date: symbolProfile.Order?.[0]?.date ?? startDate |
|
|
|
|
|
}; |
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
return [...currencyPairsToGather, ...symbolProfilesToGather]; |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async getUniqueAssets(): Promise<UniqueAsset[]> { |
|
|
public async getUniqueAssets(): Promise<UniqueAsset[]> { |
|
@ -298,7 +266,7 @@ export class DataGatheringService { |
|
|
|
|
|
|
|
|
// Only consider symbols with incomplete market data for the last
|
|
|
// Only consider symbols with incomplete market data for the last
|
|
|
// 7 days
|
|
|
// 7 days
|
|
|
const symbolsNotToGather = ( |
|
|
const symbolsWithCompleteMarketData = ( |
|
|
await this.prismaService.marketData.groupBy({ |
|
|
await this.prismaService.marketData.groupBy({ |
|
|
_count: true, |
|
|
_count: true, |
|
|
by: ['symbol'], |
|
|
by: ['symbol'], |
|
@ -316,8 +284,14 @@ export class DataGatheringService { |
|
|
}); |
|
|
}); |
|
|
|
|
|
|
|
|
const symbolProfilesToGather = symbolProfiles |
|
|
const symbolProfilesToGather = symbolProfiles |
|
|
.filter(({ symbol }) => { |
|
|
.filter(({ dataSource, scraperConfiguration, symbol }) => { |
|
|
return !symbolsNotToGather.includes(symbol); |
|
|
const manualDataSourceWithScraperConfiguration = |
|
|
|
|
|
dataSource === 'MANUAL' && !isEmpty(scraperConfiguration); |
|
|
|
|
|
|
|
|
|
|
|
return ( |
|
|
|
|
|
!symbolsWithCompleteMarketData.includes(symbol) && |
|
|
|
|
|
(dataSource !== 'MANUAL' || manualDataSourceWithScraperConfiguration) |
|
|
|
|
|
); |
|
|
}) |
|
|
}) |
|
|
.map((symbolProfile) => { |
|
|
.map((symbolProfile) => { |
|
|
return { |
|
|
return { |
|
@ -329,7 +303,7 @@ export class DataGatheringService { |
|
|
const currencyPairsToGather = this.exchangeRateDataService |
|
|
const currencyPairsToGather = this.exchangeRateDataService |
|
|
.getCurrencyPairs() |
|
|
.getCurrencyPairs() |
|
|
.filter(({ symbol }) => { |
|
|
.filter(({ symbol }) => { |
|
|
return !symbolsNotToGather.includes(symbol); |
|
|
return !symbolsWithCompleteMarketData.includes(symbol); |
|
|
}) |
|
|
}) |
|
|
.map(({ dataSource, symbol }) => { |
|
|
.map(({ dataSource, symbol }) => { |
|
|
return { |
|
|
return { |
|
@ -342,17 +316,56 @@ export class DataGatheringService { |
|
|
return [...currencyPairsToGather, ...symbolProfilesToGather]; |
|
|
return [...currencyPairsToGather, ...symbolProfilesToGather]; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private async hasJob(name: string, data: any) { |
|
|
private async getSymbolsMax(): Promise<IDataGatheringItem[]> { |
|
|
const jobs = await this.dataGatheringQueue.getJobs( |
|
|
const startDate = |
|
|
QUEUE_JOB_STATUS_LIST.filter((status) => { |
|
|
( |
|
|
return status !== 'completed'; |
|
|
await this.prismaService.order.findFirst({ |
|
|
|
|
|
orderBy: [{ date: 'asc' }] |
|
|
}) |
|
|
}) |
|
|
); |
|
|
)?.date ?? new Date(); |
|
|
|
|
|
|
|
|
|
|
|
const currencyPairsToGather = this.exchangeRateDataService |
|
|
|
|
|
.getCurrencyPairs() |
|
|
|
|
|
.map(({ dataSource, symbol }) => { |
|
|
|
|
|
return { |
|
|
|
|
|
dataSource, |
|
|
|
|
|
symbol, |
|
|
|
|
|
date: min([startDate, subYears(new Date(), 10)]) |
|
|
|
|
|
}; |
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
const symbolProfilesToGather = ( |
|
|
|
|
|
await this.prismaService.symbolProfile.findMany({ |
|
|
|
|
|
orderBy: [{ symbol: 'asc' }], |
|
|
|
|
|
select: { |
|
|
|
|
|
dataSource: true, |
|
|
|
|
|
Order: { |
|
|
|
|
|
orderBy: [{ date: 'asc' }], |
|
|
|
|
|
select: { date: true }, |
|
|
|
|
|
take: 1 |
|
|
|
|
|
}, |
|
|
|
|
|
scraperConfiguration: true, |
|
|
|
|
|
symbol: true |
|
|
|
|
|
} |
|
|
|
|
|
}) |
|
|
|
|
|
) |
|
|
|
|
|
.filter((symbolProfile) => { |
|
|
|
|
|
const manualDataSourceWithScraperConfiguration = |
|
|
|
|
|
symbolProfile.dataSource === 'MANUAL' && |
|
|
|
|
|
!isEmpty(symbolProfile.scraperConfiguration); |
|
|
|
|
|
|
|
|
return jobs.some((job) => { |
|
|
|
|
|
return ( |
|
|
return ( |
|
|
job.name === name && JSON.stringify(job.data) === JSON.stringify(data) |
|
|
symbolProfile.dataSource !== 'MANUAL' || |
|
|
|
|
|
manualDataSourceWithScraperConfiguration |
|
|
); |
|
|
); |
|
|
|
|
|
}) |
|
|
|
|
|
.map((symbolProfile) => { |
|
|
|
|
|
return { |
|
|
|
|
|
...symbolProfile, |
|
|
|
|
|
date: symbolProfile.Order?.[0]?.date ?? startDate |
|
|
|
|
|
}; |
|
|
}); |
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
return [...currencyPairsToGather, ...symbolProfilesToGather]; |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|