From 1ed5690b33e5d55cb6f87bb92c221c5506e4c069 Mon Sep 17 00:00:00 2001
From: Thomas Kaul <4159106+dtslvr@users.noreply.github.com>
Date: Fri, 14 Apr 2023 19:57:23 +0200
Subject: [PATCH] Feature/improve queue jobs implementation (#1855)

* Improve queue jobs implementation

* Update changelog
---
 CHANGELOG.md                                  |  7 +++
 apps/api/src/app/admin/admin.controller.ts    | 15 ++++-
 apps/api/src/app/order/order.service.ts       |  5 +-
 apps/api/src/services/cron.service.ts         |  8 ++-
 .../src/services/data-gathering.service.ts    | 61 ++++++++-----------
 5 files changed, 52 insertions(+), 44 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5402edd4b..75e464622 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## Unreleased
+
+### Changed
+
+- Improved the queue jobs implementation by adding in bulk
+- Improved the queue jobs implementation by introducing unique job ids
+
 ## 1.253.0 - 2023-04-14
 
 ### Changed
diff --git a/apps/api/src/app/admin/admin.controller.ts b/apps/api/src/app/admin/admin.controller.ts
index 7f3f17791..6d34f8cdb 100644
--- a/apps/api/src/app/admin/admin.controller.ts
+++ b/apps/api/src/app/admin/admin.controller.ts
@@ -107,7 +107,10 @@ export class AdminController {
           dataSource,
           symbol
         },
-        GATHER_ASSET_PROFILE_PROCESS_OPTIONS
+        {
+          ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
+          jobId: `${dataSource}-${symbol}}`
+        }
       );
     }
 
@@ -138,7 +141,10 @@ export class AdminController {
           dataSource,
           symbol
         },
-        GATHER_ASSET_PROFILE_PROCESS_OPTIONS
+        {
+          ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
+          jobId: `${dataSource}-${symbol}}`
+        }
       );
     }
   }
@@ -167,7 +173,10 @@ export class AdminController {
         dataSource,
         symbol
       },
-      GATHER_ASSET_PROFILE_PROCESS_OPTIONS
+      {
+        ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
+        jobId: `${dataSource}-${symbol}}`
+      }
     );
   }
 
diff --git a/apps/api/src/app/order/order.service.ts b/apps/api/src/app/order/order.service.ts
index 2d0cb7376..50cc3bf71 100644
--- a/apps/api/src/app/order/order.service.ts
+++ b/apps/api/src/app/order/order.service.ts
@@ -118,7 +118,10 @@ export class OrderService {
         dataSource: data.SymbolProfile.connectOrCreate.create.dataSource,
         symbol: data.SymbolProfile.connectOrCreate.create.symbol
       },
-      GATHER_ASSET_PROFILE_PROCESS_OPTIONS
+      {
+        ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
+        jobId: `${data.SymbolProfile.connectOrCreate.create.dataSource}-${data.SymbolProfile.connectOrCreate.create.symbol}}`
+      }
     );
 
     const isDraft = isAfter(data.date as Date, endOfToday());
diff --git a/apps/api/src/services/cron.service.ts b/apps/api/src/services/cron.service.ts
index 3b592d176..69906dfa9 100644
--- a/apps/api/src/services/cron.service.ts
+++ b/apps/api/src/services/cron.service.ts
@@ -21,13 +21,12 @@ export class CronService {
 
   @Cron(CronExpression.EVERY_4_HOURS)
   public async runEveryFourHours() {
-    // await this.dataGatheringService.gather7Days();
+    await this.dataGatheringService.gather7Days();
   }
 
   @Cron(CronExpression.EVERY_12_HOURS)
   public async runEveryTwelveHours() {
     await this.exchangeRateDataService.loadCurrencies();
-    await this.dataGatheringService.gather7Days();
   }
 
   @Cron(CronExpression.EVERY_DAY_AT_5PM)
@@ -46,7 +45,10 @@ export class CronService {
           dataSource,
           symbol
         },
-        GATHER_ASSET_PROFILE_PROCESS_OPTIONS
+        {
+          ...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
+          jobId: `${dataSource}-${symbol}}`
+        }
       );
     }
   }
diff --git a/apps/api/src/services/data-gathering.service.ts b/apps/api/src/services/data-gathering.service.ts
index 62209846c..4015cf114 100644
--- a/apps/api/src/services/data-gathering.service.ts
+++ b/apps/api/src/services/data-gathering.service.ts
@@ -2,8 +2,7 @@ import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile.se
 import {
   DATA_GATHERING_QUEUE,
   GATHER_HISTORICAL_MARKET_DATA_PROCESS,
-  GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS,
-  QUEUE_JOB_STATUS_LIST
+  GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS
 } from '@ghostfolio/common/config';
 import { DATE_FORMAT, resetHours } from '@ghostfolio/common/helper';
 import { UniqueAsset } from '@ghostfolio/common/interfaces';
@@ -34,17 +33,14 @@ export class DataGatheringService {
     private readonly symbolProfileService: SymbolProfileService
   ) {}
 
-  public async addJobToQueue(name: string, data: any, options?: JobOptions) {
-    const hasJob = await this.hasJob(name, data);
+  public async addJobToQueue(name: string, data: any, opts?: JobOptions) {
+    return this.dataGatheringQueue.add(name, data, opts);
+  }
 
-    if (hasJob) {
-      Logger.log(
-        `Job ${name} with data ${JSON.stringify(data)} already exists.`,
-        'DataGatheringService'
-      );
-    } else {
-      return this.dataGatheringQueue.add(name, data, options);
-    }
+  public async addJobsToQueue(
+    jobs: { data: any; name: string; opts?: JobOptions }[]
+  ) {
+    return this.dataGatheringQueue.addBulk(jobs);
   }
 
   public async gather7Days() {
@@ -209,17 +205,22 @@ export class DataGatheringService {
   }
 
   public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) {
-    for (const { dataSource, date, symbol } of aSymbolsWithStartDate) {
-      await this.addJobToQueue(
-        GATHER_HISTORICAL_MARKET_DATA_PROCESS,
-        {
-          dataSource,
-          date,
-          symbol
-        },
-        GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS
-      );
-    }
+    await this.addJobsToQueue(
+      aSymbolsWithStartDate.map(({ dataSource, date, symbol }) => {
+        return {
+          data: {
+            dataSource,
+            date,
+            symbol
+          },
+          name: GATHER_HISTORICAL_MARKET_DATA_PROCESS,
+          opts: {
+            ...GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS,
+            jobId: `${dataSource}-${symbol}-${format(date, DATE_FORMAT)}`
+          }
+        };
+      })
+    );
   }
 
   public async getSymbolsMax(): Promise<IDataGatheringItem[]> {
@@ -341,18 +342,4 @@ export class DataGatheringService {
 
     return [...currencyPairsToGather, ...symbolProfilesToGather];
   }
-
-  private async hasJob(name: string, data: any) {
-    const jobs = await this.dataGatheringQueue.getJobs(
-      QUEUE_JOB_STATUS_LIST.filter((status) => {
-        return status !== 'completed';
-      })
-    );
-
-    return jobs.some((job) => {
-      return (
-        job.name === name && JSON.stringify(job.data) === JSON.stringify(data)
-      );
-    });
-  }
 }