Browse Source

Feature/let data gathering queue jobs fail by throwing errors (#3281)

* Let data gathering queue jobs fail by throwing errors

* Update changelog
pull/3286/head^2
Thomas Kaul 9 months ago
committed by GitHub
parent
commit
15857118fe
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 2
      CHANGELOG.md
  2. 18
      apps/api/src/app/portfolio/portfolio.service.ts
  3. 20
      apps/api/src/app/symbol/symbol.service.ts
  4. 20
      apps/api/src/services/data-gathering/data-gathering.processor.ts
  5. 23
      apps/api/src/services/data-gathering/data-gathering.service.ts
  6. 18
      apps/api/src/services/data-provider/data-provider.service.ts

2
CHANGELOG.md

@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Moved the interest calculations into the portfolio calculator
- Moved the liability calculations into the portfolio calculator
- Moved the (wealth) item calculations into the portfolio calculator
- Let queue jobs for asset profile data gathering fail by throwing an error
- Let queue jobs for historical market data gathering fail by throwing an error
## 2.72.0 - 2024-04-13

18
apps/api/src/app/portfolio/portfolio.service.ts

@ -822,11 +822,19 @@ export class PortfolioService {
);
if (isEmpty(historicalData)) {
historicalData = await this.dataProviderService.getHistoricalRaw(
[{ dataSource: DataSource.YAHOO, symbol: aSymbol }],
portfolioStart,
new Date()
);
try {
historicalData = await this.dataProviderService.getHistoricalRaw({
dataGatheringItems: [
{ dataSource: DataSource.YAHOO, symbol: aSymbol }
],
from: portfolioStart,
to: new Date()
});
} catch {
historicalData = {
[aSymbol]: {}
};
}
}
const historicalDataArray: HistoricalDataItem[] = [];

20
apps/api/src/app/symbol/symbol.service.ts

@ -74,11 +74,21 @@ export class SymbolService {
date = new Date(),
symbol
}: IDataGatheringItem): Promise<IDataProviderHistoricalResponse> {
const historicalData = await this.dataProviderService.getHistoricalRaw(
[{ dataSource, symbol }],
date,
date
);
let historicalData: {
[symbol: string]: {
[date: string]: IDataProviderHistoricalResponse;
};
} = {
[symbol]: {}
};
try {
historicalData = await this.dataProviderService.getHistoricalRaw({
dataGatheringItems: [{ dataSource, symbol }],
from: date,
to: date
});
} catch {}
return {
marketPrice:

20
apps/api/src/services/data-gathering/data-gathering.processor.ts

@ -37,7 +37,17 @@ export class DataGatheringProcessor {
@Process({ concurrency: 1, name: GATHER_ASSET_PROFILE_PROCESS })
public async gatherAssetProfile(job: Job<UniqueAsset>) {
try {
Logger.log(
`Asset profile data gathering has been started for ${job.data.symbol} (${job.data.dataSource})`,
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS})`
);
await this.dataGatheringService.gatherAssetProfiles([job.data]);
Logger.log(
`Asset profile data gathering has been completed for ${job.data.symbol} (${job.data.dataSource})`,
`DataGatheringProcessor (${GATHER_ASSET_PROFILE_PROCESS})`
);
} catch (error) {
Logger.error(
error,
@ -62,11 +72,11 @@ export class DataGatheringProcessor {
`DataGatheringProcessor (${GATHER_HISTORICAL_MARKET_DATA_PROCESS})`
);
const historicalData = await this.dataProviderService.getHistoricalRaw(
[{ dataSource, symbol }],
currentDate,
new Date()
);
const historicalData = await this.dataProviderService.getHistoricalRaw({
dataGatheringItems: [{ dataSource, symbol }],
from: currentDate,
to: new Date()
});
const data: Prisma.MarketDataUpdateInput[] = [];
let lastMarketPrice: number;

23
apps/api/src/services/data-gathering/data-gathering.service.ts

@ -104,11 +104,11 @@ export class DataGatheringService {
symbol: string;
}) {
try {
const historicalData = await this.dataProviderService.getHistoricalRaw(
[{ dataSource, symbol }],
date,
date
);
const historicalData = await this.dataProviderService.getHistoricalRaw({
dataGatheringItems: [{ dataSource, symbol }],
from: date,
to: date
});
const marketPrice =
historicalData[symbol][format(date, DATE_FORMAT)].marketPrice;
@ -230,17 +230,12 @@ export class DataGatheringService {
error,
'DataGatheringService'
);
if (uniqueAssets.length === 1) {
throw error;
}
}
}
Logger.log(
`Asset profile data gathering has been completed for ${uniqueAssets
.map(({ dataSource, symbol }) => {
return `${symbol} (${dataSource})`;
})
.join(',')}.`,
'DataGatheringService'
);
}
public async gatherSymbols({

18
apps/api/src/services/data-provider/data-provider.service.ts

@ -233,15 +233,17 @@ export class DataProviderService {
}
}
public async getHistoricalRaw(
aDataGatheringItems: UniqueAsset[],
from: Date,
to: Date
): Promise<{
public async getHistoricalRaw({
dataGatheringItems,
from,
to
}: {
dataGatheringItems: UniqueAsset[];
from: Date;
to: Date;
}): Promise<{
[symbol: string]: { [date: string]: IDataProviderHistoricalResponse };
}> {
let dataGatheringItems = aDataGatheringItems;
for (const { currency, rootCurrency } of DERIVED_CURRENCIES) {
if (
this.hasCurrency({
@ -330,6 +332,8 @@ export class DataProviderService {
}
} catch (error) {
Logger.error(error, 'DataProviderService');
throw error;
}
return result;

Loading…
Cancel
Save