Browse Source

Detect duplicate jobs

pull/991/head
Thomas 3 years ago
parent
commit
99e346824b
  1. 43
      apps/api/src/app/admin/admin.controller.ts
  2. 25
      apps/api/src/app/order/order.service.ts
  3. 21
      apps/api/src/services/cron.service.ts
  4. 2
      apps/api/src/services/data-gathering.module.ts
  5. 13
      apps/api/src/services/data-gathering.processor.ts
  6. 57
      apps/api/src/services/data-gathering.service.ts
  7. 11
      apps/api/src/services/data-provider/alpha-vantage/alpha-vantage.service.ts
  8. 9
      apps/api/src/services/data-provider/eod-historical-data/eod-historical-data.service.ts
  9. 9
      apps/api/src/services/data-provider/ghostfolio-scraper-api/ghostfolio-scraper-api.service.ts
  10. 9
      apps/api/src/services/data-provider/google-sheets/google-sheets.service.ts
  11. 9
      apps/api/src/services/data-provider/rakuten-rapid-api/rakuten-rapid-api.service.ts
  12. 18
      apps/api/src/services/data-provider/yahoo-finance/yahoo-finance.service.ts
  13. 5
      apps/client/src/app/components/admin-jobs/admin-jobs.component.ts
  14. 3
      apps/client/src/app/components/admin-jobs/admin-jobs.html
  15. 2
      apps/client/src/app/components/admin-overview/admin-overview.html
  16. 2
      apps/client/src/app/services/admin.service.ts
  17. 25
      libs/common/src/lib/config.ts

43
apps/api/src/app/admin/admin.controller.ts

