|
@ -1,5 +1,6 @@ |
|
|
import { |
|
|
import { |
|
|
DATA_GATHERING_QUEUE, |
|
|
DATA_GATHERING_QUEUE, |
|
|
|
|
|
PORTFOLIO_SNAPSHOT_QUEUE, |
|
|
QUEUE_JOB_STATUS_LIST |
|
|
QUEUE_JOB_STATUS_LIST |
|
|
} from '@ghostfolio/common/config'; |
|
|
} from '@ghostfolio/common/config'; |
|
|
import { AdminJobs } from '@ghostfolio/common/interfaces'; |
|
|
import { AdminJobs } from '@ghostfolio/common/interfaces'; |
|
@ -12,11 +13,19 @@ import { JobStatus, Queue } from 'bull'; |
|
|
export class QueueService { |
|
|
export class QueueService { |
|
|
public constructor( |
|
|
public constructor( |
|
|
@InjectQueue(DATA_GATHERING_QUEUE) |
|
|
@InjectQueue(DATA_GATHERING_QUEUE) |
|
|
private readonly dataGatheringQueue: Queue |
|
|
private readonly dataGatheringQueue: Queue, |
|
|
|
|
|
@InjectQueue(PORTFOLIO_SNAPSHOT_QUEUE) |
|
|
|
|
|
private readonly portfolioSnapshotQueue: Queue |
|
|
) {} |
|
|
) {} |
|
|
|
|
|
|
|
|
public async deleteJob(aId: string) { |
|
|
public async deleteJob(aId: string) { |
|
|
return (await this.dataGatheringQueue.getJob(aId))?.remove(); |
|
|
let job = await this.dataGatheringQueue.getJob(aId); |
|
|
|
|
|
|
|
|
|
|
|
if (!job) { |
|
|
|
|
|
job = await this.portfolioSnapshotQueue.getJob(aId); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return job?.remove(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async deleteJobs({ |
|
|
public async deleteJobs({ |
|
@ -25,15 +34,21 @@ export class QueueService { |
|
|
status?: JobStatus[]; |
|
|
status?: JobStatus[]; |
|
|
}) { |
|
|
}) { |
|
|
for (const statusItem of status) { |
|
|
for (const statusItem of status) { |
|
|
await this.dataGatheringQueue.clean( |
|
|
const queueStatus = statusItem === 'waiting' ? 'wait' : statusItem; |
|
|
300, |
|
|
|
|
|
statusItem === 'waiting' ? 'wait' : statusItem |
|
|
await this.dataGatheringQueue.clean(300, queueStatus); |
|
|
); |
|
|
await this.portfolioSnapshotQueue.clean(300, queueStatus); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async executeJob(aId: string) { |
|
|
public async executeJob(aId: string) { |
|
|
return (await this.dataGatheringQueue.getJob(aId))?.promote(); |
|
|
let job = await this.dataGatheringQueue.getJob(aId); |
|
|
|
|
|
|
|
|
|
|
|
if (!job) { |
|
|
|
|
|
job = await this.portfolioSnapshotQueue.getJob(aId); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return job?.promote(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async getJobs({ |
|
|
public async getJobs({ |
|
@ -43,10 +58,15 @@ export class QueueService { |
|
|
limit?: number; |
|
|
limit?: number; |
|
|
status?: JobStatus[]; |
|
|
status?: JobStatus[]; |
|
|
}): Promise<AdminJobs> { |
|
|
}): Promise<AdminJobs> { |
|
|
const jobs = await this.dataGatheringQueue.getJobs(status); |
|
|
const [dataGatheringJobs, portfolioSnapshotJobs] = await Promise.all([ |
|
|
|
|
|
this.dataGatheringQueue.getJobs(status), |
|
|
|
|
|
this.portfolioSnapshotQueue.getJobs(status) |
|
|
|
|
|
]); |
|
|
|
|
|
|
|
|
|
|
|
const allJobs = [...dataGatheringJobs, ...portfolioSnapshotJobs]; |
|
|
|
|
|
|
|
|
const jobsWithState = await Promise.all( |
|
|
const jobsWithState = await Promise.all( |
|
|
jobs |
|
|
allJobs |
|
|
.filter((job) => { |
|
|
.filter((job) => { |
|
|
return job; |
|
|
return job; |
|
|
}) |
|
|
}) |
|
|