|
@ -24,7 +24,7 @@ import { DataSource } from '@prisma/client'; |
|
|
import { JobOptions, Queue } from 'bull'; |
|
|
import { JobOptions, Queue } from 'bull'; |
|
|
import { format, min, subDays, subYears } from 'date-fns'; |
|
|
import { format, min, subDays, subYears } from 'date-fns'; |
|
|
import { isEmpty } from 'lodash'; |
|
|
import { isEmpty } from 'lodash'; |
|
|
import { Lock } from 'async-lock'; |
|
|
import AwaitLock from 'await-lock'; |
|
|
|
|
|
|
|
|
@Injectable() |
|
|
@Injectable() |
|
|
export class DataGatheringService { |
|
|
export class DataGatheringService { |
|
@ -41,6 +41,8 @@ export class DataGatheringService { |
|
|
private readonly symbolProfileService: SymbolProfileService |
|
|
private readonly symbolProfileService: SymbolProfileService |
|
|
) {} |
|
|
) {} |
|
|
|
|
|
|
|
|
|
|
|
lock = new AwaitLock(); |
|
|
|
|
|
|
|
|
public async addJobToQueue({ |
|
|
public async addJobToQueue({ |
|
|
data, |
|
|
data, |
|
|
name, |
|
|
name, |
|
@ -90,7 +92,6 @@ export class DataGatheringService { |
|
|
date: Date; |
|
|
date: Date; |
|
|
symbol: string; |
|
|
symbol: string; |
|
|
}) { |
|
|
}) { |
|
|
const lock = new Lock(); |
|
|
|
|
|
try { |
|
|
try { |
|
|
const historicalData = await this.dataProviderService.getHistoricalRaw( |
|
|
const historicalData = await this.dataProviderService.getHistoricalRaw( |
|
|
[{ dataSource, symbol }], |
|
|
[{ dataSource, symbol }], |
|
@ -102,7 +103,8 @@ export class DataGatheringService { |
|
|
historicalData[symbol][format(date, DATE_FORMAT)].marketPrice; |
|
|
historicalData[symbol][format(date, DATE_FORMAT)].marketPrice; |
|
|
|
|
|
|
|
|
if (marketPrice) { |
|
|
if (marketPrice) { |
|
|
return await lock.acquire('marketData', async function () { |
|
|
await this.lock.acquireAsync(); |
|
|
|
|
|
try { |
|
|
return await this.prismaService.marketData.upsert({ |
|
|
return await this.prismaService.marketData.upsert({ |
|
|
create: { |
|
|
create: { |
|
|
dataSource, |
|
|
dataSource, |
|
@ -113,7 +115,9 @@ export class DataGatheringService { |
|
|
update: { marketPrice }, |
|
|
update: { marketPrice }, |
|
|
where: { dataSource_date_symbol: { dataSource, date, symbol } } |
|
|
where: { dataSource_date_symbol: { dataSource, date, symbol } } |
|
|
}); |
|
|
}); |
|
|
}); |
|
|
} finally { |
|
|
|
|
|
this.lock.release(); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
Logger.error(error, 'DataGatheringService'); |
|
|
Logger.error(error, 'DataGatheringService'); |
|
|