Browse Source

Feature/skip job creation for manual data source without scraper configuration (#1857)

* Skip job creation for MANUAL data source without scraper configuration

* Update changelog
pull/1858/head^2
Thomas Kaul 2 years ago
committed by GitHub
parent
commit
e23ff33e6f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      CHANGELOG.md
  2. 66
      apps/api/src/app/admin/admin.controller.ts
  3. 10
      apps/api/src/app/order/order.service.ts
  4. 32
      apps/api/src/services/cron.service.ts
  5. 120
      apps/api/src/services/data-gathering.service.ts

8
CHANGELOG.md

@ -5,12 +5,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Changed
- Skipped creating queue jobs for asset profiles with `MANUAL` data source not having a scraper configuration
- Reduced the execution interval of the data gathering to every hour
## 1.254.0 - 2023-04-14
### Changed
- Improved the queue jobs implementation by adding in bulk
- Improved the queue jobs implementation by introducing unique job ids
- Reverted the execution interval of the data gathering from every 12 hours to every 4 hours
## 1.253.0 - 2023-04-14

66
apps/api/src/app/admin/admin.controller.ts

@ -100,19 +100,21 @@ export class AdminController {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringService.addJobToQueue(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
);
}
await this.dataGatheringService.addJobsToQueue(
uniqueAssets.map(({ dataSource, symbol }) => {
return {
data: {
dataSource,
symbol
},
name: GATHER_ASSET_PROFILE_PROCESS,
opts: {
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
};
})
);
this.dataGatheringService.gatherMax();
}
@ -134,19 +136,21 @@ export class AdminController {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringService.addJobToQueue(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
);
}
await this.dataGatheringService.addJobsToQueue(
uniqueAssets.map(({ dataSource, symbol }) => {
return {
data: {
dataSource,
symbol
},
name: GATHER_ASSET_PROFILE_PROCESS,
opts: {
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
};
})
);
}
@Post('gather/profile-data/:dataSource/:symbol')
@ -167,17 +171,17 @@ export class AdminController {
);
}
await this.dataGatheringService.addJobToQueue(
GATHER_ASSET_PROFILE_PROCESS,
{
await this.dataGatheringService.addJobToQueue({
data: {
dataSource,
symbol
},
{
name: GATHER_ASSET_PROFILE_PROCESS,
opts: {
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
);
});
}
@Post('gather/:dataSource/:symbol')

10
apps/api/src/app/order/order.service.ts

@ -112,17 +112,17 @@ export class OrderService {
};
}
await this.dataGatheringService.addJobToQueue(
GATHER_ASSET_PROFILE_PROCESS,
{
await this.dataGatheringService.addJobToQueue({
data: {
dataSource: data.SymbolProfile.connectOrCreate.create.dataSource,
symbol: data.SymbolProfile.connectOrCreate.create.symbol
},
{
name: GATHER_ASSET_PROFILE_PROCESS,
opts: {
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${data.SymbolProfile.connectOrCreate.create.dataSource}-${data.SymbolProfile.connectOrCreate.create.symbol}}`
}
);
});
const isDraft = isAfter(data.date as Date, endOfToday());

32
apps/api/src/services/cron.service.ts

@ -19,8 +19,8 @@ export class CronService {
private readonly twitterBotService: TwitterBotService
) {}
@Cron(CronExpression.EVERY_4_HOURS)
public async runEveryFourHours() {
@Cron(CronExpression.EVERY_HOUR)
public async runEveryHour() {
await this.dataGatheringService.gather7Days();
}
@ -38,18 +38,20 @@ export class CronService {
public async runEverySundayAtTwelvePm() {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringService.addJobToQueue(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
);
}
await this.dataGatheringService.addJobsToQueue(
uniqueAssets.map(({ dataSource, symbol }) => {
return {
data: {
dataSource,
symbol
},
name: GATHER_ASSET_PROFILE_PROCESS,
opts: {
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
jobId: `${dataSource}-${symbol}}`
}
};
})
);
}
}

120
apps/api/src/services/data-gathering.service.ts

@ -11,6 +11,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common';
import { DataSource } from '@prisma/client';
import { JobOptions, Queue } from 'bull';
import { format, min, subDays, subYears } from 'date-fns';
import { isEmpty } from 'lodash';
import { DataProviderService } from './data-provider/data-provider.service';
import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface';
@ -33,7 +34,15 @@ export class DataGatheringService {
private readonly symbolProfileService: SymbolProfileService
) {}
public async addJobToQueue(name: string, data: any, opts?: JobOptions) {
public async addJobToQueue({
data,
name,
opts
}: {
data: any;
name: string;
opts?: JobOptions;
}) {
return this.dataGatheringQueue.add(name, data, opts);
}
@ -223,48 +232,6 @@ export class DataGatheringService {
);
}
public async getSymbolsMax(): Promise<IDataGatheringItem[]> {
const startDate =
(
await this.prismaService.order.findFirst({
orderBy: [{ date: 'asc' }]
})
)?.date ?? new Date();
const currencyPairsToGather = this.exchangeRateDataService
.getCurrencyPairs()
.map(({ dataSource, symbol }) => {
return {
dataSource,
symbol,
date: min([startDate, subYears(new Date(), 10)])
};
});
const symbolProfilesToGather = (
await this.prismaService.symbolProfile.findMany({
orderBy: [{ symbol: 'asc' }],
select: {
dataSource: true,
Order: {
orderBy: [{ date: 'asc' }],
select: { date: true },
take: 1
},
scraperConfiguration: true,
symbol: true
}
})
).map((symbolProfile) => {
return {
...symbolProfile,
date: symbolProfile.Order?.[0]?.date ?? startDate
};
});
return [...currencyPairsToGather, ...symbolProfilesToGather];
}
public async getUniqueAssets(): Promise<UniqueAsset[]> {
const symbolProfiles = await this.prismaService.symbolProfile.findMany({
orderBy: [{ symbol: 'asc' }]
@ -299,7 +266,7 @@ export class DataGatheringService {
// Only consider symbols with incomplete market data for the last
// 7 days
const symbolsNotToGather = (
const symbolsWithCompleteMarketData = (
await this.prismaService.marketData.groupBy({
_count: true,
by: ['symbol'],
@ -317,8 +284,14 @@ export class DataGatheringService {
});
const symbolProfilesToGather = symbolProfiles
.filter(({ symbol }) => {
return !symbolsNotToGather.includes(symbol);
.filter(({ dataSource, scraperConfiguration, symbol }) => {
const manualDataSourceWithScraperConfiguration =
dataSource === 'MANUAL' && !isEmpty(scraperConfiguration);
return (
!symbolsWithCompleteMarketData.includes(symbol) &&
(dataSource !== 'MANUAL' || manualDataSourceWithScraperConfiguration)
);
})
.map((symbolProfile) => {
return {
@ -330,7 +303,7 @@ export class DataGatheringService {
const currencyPairsToGather = this.exchangeRateDataService
.getCurrencyPairs()
.filter(({ symbol }) => {
return !symbolsNotToGather.includes(symbol);
return !symbolsWithCompleteMarketData.includes(symbol);
})
.map(({ dataSource, symbol }) => {
return {
@ -342,4 +315,57 @@ export class DataGatheringService {
return [...currencyPairsToGather, ...symbolProfilesToGather];
}
private async getSymbolsMax(): Promise<IDataGatheringItem[]> {
const startDate =
(
await this.prismaService.order.findFirst({
orderBy: [{ date: 'asc' }]
})
)?.date ?? new Date();
const currencyPairsToGather = this.exchangeRateDataService
.getCurrencyPairs()
.map(({ dataSource, symbol }) => {
return {
dataSource,
symbol,
date: min([startDate, subYears(new Date(), 10)])
};
});
const symbolProfilesToGather = (
await this.prismaService.symbolProfile.findMany({
orderBy: [{ symbol: 'asc' }],
select: {
dataSource: true,
Order: {
orderBy: [{ date: 'asc' }],
select: { date: true },
take: 1
},
scraperConfiguration: true,
symbol: true
}
})
)
.filter((symbolProfile) => {
const manualDataSourceWithScraperConfiguration =
symbolProfile.dataSource === 'MANUAL' &&
!isEmpty(symbolProfile.scraperConfiguration);
return (
symbolProfile.dataSource !== 'MANUAL' ||
manualDataSourceWithScraperConfiguration
);
})
.map((symbolProfile) => {
return {
...symbolProfile,
date: symbolProfile.Order?.[0]?.date ?? startDate
};
});
return [...currencyPairsToGather, ...symbolProfilesToGather];
}
}

Loading…
Cancel
Save