@ -2,9 +2,8 @@ import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.se
import { MarketDataService } from '@ghostfolio/api/services/market-data.service';
import { PropertyDto } from '@ghostfolio/api/services/property/property.dto';
import {
DATA_GATHERING_QUEUE,
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
GATHER_ASSET_PROFILE_PROCESS
GATHER_ASSET_PROFILE_PROCESS,
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
} from '@ghostfolio/common/config';
import {
AdminData,
@ -13,7 +12,6 @@ import {
} from '@ghostfolio/common/interfaces';
import { hasPermission, permissions } from '@ghostfolio/common/permissions';
import type { RequestWithUser } from '@ghostfolio/common/types';
import { InjectQueue } from '@nestjs/bull';
import {
Body,
Controller,
@ -29,10 +27,8 @@ import {
import { REQUEST } from '@nestjs/core';
import { AuthGuard } from '@nestjs/passport';
import { DataSource, MarketData } from '@prisma/client';
import { Queue } from 'bull';
import { isDate } from 'date-fns';
import { StatusCodes, getReasonPhrase } from 'http-status-codes';
import ms from 'ms';
import { AdminService } from './admin.service';
import { UpdateMarketDataDto } from './update-market-data.dto';
@ -41,8 +37,6 @@ import { UpdateMarketDataDto } from './update-market-data.dto';
export class AdminController {
public constructor(
private readonly adminService: AdminService,
@InjectQueue(DATA_GATHERING_QUEUE)
private readonly dataGatheringQueue: Queue,
private readonly dataGatheringService: DataGatheringService,
private readonly marketDataService: MarketDataService,
@Inject(REQUEST) private readonly request: RequestWithUser
@ -102,20 +96,13 @@ export class AdminController {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringQueue.add(
await this.dataGatheringService.addJobToQueue(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
);
}
@ -140,20 +127,13 @@ export class AdminController {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringQueue.add(
await this.dataGatheringService.addJobToQueue(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
);
}
}
@ -176,20 +156,13 @@ export class AdminController {
);
}
await this.dataGatheringQueue.add(
await this.dataGatheringService.addJobToQueue(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
);
}

25
apps/api/src/app/order/order.service.ts

@ -1,17 +1,14 @@
import { AccountService } from '@ghostfolio/api/app/account/account.service';
import { CacheService } from '@ghostfolio/api/app/cache/cache.service';
import { DataGatheringService } from '@ghostfolio/api/services/data-gathering.service';
import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data.service';
import { PrismaService } from '@ghostfolio/api/services/prisma.service';
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service';
import {
DATA_GATHERING_QUEUE,
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
GATHER_ASSET_PROFILE_PROCESS
GATHER_ASSET_PROFILE_PROCESS,
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
} from '@ghostfolio/common/config';
import { Filter } from '@ghostfolio/common/interfaces';
import { OrderWithAccount } from '@ghostfolio/common/types';
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import {
AssetClass,
@ -22,10 +19,8 @@ import {
Type as TypeOfOrder
} from '@prisma/client';
import Big from 'big.js';
import { Queue } from 'bull';
import { endOfToday, isAfter } from 'date-fns';
import { groupBy } from 'lodash';
import ms from 'ms';
import { v4 as uuidv4 } from 'uuid';
import { Activity } from './interfaces/activities.interface';
@ -34,11 +29,8 @@ import { Activity } from './interfaces/activities.interface';
export class OrderService {
public constructor(
private readonly accountService: AccountService,
private readonly cacheService: CacheService,
@InjectQueue(DATA_GATHERING_QUEUE)
private readonly dataGatheringQueue: Queue,
private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly dataGatheringService: DataGatheringService,
private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly prismaService: PrismaService,
private readonly symbolProfileService: SymbolProfileService
) {}
@ -122,20 +114,13 @@ export class OrderService {
data.SymbolProfile.connectOrCreate.create.symbol.toUpperCase();
}
await this.dataGatheringQueue.add(
await this.dataGatheringService.addJobToQueue(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource: data.SymbolProfile.connectOrCreate.create.dataSource,
symbol: data.SymbolProfile.connectOrCreate.create.symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
);
const isDraft = isAfter(data.date as Date, endOfToday());

21
apps/api/src/services/cron.service.ts

@ -1,13 +1,9 @@
import {
DATA_GATHERING_QUEUE,
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
GATHER_ASSET_PROFILE_PROCESS
GATHER_ASSET_PROFILE_PROCESS,
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
} from '@ghostfolio/common/config';
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { Queue } from 'bull';
import ms from 'ms';
import { DataGatheringService } from './data-gathering.service';
import { ExchangeRateDataService } from './exchange-rate-data.service';
@ -16,8 +12,6 @@ import { TwitterBotService } from './twitter-bot/twitter-bot.service';
@Injectable()
export class CronService {
public constructor(
@InjectQueue(DATA_GATHERING_QUEUE)
private readonly dataGatheringQueue: Queue,
private readonly dataGatheringService: DataGatheringService,
private readonly exchangeRateDataService: ExchangeRateDataService,
private readonly twitterBotService: TwitterBotService
@ -43,20 +37,13 @@ export class CronService {
const uniqueAssets = await this.dataGatheringService.getUniqueAssets();
for (const { dataSource, symbol } of uniqueAssets) {
await this.dataGatheringQueue.add(
await this.dataGatheringService.addJobToQueue(
GATHER_ASSET_PROFILE_PROCESS,
{
dataSource,
symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
}
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
);
}
}

2
apps/api/src/services/data-gathering.module.ts

@ -16,7 +16,7 @@ import { SymbolProfileModule } from './symbol-profile.module';
imports: [
BullModule.registerQueue({
limiter: {
duration: ms('1 second'),
duration: ms('5 seconds'),
max: 1
},
name: DATA_GATHERING_QUEUE

13
apps/api/src/services/data-gathering.processor.ts

@ -99,14 +99,6 @@ export class DataGatheringProcessor {
}
});
} catch {}
} else {
Logger.warn(
`Failed to gather data for symbol ${symbol} from ${dataSource} at ${format(
currentDate,
DATE_FORMAT
)}.`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})`
);
}
// Count month one up for iteration
@ -119,6 +111,11 @@ export class DataGatheringProcessor {
)
);
}
Logger.log(
`Historical market data gathering has been completed for ${symbol} (${dataSource}).`,
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})`
);
} catch (error) {
Logger.error(
error,

57
apps/api/src/services/data-gathering.service.ts

@ -1,17 +1,17 @@
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.service';
import {
DATA_GATHERING_QUEUE,
DATA_GATHERING_QUEUE_PRIORITY_LOW,
GATHER_HISTORICAL_MARKET_DATA_PROCESS
GATHER_HISTORICAL_MARKET_DATA_PROCESS,
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS,
QUEUE_JOB_STATUS_LIST
} from '@ghostfolio/common/config';
import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper';
import { UniqueAsset } from '@ghostfolio/common/interfaces';
import { InjectQueue } from '@nestjs/bull';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { DataSource } from '@prisma/client';
import { Queue } from 'bull';
import { JobOptions, Queue } from 'bull';
import { format, subDays } from 'date-fns';
import ms from 'ms';
import { DataProviderService } from './data-provider/data-provider.service';
import { DataEnhancerInterface } from './data-provider/interfaces/data-enhancer.interface';
@ -21,8 +21,6 @@ import { PrismaService } from './prisma.service';
@Injectable()
export class DataGatheringService {
private dataGatheringProgress: number;
public constructor(
@Inject('DataEnhancers')
private readonly dataEnhancers: DataEnhancerInterface[],
@ -34,6 +32,19 @@ export class DataGatheringService {
private readonly symbolProfileService: SymbolProfileService
) {}
public async addJobToQueue(name: string, data: any, options?: JobOptions) {
const hasJob = await this.hasJob(name, data);
if (hasJob) {
Logger.log(
`Job ${name} with data ${JSON.stringify(data)} already exists.`,
'DataGatheringService'
);
} else {
return this.dataGatheringQueue.add(name, data, options);
}
}
public async gather7Days() {
const dataGatheringItems = await this.getSymbols7D();
await this.gatherSymbols(dataGatheringItems);
@ -101,15 +112,6 @@ export class DataGatheringService {
uniqueAssets = await this.getUniqueAssets();
}
Logger.log(
`Asset profile data gathering has been started for ${uniqueAssets
.map(({ dataSource, symbol }) => {
return `${symbol} (${dataSource})`;
})
.join(',')}.`,
'DataGatheringService'
);
const assetProfiles = await this.dataProviderService.getAssetProfiles(
uniqueAssets
);
@ -205,21 +207,14 @@ export class DataGatheringService {
continue;
}
await this.dataGatheringQueue.add(
await this.addJobToQueue(
GATHER_HISTORICAL_MARKET_DATA_PROCESS,
{
dataSource,
date,
symbol
},
{
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_LOW
}
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS
);
}
}
@ -354,4 +349,18 @@ export class DataGatheringService {
return [...currencyPairsToGather, ...symbolProfilesToGather];
}
private async hasJob(name: string, data: any) {
const jobs = await this.dataGatheringQueue.getJobs(
QUEUE_JOB_STATUS_LIST.filter((status) => {
return status !== 'completed';
})
);
return jobs.some((job) => {
return (
job.name === name && JSON.stringify(job.data) === JSON.stringify(data)
);
});
}
}

11
apps/api/src/services/data-provider/alpha-vantage/alpha-vantage.service.ts

@ -9,7 +9,7 @@ import { DATE_FORMAT } from '@ghostfolio/common/helper';
import { Granularity } from '@ghostfolio/common/types';
import { Injectable, Logger } from '@nestjs/common';
import { DataSource, SymbolProfile } from '@prisma/client';
import { isAfter, isBefore, parse } from 'date-fns';
import { format, isAfter, isBefore, parse } from 'date-fns';
import { IAlphaVantageHistoricalResponse } from './interfaces/interfaces';
@ -76,9 +76,12 @@ export class AlphaVantageService implements DataProviderInterface {
return response;
} catch (error) {
Logger.error(error, 'AlphaVantageService');
return {};
throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
}
}

9
apps/api/src/services/data-provider/eod-historical-data/eod-historical-data.service.ts

@ -72,10 +72,13 @@ export class EodHistoricalDataService implements DataProviderInterface {
{ [aSymbol]: {} }
);
} catch (error) {
Logger.error(error, 'EodHistoricalDataService');
throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
}
return {};
}
public getName(): DataSource {

9
apps/api/src/services/data-provider/ghostfolio-scraper-api/ghostfolio-scraper-api.service.ts

@ -87,10 +87,13 @@ export class GhostfolioScraperApiService implements DataProviderInterface {
}
};
} catch (error) {
Logger.error(error, 'GhostfolioScraperApiService');
throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
}
return {};
}
public getName(): DataSource {

9
apps/api/src/services/data-provider/google-sheets/google-sheets.service.ts

@ -71,10 +71,13 @@ export class GoogleSheetsService implements DataProviderInterface {
[symbol]: historicalData
};
} catch (error) {
Logger.error(error, 'GoogleSheetsService');
throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
}
return {};
}
public getName(): DataSource {

9
apps/api/src/services/data-provider/rakuten-rapid-api/rakuten-rapid-api.service.ts

@ -90,7 +90,14 @@ export class RakutenRapidApiService implements DataProviderInterface {
}
};
}
} catch (error) {}
} catch (error) {
throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
}
return {};
}

18
apps/api/src/services/data-provider/yahoo-finance/yahoo-finance.service.ts

@ -131,7 +131,13 @@ export class YahooFinanceService implements DataProviderInterface {
if (url) {
response.url = url;
}
} catch {}
} catch (error) {
throw new Error(
`Could not get asset profile for ${aSymbol} (${this.getName()}): [${
error.name
}] ${error.message}`
);
}
return response;
}
@ -185,12 +191,12 @@ export class YahooFinanceService implements DataProviderInterface {
return response;
} catch (error) {
Logger.warn(
`Skipping yahooFinance2.getHistorical("${aSymbol}"): [${error.name}] ${error.message}`,
'YahooFinanceService'
throw new Error(
`Could not get historical market data for ${aSymbol} (${this.getName()}) from ${format(
from,
DATE_FORMAT
)} to ${format(to, DATE_FORMAT)}: [${error.name}] ${error.message}`
);
return {};
}
}

