From 3a1d5c1aafe28fab94276f7d6b0d759e5b6ace70 Mon Sep 17 00:00:00 2001 From: Dan Date: Wed, 15 Nov 2023 16:31:33 +0100 Subject: [PATCH 1/2] Added Market Data upsert locks --- .../data-gathering/data-gathering.service.ts | 22 +++--- .../market-data/market-data.service.ts | 72 +++++++++++-------- package.json | 1 + yarn.lock | 5 ++ 4 files changed, 61 insertions(+), 39 deletions(-) diff --git a/apps/api/src/services/data-gathering/data-gathering.service.ts b/apps/api/src/services/data-gathering/data-gathering.service.ts index 78531b745..2f48394c4 100644 --- a/apps/api/src/services/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/data-gathering/data-gathering.service.ts @@ -24,6 +24,7 @@ import { DataSource } from '@prisma/client'; import { JobOptions, Queue } from 'bull'; import { format, min, subDays, subYears } from 'date-fns'; import { isEmpty } from 'lodash'; +import { Lock } from 'async-lock'; @Injectable() export class DataGatheringService { @@ -89,6 +90,7 @@ export class DataGatheringService { date: Date; symbol: string; }) { + const lock = new Lock(); try { const historicalData = await this.dataProviderService.getHistoricalRaw( [{ dataSource, symbol }], @@ -100,15 +102,17 @@ export class DataGatheringService { historicalData[symbol][format(date, DATE_FORMAT)].marketPrice; if (marketPrice) { - return await this.prismaService.marketData.upsert({ - create: { - dataSource, - date, - marketPrice, - symbol - }, - update: { marketPrice }, - where: { dataSource_date_symbol: { dataSource, date, symbol } } + return await lock.acquire('marketData', async function () { + return await this.prismaService.marketData.upsert({ + create: { + dataSource, + date, + marketPrice, + symbol + }, + update: { marketPrice }, + where: { dataSource_date_symbol: { dataSource, date, symbol } } + }); }); } } catch (error) { diff --git a/apps/api/src/services/market-data/market-data.service.ts b/apps/api/src/services/market-data/market-data.service.ts index ae0e0ef80..716bfd236 100644 --- a/apps/api/src/services/market-data/market-data.service.ts +++ b/apps/api/src/services/market-data/market-data.service.ts @@ -13,11 +13,14 @@ import { Prisma } from '@prisma/client'; import { DateQueryHelper } from '@ghostfolio/api/helper/dateQueryHelper'; +import AwaitLock from 'await-lock'; @Injectable() export class MarketDataService { public constructor(private readonly prismaService: PrismaService) {} + lock = new AwaitLock(); + private dateQueryHelper = new DateQueryHelper(); public async deleteMany({ dataSource, symbol }: UniqueAsset) { @@ -129,17 +132,22 @@ export class MarketDataService { where: Prisma.MarketDataWhereUniqueInput; }): Promise { const { data, where } = params; - return this.prismaService.marketData.upsert({ - where, - create: { - dataSource: where.dataSource_date_symbol.dataSource, - date: where.dataSource_date_symbol.date, - marketPrice: data.marketPrice, - state: data.state, - symbol: where.dataSource_date_symbol.symbol - }, - update: { marketPrice: data.marketPrice, state: data.state } - }); + await this.lock.acquireAsync(); + try { + return this.prismaService.marketData.upsert({ + where, + create: { + dataSource: where.dataSource_date_symbol.dataSource, + date: where.dataSource_date_symbol.date, + marketPrice: data.marketPrice, + state: data.state, + symbol: where.dataSource_date_symbol.symbol + }, + update: { marketPrice: data.marketPrice, state: data.state } + }); + } finally { + this.lock.release(); + } } /** @@ -152,30 +160,34 @@ export class MarketDataService { data: Prisma.MarketDataUpdateInput[]; }): Promise { const upsertPromises = data.map( - ({ dataSource, date, marketPrice, symbol, state }) => { - return this.prismaService.marketData.upsert({ - create: { - dataSource: dataSource, - date: date, - marketPrice: marketPrice, - state: state, - symbol: symbol - }, - update: { - marketPrice: marketPrice, - state: state - }, - where: { - dataSource_date_symbol: { + async ({ dataSource, date, marketPrice, symbol, state }) => { + await this.lock.acquireAsync(); + try { + return this.prismaService.marketData.upsert({ + create: { dataSource: dataSource, date: date, + marketPrice: marketPrice, + state: state, symbol: symbol + }, + update: { + marketPrice: marketPrice, + state: state + }, + where: { + dataSource_date_symbol: { + dataSource: dataSource, + date: date, + symbol: symbol + } } - } - }); + }); + } finally { + this.lock.release(); + } } ); - - return this.prismaService.$transaction(upsertPromises); + return await Promise.all(upsertPromises); } } diff --git a/package.json b/package.json index a7d7cd90b..ac3223a23 100644 --- a/package.json +++ b/package.json @@ -86,6 +86,7 @@ "@simplewebauthn/server": "8.3.2", "@stripe/stripe-js": "1.47.0", "alphavantage": "2.2.0", + "await-lock": "^2.2.2", "big.js": "6.2.1", "body-parser": "1.20.1", "bootstrap": "4.6.0", diff --git a/yarn.lock b/yarn.lock index a70ef8e84..7048cedb8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7366,6 +7366,11 @@ available-typed-arrays@^1.0.5: resolved "https://registry.yarnpkg.com/available-typed-arrays/-/available-typed-arrays-1.0.5.tgz#92f95616501069d07d10edb2fc37d3e1c65123b7" integrity sha512-DMD0KiN46eipeziST1LPP/STfDU0sufISXmjSgvVsoU2tqxctQeASejWcfNtxYKqETM1UxQ8sp2OrSBWpHY6sw== +await-lock@^2.2.2: + version "2.2.2" + resolved "https://registry.yarnpkg.com/await-lock/-/await-lock-2.2.2.tgz#a95a9b269bfd2f69d22b17a321686f551152bcef" + integrity sha512-aDczADvlvTGajTDjcjpJMqRkOF6Qdz3YbPZm/PyW6tKPkx2hlYBzxMhEywM/tU72HrVZjgl5VCdRuMlA7pZ8Gw== + aws-sign2@~0.7.0: version "0.7.0" resolved "https://registry.yarnpkg.com/aws-sign2/-/aws-sign2-0.7.0.tgz#b46e890934a9591f2d2f6f86d7e6a9f1b3fe76a8" From 613fcbd9b38007a431e7529f664e9e4ec11e24d4 Mon Sep 17 00:00:00 2001 From: Dan Date: Wed, 15 Nov 2023 16:41:02 +0100 Subject: [PATCH 2/2] Data Gathering fix --- .../data-gathering/data-gathering.service.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/apps/api/src/services/data-gathering/data-gathering.service.ts b/apps/api/src/services/data-gathering/data-gathering.service.ts index 2f48394c4..213f91692 100644 --- a/apps/api/src/services/data-gathering/data-gathering.service.ts +++ b/apps/api/src/services/data-gathering/data-gathering.service.ts @@ -24,7 +24,7 @@ import { DataSource } from '@prisma/client'; import { JobOptions, Queue } from 'bull'; import { format, min, subDays, subYears } from 'date-fns'; import { isEmpty } from 'lodash'; -import { Lock } from 'async-lock'; +import AwaitLock from 'await-lock'; @Injectable() export class DataGatheringService { @@ -41,6 +41,8 @@ export class DataGatheringService { private readonly symbolProfileService: SymbolProfileService ) {} + lock = new AwaitLock(); + public async addJobToQueue({ data, name, @@ -90,7 +92,6 @@ export class DataGatheringService { date: Date; symbol: string; }) { - const lock = new Lock(); try { const historicalData = await this.dataProviderService.getHistoricalRaw( [{ dataSource, symbol }], @@ -102,7 +103,8 @@ export class DataGatheringService { historicalData[symbol][format(date, DATE_FORMAT)].marketPrice; if (marketPrice) { - return await lock.acquire('marketData', async function () { + await this.lock.acquireAsync(); + try { return await this.prismaService.marketData.upsert({ create: { dataSource, @@ -113,7 +115,9 @@ export class DataGatheringService { update: { marketPrice }, where: { dataSource_date_symbol: { dataSource, date, symbol } } }); - }); + } finally { + this.lock.release(); + } } } catch (error) { Logger.error(error, 'DataGatheringService');