diff --git a/apps/api/src/services/data-gathering/data-gathering.service.ts b/apps/api/src/services/data-gathering/data-gathering.service.ts index 2f48394c4..213f91692 100644 --- a/apps/api/src/services/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/data-gathering/data-gathering.service.ts @@ -24,7 +24,7 @@ import { DataSource } from '@prisma/client'; import { JobOptions, Queue } from 'bull'; import { format, min, subDays, subYears } from 'date-fns'; import { isEmpty } from 'lodash'; -import { Lock } from 'async-lock'; +import AwaitLock from 'await-lock'; @Injectable() export class DataGatheringService { @@ -41,6 +41,8 @@ export class DataGatheringService { private readonly symbolProfileService: SymbolProfileService ) {} + lock = new AwaitLock(); + public async addJobToQueue({ data, name, @@ -90,7 +92,6 @@ export class DataGatheringService { date: Date; symbol: string; }) { - const lock = new Lock(); try { const historicalData = await this.dataProviderService.getHistoricalRaw( [{ dataSource, symbol }], @@ -102,7 +103,8 @@ export class DataGatheringService { historicalData[symbol][format(date, DATE_FORMAT)].marketPrice; if (marketPrice) { - return await lock.acquire('marketData', async function () { + await this.lock.acquireAsync(); + try { return await this.prismaService.marketData.upsert({ create: { dataSource, @@ -113,7 +115,9 @@ export class DataGatheringService { update: { marketPrice }, where: { dataSource_date_symbol: { dataSource, date, symbol } } }); - }); + } finally { + this.lock.release(); + } } } catch (error) { Logger.error(error, 'DataGatheringService');