diff --git a/apps/api/src/services/data-gathering/data-gathering.service.ts b/apps/api/src/services/data-gathering/data-gathering.service.ts index 78531b745..213f91692 100644 --- a/apps/api/src/services/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/data-gathering/data-gathering.service.ts @@ -24,6 +24,7 @@ import { DataSource } from '@prisma/client'; import { JobOptions, Queue } from 'bull'; import { format, min, subDays, subYears } from 'date-fns'; import { isEmpty } from 'lodash'; +import AwaitLock from 'await-lock'; @Injectable() export class DataGatheringService { @@ -40,6 +41,8 @@ export class DataGatheringService { private readonly symbolProfileService: SymbolProfileService ) {} + lock = new AwaitLock(); + public async addJobToQueue({ data, name, @@ -100,16 +103,21 @@ export class DataGatheringService { historicalData[symbol][format(date, DATE_FORMAT)].marketPrice; if (marketPrice) { - return await this.prismaService.marketData.upsert({ - create: { - dataSource, - date, - marketPrice, - symbol - }, - update: { marketPrice }, - where: { dataSource_date_symbol: { dataSource, date, symbol } } - }); + await this.lock.acquireAsync(); + try { + return await this.prismaService.marketData.upsert({ + create: { + dataSource, + date, + marketPrice, + symbol + }, + update: { marketPrice }, + where: { dataSource_date_symbol: { dataSource, date, symbol } } + }); + } finally { + this.lock.release(); + } } } catch (error) { Logger.error(error, 'DataGatheringService'); diff --git a/apps/api/src/services/market-data/market-data.service.ts b/apps/api/src/services/market-data/market-data.service.ts index 85a62837b..716bfd236 100644 --- a/apps/api/src/services/market-data/market-data.service.ts +++ b/apps/api/src/services/market-data/market-data.service.ts @@ -13,11 +13,14 @@ import { Prisma } from '@prisma/client'; import { DateQueryHelper } from '@ghostfolio/api/helper/dateQueryHelper'; +import AwaitLock from 'await-lock'; @Injectable() export class MarketDataService { public constructor(private readonly prismaService: PrismaService) {} + lock = new AwaitLock(); + private dateQueryHelper = new DateQueryHelper(); public async deleteMany({ dataSource, symbol }: UniqueAsset) { @@ -129,18 +132,22 @@ export class MarketDataService { where: Prisma.MarketDataWhereUniqueInput; }): Promise { const { data, where } = params; - - return this.prismaService.marketData.upsert({ - where, - create: { - dataSource: where.dataSource_date_symbol.dataSource, - date: where.dataSource_date_symbol.date, - marketPrice: data.marketPrice, - state: data.state, - symbol: where.dataSource_date_symbol.symbol - }, - update: { marketPrice: data.marketPrice, state: data.state } - }); + await this.lock.acquireAsync(); + try { + return this.prismaService.marketData.upsert({ + where, + create: { + dataSource: where.dataSource_date_symbol.dataSource, + date: where.dataSource_date_symbol.date, + marketPrice: data.marketPrice, + state: data.state, + symbol: where.dataSource_date_symbol.symbol + }, + update: { marketPrice: data.marketPrice, state: data.state } + }); + } finally { + this.lock.release(); + } } /** @@ -153,30 +160,34 @@ export class MarketDataService { data: Prisma.MarketDataUpdateInput[]; }): Promise { const upsertPromises = data.map( - ({ dataSource, date, marketPrice, symbol, state }) => { - return this.prismaService.marketData.upsert({ - create: { - dataSource: dataSource, - date: date, - marketPrice: marketPrice, - state: state, - symbol: symbol - }, - update: { - marketPrice: marketPrice, - state: state - }, - where: { - dataSource_date_symbol: { + async ({ dataSource, date, marketPrice, symbol, state }) => { + await this.lock.acquireAsync(); + try { + return this.prismaService.marketData.upsert({ + create: { dataSource: dataSource, date: date, + marketPrice: marketPrice, + state: state, symbol: symbol + }, + update: { + marketPrice: marketPrice, + state: state + }, + where: { + dataSource_date_symbol: { + dataSource: dataSource, + date: date, + symbol: symbol + } } - } - }); + }); + } finally { + this.lock.release(); + } } ); - - return this.prismaService.$transaction(upsertPromises); + return await Promise.all(upsertPromises); } } diff --git a/package.json b/package.json index a7d7cd90b..ac3223a23 100644 --- a/package.json +++ b/package.json @@ -86,6 +86,7 @@ "@simplewebauthn/server": "8.3.2", "@stripe/stripe-js": "1.47.0", "alphavantage": "2.2.0", + "await-lock": "^2.2.2", "big.js": "6.2.1", "body-parser": "1.20.1", "bootstrap": "4.6.0", diff --git a/yarn.lock b/yarn.lock index 6e22a8d97..7048cedb8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7366,6 +7366,11 @@ available-typed-arrays@^1.0.5: resolved "https://registry.yarnpkg.com/available-typed-arrays/-/available-typed-arrays-1.0.5.tgz#92f95616501069d07d10edb2fc37d3e1c65123b7" integrity sha512-DMD0KiN46eipeziST1LPP/STfDU0sufISXmjSgvVsoU2tqxctQeASejWcfNtxYKqETM1UxQ8sp2OrSBWpHY6sw== +await-lock@^2.2.2: + version "2.2.2" + resolved "https://registry.yarnpkg.com/await-lock/-/await-lock-2.2.2.tgz#a95a9b269bfd2f69d22b17a321686f551152bcef" + integrity sha512-aDczADvlvTGajTDjcjpJMqRkOF6Qdz3YbPZm/PyW6tKPkx2hlYBzxMhEywM/tU72HrVZjgl5VCdRuMlA7pZ8Gw== + aws-sign2@~0.7.0: version "0.7.0" resolved "https://registry.yarnpkg.com/aws-sign2/-/aws-sign2-0.7.0.tgz#b46e890934a9591f2d2f6f86d7e6a9f1b3fe76a8" @@ -19214,4 +19219,4 @@ zone.js@0.13.1: zone.js@~0.10.3: version "0.10.3" resolved "https://registry.yarnpkg.com/zone.js/-/zone.js-0.10.3.tgz#3e5e4da03c607c9dcd92e37dd35687a14a140c16" - integrity sha512-LXVLVEq0NNOqK/fLJo3d0kfzd4sxwn2/h67/02pjCjfKDxgx1i9QqpvtHD8CrBnSSwMw5+dy11O7FRX5mkO7Cg== \ No newline at end of file + integrity sha512-LXVLVEq0NNOqK/fLJo3d0kfzd4sxwn2/h67/02pjCjfKDxgx1i9QqpvtHD8CrBnSSwMw5+dy11O7FRX5mkO7Cg==