Browse Source

Merge pull request #40 from dandevaud/bugfix/several-bugfixes

Added Market Data upsert locks
pull/5027/head
dandevaud 2 years ago
committed by GitHub
parent
commit
1b46a2bb81
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      apps/api/src/services/data-gathering/data-gathering.service.ts
  2. 73
      apps/api/src/services/market-data/market-data.service.ts
  3. 1
      package.json
  4. 5
      yarn.lock

28
apps/api/src/services/data-gathering/data-gathering.service.ts

@ -24,6 +24,7 @@ import { DataSource } from '@prisma/client';
import { JobOptions, Queue } from 'bull'; import { JobOptions, Queue } from 'bull';
import { format, min, subDays, subYears } from 'date-fns'; import { format, min, subDays, subYears } from 'date-fns';
import { isEmpty } from 'lodash'; import { isEmpty } from 'lodash';
import AwaitLock from 'await-lock';
@Injectable() @Injectable()
export class DataGatheringService { export class DataGatheringService {
@ -40,6 +41,8 @@ export class DataGatheringService {
private readonly symbolProfileService: SymbolProfileService private readonly symbolProfileService: SymbolProfileService
) {} ) {}
lock = new AwaitLock();
public async addJobToQueue({ public async addJobToQueue({
data, data,
name, name,
@ -100,16 +103,21 @@ export class DataGatheringService {
historicalData[symbol][format(date, DATE_FORMAT)].marketPrice; historicalData[symbol][format(date, DATE_FORMAT)].marketPrice;
if (marketPrice) { if (marketPrice) {
return await this.prismaService.marketData.upsert({ await this.lock.acquireAsync();
create: { try {
dataSource, return await this.prismaService.marketData.upsert({
date, create: {
marketPrice, dataSource,
symbol date,
}, marketPrice,
update: { marketPrice }, symbol
where: { dataSource_date_symbol: { dataSource, date, symbol } } },
}); update: { marketPrice },
where: { dataSource_date_symbol: { dataSource, date, symbol } }
});
} finally {
this.lock.release();
}
} }
} catch (error) { } catch (error) {
Logger.error(error, 'DataGatheringService'); Logger.error(error, 'DataGatheringService');

73
apps/api/src/services/market-data/market-data.service.ts

@ -13,11 +13,14 @@ import {
Prisma Prisma
} from '@prisma/client'; } from '@prisma/client';
import { DateQueryHelper } from '@ghostfolio/api/helper/dateQueryHelper'; import { DateQueryHelper } from '@ghostfolio/api/helper/dateQueryHelper';
import AwaitLock from 'await-lock';
@Injectable() @Injectable()
export class MarketDataService { export class MarketDataService {
public constructor(private readonly prismaService: PrismaService) {} public constructor(private readonly prismaService: PrismaService) {}
lock = new AwaitLock();
private dateQueryHelper = new DateQueryHelper(); private dateQueryHelper = new DateQueryHelper();
public async deleteMany({ dataSource, symbol }: UniqueAsset) { public async deleteMany({ dataSource, symbol }: UniqueAsset) {
@ -129,18 +132,22 @@ export class MarketDataService {
where: Prisma.MarketDataWhereUniqueInput; where: Prisma.MarketDataWhereUniqueInput;
}): Promise<MarketData> { }): Promise<MarketData> {
const { data, where } = params; const { data, where } = params;
await this.lock.acquireAsync();
return this.prismaService.marketData.upsert({ try {
where, return this.prismaService.marketData.upsert({
create: { where,
dataSource: where.dataSource_date_symbol.dataSource, create: {
date: where.dataSource_date_symbol.date, dataSource: where.dataSource_date_symbol.dataSource,
marketPrice: data.marketPrice, date: where.dataSource_date_symbol.date,
state: data.state, marketPrice: data.marketPrice,
symbol: where.dataSource_date_symbol.symbol state: data.state,
}, symbol: where.dataSource_date_symbol.symbol
update: { marketPrice: data.marketPrice, state: data.state } },
}); update: { marketPrice: data.marketPrice, state: data.state }
});
} finally {
this.lock.release();
}
} }
/** /**
@ -153,30 +160,34 @@ export class MarketDataService {
data: Prisma.MarketDataUpdateInput[]; data: Prisma.MarketDataUpdateInput[];
}): Promise<MarketData[]> { }): Promise<MarketData[]> {
const upsertPromises = data.map( const upsertPromises = data.map(
({ dataSource, date, marketPrice, symbol, state }) => { async ({ dataSource, date, marketPrice, symbol, state }) => {
return this.prismaService.marketData.upsert({ await this.lock.acquireAsync();
create: { try {
dataSource: <DataSource>dataSource, return this.prismaService.marketData.upsert({
date: <Date>date, create: {
marketPrice: <number>marketPrice,
state: <MarketDataState>state,
symbol: <string>symbol
},
update: {
marketPrice: <number>marketPrice,
state: <MarketDataState>state
},
where: {
dataSource_date_symbol: {
dataSource: <DataSource>dataSource, dataSource: <DataSource>dataSource,
date: <Date>date, date: <Date>date,
marketPrice: <number>marketPrice,
state: <MarketDataState>state,
symbol: <string>symbol symbol: <string>symbol
},
update: {
marketPrice: <number>marketPrice,
state: <MarketDataState>state
},
where: {
dataSource_date_symbol: {
dataSource: <DataSource>dataSource,
date: <Date>date,
symbol: <string>symbol
}
} }
} });
}); } finally {
this.lock.release();
}
} }
); );
return await Promise.all(upsertPromises);
return this.prismaService.$transaction(upsertPromises);
} }
} }

1
package.json

@ -86,6 +86,7 @@
"@simplewebauthn/server": "8.3.2", "@simplewebauthn/server": "8.3.2",
"@stripe/stripe-js": "1.47.0", "@stripe/stripe-js": "1.47.0",
"alphavantage": "2.2.0", "alphavantage": "2.2.0",
"await-lock": "^2.2.2",
"big.js": "6.2.1", "big.js": "6.2.1",
"body-parser": "1.20.1", "body-parser": "1.20.1",
"bootstrap": "4.6.0", "bootstrap": "4.6.0",

5
yarn.lock

@ -7366,6 +7366,11 @@ available-typed-arrays@^1.0.5:
resolved "https://registry.yarnpkg.com/available-typed-arrays/-/available-typed-arrays-1.0.5.tgz#92f95616501069d07d10edb2fc37d3e1c65123b7" resolved "https://registry.yarnpkg.com/available-typed-arrays/-/available-typed-arrays-1.0.5.tgz#92f95616501069d07d10edb2fc37d3e1c65123b7"
integrity sha512-DMD0KiN46eipeziST1LPP/STfDU0sufISXmjSgvVsoU2tqxctQeASejWcfNtxYKqETM1UxQ8sp2OrSBWpHY6sw== integrity sha512-DMD0KiN46eipeziST1LPP/STfDU0sufISXmjSgvVsoU2tqxctQeASejWcfNtxYKqETM1UxQ8sp2OrSBWpHY6sw==
await-lock@^2.2.2:
version "2.2.2"
resolved "https://registry.yarnpkg.com/await-lock/-/await-lock-2.2.2.tgz#a95a9b269bfd2f69d22b17a321686f551152bcef"
integrity sha512-aDczADvlvTGajTDjcjpJMqRkOF6Qdz3YbPZm/PyW6tKPkx2hlYBzxMhEywM/tU72HrVZjgl5VCdRuMlA7pZ8Gw==
aws-sign2@~0.7.0: aws-sign2@~0.7.0:
version "0.7.0" version "0.7.0"
resolved "https://registry.yarnpkg.com/aws-sign2/-/aws-sign2-0.7.0.tgz#b46e890934a9591f2d2f6f86d7e6a9f1b3fe76a8" resolved "https://registry.yarnpkg.com/aws-sign2/-/aws-sign2-0.7.0.tgz#b46e890934a9591f2d2f6f86d7e6a9f1b3fe76a8"

Loading…
Cancel
Save