Browse Source

Data Gathering fix

pull/5027/head
Dan 2 years ago
parent
commit
375e305c66
  1. 12
      apps/api/src/services/data-gathering/data-gathering.service.ts

12
apps/api/src/services/data-gathering/data-gathering.service.ts

@ -24,7 +24,7 @@ import { DataSource } from '@prisma/client';
import { JobOptions, Queue } from 'bull'; import { JobOptions, Queue } from 'bull';
import { format, min, subDays, subYears } from 'date-fns'; import { format, min, subDays, subYears } from 'date-fns';
import { isEmpty } from 'lodash'; import { isEmpty } from 'lodash';
import { Lock } from 'async-lock'; import AwaitLock from 'await-lock';
@Injectable() @Injectable()
export class DataGatheringService { export class DataGatheringService {
@ -41,6 +41,8 @@ export class DataGatheringService {
private readonly symbolProfileService: SymbolProfileService private readonly symbolProfileService: SymbolProfileService
) {} ) {}
lock = new AwaitLock();
public async addJobToQueue({ public async addJobToQueue({
data, data,
name, name,
@ -90,7 +92,6 @@ export class DataGatheringService {
date: Date; date: Date;
symbol: string; symbol: string;
}) { }) {
const lock = new Lock();
try { try {
const historicalData = await this.dataProviderService.getHistoricalRaw( const historicalData = await this.dataProviderService.getHistoricalRaw(
[{ dataSource, symbol }], [{ dataSource, symbol }],
@ -102,7 +103,8 @@ export class DataGatheringService {
historicalData[symbol][format(date, DATE_FORMAT)].marketPrice; historicalData[symbol][format(date, DATE_FORMAT)].marketPrice;
if (marketPrice) { if (marketPrice) {
return await lock.acquire('marketData', async function () { await this.lock.acquireAsync();
try {
return await this.prismaService.marketData.upsert({ return await this.prismaService.marketData.upsert({
create: { create: {
dataSource, dataSource,
@ -113,7 +115,9 @@ export class DataGatheringService {
update: { marketPrice }, update: { marketPrice },
where: { dataSource_date_symbol: { dataSource, date, symbol } } where: { dataSource_date_symbol: { dataSource, date, symbol } }
}); });
}); } finally {
this.lock.release();
}
} }
} catch (error) { } catch (error) {
Logger.error(error, 'DataGatheringService'); Logger.error(error, 'DataGatheringService');

Loading…
Cancel
Save