Browse Source

Feature/optimize 7d data gathering by prioritization (#3575)

* Optimize 7d data gathering by prioritization

* Update changelog
pull/3574/head
Thomas Kaul 7 months ago
committed by GitHub
parent
commit
d5c56fb16c
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 4
      CHANGELOG.md
  2. 10
      apps/api/src/app/admin/admin.controller.ts
  3. 5
      apps/api/src/services/cron.service.ts
  4. 118
      apps/api/src/services/data-gathering/data-gathering.service.ts
  5. 34
      apps/api/src/services/symbol-profile/symbol-profile.service.ts

4
CHANGELOG.md

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased ## Unreleased
### Changed
- Optimized the 7d data gathering by prioritizing the currencies
### Fixed ### Fixed
- Fixed the table sorting of the holdings tab on the home page - Fixed the table sorting of the holdings tab on the home page

10
apps/api/src/app/admin/admin.controller.ts

@ -81,10 +81,11 @@ export class AdminController {
@Post('gather/max') @Post('gather/max')
@UseGuards(AuthGuard('jwt'), HasPermissionGuard) @UseGuards(AuthGuard('jwt'), HasPermissionGuard)
public async gatherMax(): Promise<void> { public async gatherMax(): Promise<void> {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); const assetProfileIdentifiers =
await this.dataGatheringService.getAllAssetProfileIdentifiers();
await this.dataGatheringService.addJobsToQueue( await this.dataGatheringService.addJobsToQueue(
uniqueAssets.map(({ dataSource, symbol }) => { assetProfileIdentifiers.map(({ dataSource, symbol }) => {
return { return {
data: { data: {
dataSource, dataSource,
@ -107,10 +108,11 @@ export class AdminController {
@Post('gather/profile-data') @Post('gather/profile-data')
@UseGuards(AuthGuard('jwt'), HasPermissionGuard) @UseGuards(AuthGuard('jwt'), HasPermissionGuard)
public async gatherProfileData(): Promise<void> { public async gatherProfileData(): Promise<void> {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); const assetProfileIdentifiers =
await this.dataGatheringService.getAllAssetProfileIdentifiers();
await this.dataGatheringService.addJobsToQueue( await this.dataGatheringService.addJobsToQueue(
uniqueAssets.map(({ dataSource, symbol }) => { assetProfileIdentifiers.map(({ dataSource, symbol }) => {
return { return {
data: { data: {
dataSource, dataSource,

5
apps/api/src/services/cron.service.ts

@ -45,10 +45,11 @@ export class CronService {
@Cron(CronService.EVERY_SUNDAY_AT_LUNCH_TIME) @Cron(CronService.EVERY_SUNDAY_AT_LUNCH_TIME)
public async runEverySundayAtTwelvePm() { public async runEverySundayAtTwelvePm() {
if (await this.isDataGatheringEnabled()) { if (await this.isDataGatheringEnabled()) {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets(); const assetProfileIdentifiers =
await this.dataGatheringService.getAllAssetProfileIdentifiers();
await this.dataGatheringService.addJobsToQueue( await this.dataGatheringService.addJobsToQueue(
uniqueAssets.map(({ dataSource, symbol }) => { assetProfileIdentifiers.map(({ dataSource, symbol }) => {
return { return {
data: { data: {
dataSource, dataSource,

118
apps/api/src/services/data-gathering/data-gathering.service.ts

@ -10,6 +10,7 @@ import {
DATA_GATHERING_QUEUE, DATA_GATHERING_QUEUE,
DATA_GATHERING_QUEUE_PRIORITY_HIGH, DATA_GATHERING_QUEUE_PRIORITY_HIGH,
DATA_GATHERING_QUEUE_PRIORITY_LOW, DATA_GATHERING_QUEUE_PRIORITY_LOW,
DATA_GATHERING_QUEUE_PRIORITY_MEDIUM,
GATHER_HISTORICAL_MARKET_DATA_PROCESS, GATHER_HISTORICAL_MARKET_DATA_PROCESS,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS, GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS,
PROPERTY_BENCHMARKS PROPERTY_BENCHMARKS
@ -62,9 +63,22 @@ export class DataGatheringService {
} }
public async gather7Days() { public async gather7Days() {
const dataGatheringItems = await this.getSymbols7D();
await this.gatherSymbols({ await this.gatherSymbols({
dataGatheringItems, dataGatheringItems: await this.getCurrencies7D(),
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
});
await this.gatherSymbols({
dataGatheringItems: await this.getSymbols7D({
withUserSubscription: true
}),
priority: DATA_GATHERING_QUEUE_PRIORITY_MEDIUM
});
await this.gatherSymbols({
dataGatheringItems: await this.getSymbols7D({
withUserSubscription: false
}),
priority: DATA_GATHERING_QUEUE_PRIORITY_LOW priority: DATA_GATHERING_QUEUE_PRIORITY_LOW
}); });
} }
@ -138,7 +152,7 @@ export class DataGatheringService {
}); });
if (!uniqueAssets) { if (!uniqueAssets) {
uniqueAssets = await this.getUniqueAssets(); uniqueAssets = await this.getAllAssetProfileIdentifiers();
} }
if (uniqueAssets.length <= 0) { if (uniqueAssets.length <= 0) {
@ -270,7 +284,7 @@ export class DataGatheringService {
); );
} }
public async getUniqueAssets(): Promise<UniqueAsset[]> { public async getAllAssetProfileIdentifiers(): Promise<UniqueAsset[]> {
const symbolProfiles = await this.prismaService.symbolProfile.findMany({ const symbolProfiles = await this.prismaService.symbolProfile.findMany({
orderBy: [{ symbol: 'asc' }] orderBy: [{ symbol: 'asc' }]
}); });
@ -290,73 +304,83 @@ export class DataGatheringService {
}); });
} }
private getEarliestDate(aStartDate: Date) { private async getAssetProfileIdentifiersWithCompleteMarketData(): Promise<
return min([aStartDate, subYears(new Date(), 10)]); UniqueAsset[]
} > {
return (
private async getSymbols7D(): Promise<IDataGatheringItem[]> {
const startDate = subDays(resetHours(new Date()), 7);
const symbolProfiles = await this.prismaService.symbolProfile.findMany({
orderBy: [{ symbol: 'asc' }],
select: {
dataSource: true,
scraperConfiguration: true,
symbol: true
}
});
// Only consider symbols with incomplete market data for the last
// 7 days
const symbolsWithCompleteMarketData = (
await this.prismaService.marketData.groupBy({ await this.prismaService.marketData.groupBy({
_count: true, _count: true,
by: ['symbol'], by: ['dataSource', 'symbol'],
orderBy: [{ symbol: 'asc' }], orderBy: [{ symbol: 'asc' }],
where: { where: {
date: { gt: startDate }, date: { gt: subDays(resetHours(new Date()), 7) },
state: 'CLOSE' state: 'CLOSE'
} }
}) })
) )
.filter((group) => { .filter(({ _count }) => {
return group._count >= 6; return _count >= 6;
}) })
.map((group) => { .map(({ dataSource, symbol }) => {
return group.symbol; return { dataSource, symbol };
}); });
}
const symbolProfilesToGather = symbolProfiles private async getCurrencies7D(): Promise<IDataGatheringItem[]> {
const assetProfileIdentifiersWithCompleteMarketData =
await this.getAssetProfileIdentifiersWithCompleteMarketData();
return this.exchangeRateDataService
.getCurrencyPairs()
.filter(({ dataSource, symbol }) => {
return !assetProfileIdentifiersWithCompleteMarketData.some((item) => {
return item.dataSource === dataSource && item.symbol === symbol;
});
})
.map(({ dataSource, symbol }) => {
return {
dataSource,
symbol,
date: subDays(resetHours(new Date()), 7)
};
});
}
private getEarliestDate(aStartDate: Date) {
return min([aStartDate, subYears(new Date(), 10)]);
}
private async getSymbols7D({
withUserSubscription = false
}: {
withUserSubscription?: boolean;
}): Promise<IDataGatheringItem[]> {
const symbolProfiles =
await this.symbolProfileService.getSymbolProfilesByUserSubscription({
withUserSubscription
});
const assetProfileIdentifiersWithCompleteMarketData =
await this.getAssetProfileIdentifiersWithCompleteMarketData();
return symbolProfiles
.filter(({ dataSource, scraperConfiguration, symbol }) => { .filter(({ dataSource, scraperConfiguration, symbol }) => {
const manualDataSourceWithScraperConfiguration = const manualDataSourceWithScraperConfiguration =
dataSource === 'MANUAL' && !isEmpty(scraperConfiguration); dataSource === 'MANUAL' && !isEmpty(scraperConfiguration);
return ( return (
!symbolsWithCompleteMarketData.includes(symbol) && !assetProfileIdentifiersWithCompleteMarketData.some((item) => {
return item.dataSource === dataSource && item.symbol === symbol;
}) &&
(dataSource !== 'MANUAL' || manualDataSourceWithScraperConfiguration) (dataSource !== 'MANUAL' || manualDataSourceWithScraperConfiguration)
); );
}) })
.map((symbolProfile) => { .map((symbolProfile) => {
return { return {
...symbolProfile, ...symbolProfile,
date: startDate date: subDays(resetHours(new Date()), 7)
};
});
const currencyPairsToGather = this.exchangeRateDataService
.getCurrencyPairs()
.filter(({ symbol }) => {
return !symbolsWithCompleteMarketData.includes(symbol);
})
.map(({ dataSource, symbol }) => {
return {
dataSource,
symbol,
date: startDate
}; };
}); });
return [...currencyPairsToGather, ...symbolProfilesToGather];
} }
private async getSymbolsMax(): Promise<IDataGatheringItem[]> { private async getSymbolsMax(): Promise<IDataGatheringItem[]> {

34
apps/api/src/services/symbol-profile/symbol-profile.service.ts

@ -91,6 +91,40 @@ export class SymbolProfileService {
}); });
} }
public async getSymbolProfilesByUserSubscription({
withUserSubscription = false
}: {
withUserSubscription?: boolean;
}) {
return this.prismaService.symbolProfile.findMany({
include: {
Order: {
include: {
User: true
}
}
},
orderBy: [{ symbol: 'asc' }],
where: {
Order: withUserSubscription
? {
some: {
User: {
Subscription: { some: { expiresAt: { gt: new Date() } } }
}
}
}
: {
every: {
User: {
Subscription: { none: { expiresAt: { gt: new Date() } } }
}
}
}
}
});
}
public updateSymbolProfile({ public updateSymbolProfile({
assetClass, assetClass,
assetSubClass, assetSubClass,

Loading…
Cancel
Save