From 98db33553c1916fcc658eb774474c4867ca522a8 Mon Sep 17 00:00:00 2001 From: Dan Date: Wed, 15 Nov 2023 16:31:33 +0100 Subject: [PATCH] Added Market Data upsert locks --- .../data-gathering/data-gathering.service.ts | 22 +++--- .../market-data/market-data.service.ts | 73 +++++++++++-------- package.json | 1 + 3 files changed, 56 insertions(+), 40 deletions(-) diff --git a/apps/api/src/services/data-gathering/data-gathering.service.ts b/apps/api/src/services/data-gathering/data-gathering.service.ts index 78531b745..2f48394c4 100644 --- a/apps/api/src/services/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/data-gathering/data-gathering.service.ts @@ -24,6 +24,7 @@ import { DataSource } from '@prisma/client'; import { JobOptions, Queue } from 'bull'; import { format, min, subDays, subYears } from 'date-fns'; import { isEmpty } from 'lodash'; +import { Lock } from 'async-lock'; @Injectable() export class DataGatheringService { @@ -89,6 +90,7 @@ export class DataGatheringService { date: Date; symbol: string; }) { + const lock = new Lock(); try { const historicalData = await this.dataProviderService.getHistoricalRaw( [{ dataSource, symbol }], @@ -100,15 +102,17 @@ export class DataGatheringService { historicalData[symbol][format(date, DATE_FORMAT)].marketPrice; if (marketPrice) { - return await this.prismaService.marketData.upsert({ - create: { - dataSource, - date, - marketPrice, - symbol - }, - update: { marketPrice }, - where: { dataSource_date_symbol: { dataSource, date, symbol } } + return await lock.acquire('marketData', async function () { + return await this.prismaService.marketData.upsert({ + create: { + dataSource, + date, + marketPrice, + symbol + }, + update: { marketPrice }, + where: { dataSource_date_symbol: { dataSource, date, symbol } } + }); }); } } catch (error) { diff --git a/apps/api/src/services/market-data/market-data.service.ts b/apps/api/src/services/market-data/market-data.service.ts index 85a62837b..716bfd236 100644 --- a/apps/api/src/services/market-data/market-data.service.ts +++ b/apps/api/src/services/market-data/market-data.service.ts @@ -13,11 +13,14 @@ import { Prisma } from '@prisma/client'; import { DateQueryHelper } from '@ghostfolio/api/helper/dateQueryHelper'; +import AwaitLock from 'await-lock'; @Injectable() export class MarketDataService { public constructor(private readonly prismaService: PrismaService) {} + lock = new AwaitLock(); + private dateQueryHelper = new DateQueryHelper(); public async deleteMany({ dataSource, symbol }: UniqueAsset) { @@ -129,18 +132,22 @@ export class MarketDataService { where: Prisma.MarketDataWhereUniqueInput; }): Promise { const { data, where } = params; - - return this.prismaService.marketData.upsert({ - where, - create: { - dataSource: where.dataSource_date_symbol.dataSource, - date: where.dataSource_date_symbol.date, - marketPrice: data.marketPrice, - state: data.state, - symbol: where.dataSource_date_symbol.symbol - }, - update: { marketPrice: data.marketPrice, state: data.state } - }); + await this.lock.acquireAsync(); + try { + return this.prismaService.marketData.upsert({ + where, + create: { + dataSource: where.dataSource_date_symbol.dataSource, + date: where.dataSource_date_symbol.date, + marketPrice: data.marketPrice, + state: data.state, + symbol: where.dataSource_date_symbol.symbol + }, + update: { marketPrice: data.marketPrice, state: data.state } + }); + } finally { + this.lock.release(); + } } /** @@ -153,30 +160,34 @@ export class MarketDataService { data: Prisma.MarketDataUpdateInput[]; }): Promise { const upsertPromises = data.map( - ({ dataSource, date, marketPrice, symbol, state }) => { - return this.prismaService.marketData.upsert({ - create: { - dataSource: dataSource, - date: date, - marketPrice: marketPrice, - state: state, - symbol: symbol - }, - update: { - marketPrice: marketPrice, - state: state - }, - where: { - dataSource_date_symbol: { + async ({ dataSource, date, marketPrice, symbol, state }) => { + await this.lock.acquireAsync(); + try { + return this.prismaService.marketData.upsert({ + create: { dataSource: dataSource, date: date, + marketPrice: marketPrice, + state: state, symbol: symbol + }, + update: { + marketPrice: marketPrice, + state: state + }, + where: { + dataSource_date_symbol: { + dataSource: dataSource, + date: date, + symbol: symbol + } } - } - }); + }); + } finally { + this.lock.release(); + } } ); - - return this.prismaService.$transaction(upsertPromises); + return await Promise.all(upsertPromises); } } diff --git a/package.json b/package.json index a7d7cd90b..ac3223a23 100644 --- a/package.json +++ b/package.json @@ -86,6 +86,7 @@ "@simplewebauthn/server": "8.3.2", "@stripe/stripe-js": "1.47.0", "alphavantage": "2.2.0", + "await-lock": "^2.2.2", "big.js": "6.2.1", "body-parser": "1.20.1", "bootstrap": "4.6.0",