Browse Source

Feature/improve asset profile data gathering (#5997)

* Improve asset profile data gathering

* Update changelog
pull/5995/head^2
Thomas Kaul 3 weeks ago
committed by GitHub
parent
commit
d341c4804a
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 4
      apps/api/src/app/admin/admin.controller.ts
  3. 4
      apps/api/src/services/cron/cron.service.ts
  4. 53
      apps/api/src/services/queues/data-gathering/data-gathering.service.ts

1
CHANGELOG.md

@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- Restricted the asset profile data gathering on Sundays to only process outdated asset profiles
- Eliminated `uuid` in favor of using `randomUUID` from `node:crypto` - Eliminated `uuid` in favor of using `randomUUID` from `node:crypto`
- Upgraded `color` from version `5.0.0` to `5.0.3` - Upgraded `color` from version `5.0.0` to `5.0.3`

4
apps/api/src/app/admin/admin.controller.ts

@ -93,7 +93,7 @@ export class AdminController {
@UseGuards(AuthGuard('jwt'), HasPermissionGuard) @UseGuards(AuthGuard('jwt'), HasPermissionGuard)
public async gatherMax(): Promise<void> { public async gatherMax(): Promise<void> {
const assetProfileIdentifiers = const assetProfileIdentifiers =
await this.dataGatheringService.getAllActiveAssetProfileIdentifiers(); await this.dataGatheringService.getActiveAssetProfileIdentifiers();
await this.dataGatheringService.addJobsToQueue( await this.dataGatheringService.addJobsToQueue(
assetProfileIdentifiers.map(({ dataSource, symbol }) => { assetProfileIdentifiers.map(({ dataSource, symbol }) => {
@ -120,7 +120,7 @@ export class AdminController {
@UseGuards(AuthGuard('jwt'), HasPermissionGuard) @UseGuards(AuthGuard('jwt'), HasPermissionGuard)
public async gatherProfileData(): Promise<void> { public async gatherProfileData(): Promise<void> {
const assetProfileIdentifiers = const assetProfileIdentifiers =
await this.dataGatheringService.getAllActiveAssetProfileIdentifiers(); await this.dataGatheringService.getActiveAssetProfileIdentifiers();
await this.dataGatheringService.addJobsToQueue( await this.dataGatheringService.addJobsToQueue(
assetProfileIdentifiers.map(({ dataSource, symbol }) => { assetProfileIdentifiers.map(({ dataSource, symbol }) => {

4
apps/api/src/services/cron/cron.service.ts

@ -59,7 +59,9 @@ export class CronService {
public async runEverySundayAtTwelvePm() { public async runEverySundayAtTwelvePm() {
if (await this.isDataGatheringEnabled()) { if (await this.isDataGatheringEnabled()) {
const assetProfileIdentifiers = const assetProfileIdentifiers =
await this.dataGatheringService.getAllActiveAssetProfileIdentifiers(); await this.dataGatheringService.getActiveAssetProfileIdentifiers({
maxAge: '60 days'
});
await this.dataGatheringService.addJobsToQueue( await this.dataGatheringService.addJobsToQueue(
assetProfileIdentifiers.map(({ dataSource, symbol }) => { assetProfileIdentifiers.map(({ dataSource, symbol }) => {

53
apps/api/src/services/queues/data-gathering/data-gathering.service.ts

@ -28,8 +28,9 @@ import { InjectQueue } from '@nestjs/bull';
import { Inject, Injectable, Logger } from '@nestjs/common'; import { Inject, Injectable, Logger } from '@nestjs/common';
import { DataSource } from '@prisma/client'; import { DataSource } from '@prisma/client';
import { JobOptions, Queue } from 'bull'; import { JobOptions, Queue } from 'bull';
import { format, min, subDays, subYears } from 'date-fns'; import { format, min, subDays, subMilliseconds, subYears } from 'date-fns';
import { isEmpty } from 'lodash'; import { isEmpty } from 'lodash';
import ms, { StringValue } from 'ms';
@Injectable() @Injectable()
export class DataGatheringService { export class DataGatheringService {
@ -160,8 +161,7 @@ export class DataGatheringService {
); );
if (!assetProfileIdentifiers) { if (!assetProfileIdentifiers) {
assetProfileIdentifiers = assetProfileIdentifiers = await this.getActiveAssetProfileIdentifiers();
await this.getAllActiveAssetProfileIdentifiers();
} }
if (assetProfileIdentifiers.length <= 0) { if (assetProfileIdentifiers.length <= 0) {
@ -301,29 +301,36 @@ export class DataGatheringService {
); );
} }
public async getAllActiveAssetProfileIdentifiers(): Promise< /**
AssetProfileIdentifier[] * Returns active asset profile identifiers
> { *
const symbolProfiles = await this.prismaService.symbolProfile.findMany({ * @param {StringValue} maxAge - Optional. Specifies the maximum allowed age
orderBy: [{ symbol: 'asc' }], * of a profiles last update timestamp. Only asset profiles considered stale
* are returned.
*/
public async getActiveAssetProfileIdentifiers({
maxAge
}: {
maxAge?: StringValue;
} = {}): Promise<AssetProfileIdentifier[]> {
return this.prismaService.symbolProfile.findMany({
orderBy: [{ symbol: 'asc' }, { dataSource: 'asc' }],
select: {
dataSource: true,
symbol: true
},
where: { where: {
isActive: true dataSource: {
notIn: ['MANUAL', 'RAPID_API']
},
isActive: true,
...(maxAge && {
updatedAt: {
lt: subMilliseconds(new Date(), ms(maxAge))
}
})
} }
}); });
return symbolProfiles
.filter(({ dataSource }) => {
return (
dataSource !== DataSource.MANUAL &&
dataSource !== DataSource.RAPID_API
);
})
.map(({ dataSource, symbol }) => {
return {
dataSource,
symbol
};
});
} }
private async getAssetProfileIdentifiersWithCompleteMarketData(): Promise< private async getAssetProfileIdentifiersWithCompleteMarketData(): Promise<

Loading…
Cancel
Save