5
apps/client/src/app/components/admin-jobs/admin-jobs.component.ts

@ -80,11 +80,12 @@ export class AdminJobsComponent implements OnDestroy, OnInit {
}
public onDeleteJobs() {
const currentFilter = this.filterForm.get('status').value;
this.adminService
.deleteJobs({})
.deleteJobs({ status: currentFilter ? [currentFilter] : undefined })
.pipe(takeUntil(this.unsubscribeSubject))
.subscribe(() => {
const currentFilter = this.filterForm.get('status').value;
this.fetchJobs(currentFilter ? [currentFilter] : undefined);
});
}

3
apps/client/src/app/components/admin-jobs/admin-jobs.html

@ -79,6 +79,7 @@
<ion-icon
*ngIf="job.state === 'delayed'"
name="time-outline"
[ngClass]="{ 'text-danger': job.stacktrace?.length > 0 }"
></ion-icon>
<ion-icon
*ngIf="job.state === 'failed'"
@ -107,7 +108,7 @@
<button
i18n
mat-menu-item
[disabled]="job.stacktrace?.length < 1"
[disabled]="job.stacktrace?.length <= 0"
(click)="onViewStacktrace(job.stacktrace)"
>
View Stacktrace

2
apps/client/src/app/components/admin-overview/admin-overview.html

