Browse Source

Added Market Data upsert locks

pull/5027/head
Dan 2 years ago
parent
commit
98db33553c
  1. 4
      apps/api/src/services/data-gathering/data-gathering.service.ts
  2. 19
      apps/api/src/services/market-data/market-data.service.ts
  3. 1
      package.json

4
apps/api/src/services/data-gathering/data-gathering.service.ts

@ -24,6 +24,7 @@ import { DataSource } from '@prisma/client';
import { JobOptions, Queue } from 'bull';
import { format, min, subDays, subYears } from 'date-fns';
import { isEmpty } from 'lodash';
import { Lock } from 'async-lock';
@Injectable()
export class DataGatheringService {
@ -89,6 +90,7 @@ export class DataGatheringService {
date: Date;
symbol: string;
}) {
const lock = new Lock();
try {
const historicalData = await this.dataProviderService.getHistoricalRaw(
[{ dataSource, symbol }],
@ -100,6 +102,7 @@ export class DataGatheringService {
historicalData[symbol][format(date, DATE_FORMAT)].marketPrice;
if (marketPrice) {
return await lock.acquire('marketData', async function () {
return await this.prismaService.marketData.upsert({
create: {
dataSource,
@ -110,6 +113,7 @@ export class DataGatheringService {
update: { marketPrice },
where: { dataSource_date_symbol: { dataSource, date, symbol } }
});
});
}
} catch (error) {
Logger.error(error, 'DataGatheringService');

19
apps/api/src/services/market-data/market-data.service.ts

@ -13,11 +13,14 @@ import {
Prisma
} from '@prisma/client';
import { DateQueryHelper } from '@ghostfolio/api/helper/dateQueryHelper';
import AwaitLock from 'await-lock';
@Injectable()
export class MarketDataService {
public constructor(private readonly prismaService: PrismaService) {}
lock = new AwaitLock();
private dateQueryHelper = new DateQueryHelper();
public async deleteMany({ dataSource, symbol }: UniqueAsset) {
@ -129,7 +132,8 @@ export class MarketDataService {
where: Prisma.MarketDataWhereUniqueInput;
}): Promise<MarketData> {
const { data, where } = params;
await this.lock.acquireAsync();
try {
return this.prismaService.marketData.upsert({
where,
create: {
@ -141,6 +145,9 @@ export class MarketDataService {
},
update: { marketPrice: data.marketPrice, state: data.state }
});
} finally {
this.lock.release();
}
}
/**
@ -153,7 +160,9 @@ export class MarketDataService {
data: Prisma.MarketDataUpdateInput[];
}): Promise<MarketData[]> {
const upsertPromises = data.map(
({ dataSource, date, marketPrice, symbol, state }) => {
async ({ dataSource, date, marketPrice, symbol, state }) => {
await this.lock.acquireAsync();
try {
return this.prismaService.marketData.upsert({
create: {
dataSource: <DataSource>dataSource,
@ -174,9 +183,11 @@ export class MarketDataService {
}
}
});
} finally {
this.lock.release();
}
}
);
return this.prismaService.$transaction(upsertPromises);
return await Promise.all(upsertPromises);
}
}

1
package.json

@ -86,6 +86,7 @@
"@simplewebauthn/server": "8.3.2",
"@stripe/stripe-js": "1.47.0",
"alphavantage": "2.2.0",
"await-lock": "^2.2.2",
"big.js": "6.2.1",
"body-parser": "1.20.1",
"bootstrap": "4.6.0",

Loading…
Cancel
Save