@ -30,7 +30,7 @@
class="mr-1"
name="cloud-download-outline"
></ion-icon>
<span i18n>Start Data Gathering</span>
<span i18n>Gather Recent Data</span>
</button>
</div>
<div class="mb-2">

2
apps/client/src/app/services/admin.service.ts

@ -23,7 +23,7 @@ export class AdminService {
return this.http.delete<void>(`/api/v1/admin/queue/job/${aId}`);
}
public deleteJobs({ status }: { status?: JobStatus[] }) {
public deleteJobs({ status }: { status: JobStatus[] }) {
let params = new HttpParams();
if (status?.length > 0) {

25
libs/common/src/lib/config.ts

@ -1,5 +1,6 @@
import { DataSource } from '@prisma/client';
import { JobStatus } from 'bull';
import { JobOptions, JobStatus } from 'bull';
import ms from 'ms';
import { ToggleOption } from './types';
@ -50,8 +51,30 @@ export const DATA_GATHERING_QUEUE_PRIORITY_HIGH = 1;
export const DEFAULT_DATE_FORMAT_MONTH_YEAR = 'MMM yyyy';
export const GATHER_ASSET_PROFILE_PROCESS = 'GATHER_ASSET_PROFILE';
export const GATHER_ASSET_PROFILE_PROCESS_OPTIONS: JobOptions = {
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH,
removeOnComplete: {
age: ms('2 weeks') / 1000
}
};
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS =
'GATHER_HISTORICAL_MARKET_DATA';
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS: JobOptions = {
attempts: 20,
backoff: {
delay: ms('1 minute'),
type: 'exponential'
},
priority: DATA_GATHERING_QUEUE_PRIORITY_LOW,
removeOnComplete: {
age: ms('2 weeks') / 1000
}
};
export const PROPERTY_BENCHMARKS = 'BENCHMARKS';
export const PROPERTY_COUPONS = 'COUPONS';

Loading…
Cancel
Save