diff --git a/configs/issue-event-emitter-config.jsonc.dist b/configs/issue-event-emitter-config.jsonc.dist index 3129594..d400c49 100644 --- a/configs/issue-event-emitter-config.jsonc.dist +++ b/configs/issue-event-emitter-config.jsonc.dist @@ -24,10 +24,32 @@ ], "updateInterval": 600000 // 10 min }, + "csvListener": { + "tasks": [ + { + "schedule": "", // cron schedule syntax + "updatedAtFieldName": "", + "dateTimeFormat": "", + "csvLinks": [ + "", + "" + ] + } + ] + }, + "rootIssueListener": { + "tasks": [ + { + "schedule": "", // cron schedule syntax + "rootIssues": [] // number[] + } + ] + }, "issueChangesQueue": { "updateInterval": 5000, // 5 sec "itemsLimit": 3 }, + "redmineToken": "", "redmineUrlPrefix": "", "redmineUrlPublic": "", "webhooks": [ diff --git a/libs/event-emitter/src/csvlistener/csv-listener.ts b/libs/event-emitter/src/csvlistener/csv-listener.ts new file mode 100644 index 0000000..8fcfe82 --- /dev/null +++ b/libs/event-emitter/src/csvlistener/csv-listener.ts @@ -0,0 +1,220 @@ +import { BehaviorSubject } from 'rxjs'; +import { EventsListener } from '../events/events-listener'; +import { Injectable, Logger } from '@nestjs/common'; +import { SchedulerRegistry } from '@nestjs/schedule'; +import { CronJob } from 'cron'; +import { randomUUID } from 'crypto'; +import { ConfigService } from '@nestjs/config'; +import { IssuesService } from '../issues/issues.service'; +import { RedmineDataLoader } from '../redmine-data-loader/redmine-data-loader'; +import { DateTime } from 'luxon'; +import { RedmineTypes } from '../models/redmine-types'; + +export type CsvIssue = Record; + +export type Task = { + schedule: string; + updatedAtFieldName: string; + dateTimeFormat: string; + csvLinks: string[]; +}; + +export type CsvListenerParams = { + tasks: Task[]; +}; + +export type CsvIssuesStore = Record; + +@Injectable() +export class CsvListenerFactory { + private csvListener: CsvListener; + + constructor( + private configService: ConfigService, + private schedulerRegistry: SchedulerRegistry, + private issuesService: IssuesService, + private redmineDataLoader: RedmineDataLoader, + ) {} + + getCsvListener(): CsvListener { + if (!this.csvListener) { + const params = this.configService.get('csvListener'); + this.csvListener = new CsvListener( + params, + this.schedulerRegistry, + 'default_csv_listener', + this.issuesService, + this.redmineDataLoader, + ); + } + return this.csvListener; + } +} + +@Injectable() +export class CsvListener implements EventsListener { + private logger = new Logger(CsvListener.name); + + issues: BehaviorSubject = new BehaviorSubject([]); + + constructor( + private params: CsvListenerParams, + private schedulerRegistry: SchedulerRegistry, + private jobPrefix: string, + private issuesService: IssuesService, + private redmineDataLoader: RedmineDataLoader, + ) { + this.logger.log( + `Csv listener created with params - ${JSON.stringify(params)}`, + ); + } + + start(): void { + const tasks = this?.params?.tasks || []; + this.logger.debug(`Scheduling ${tasks.length} tasks`); + for (let i = 0; i < tasks.length; i++) { + const task = this.params.tasks[i]; + const cronJobName = this.createCronJobName(); + const cronJob = new CronJob( + task.schedule, + this.createLoader(task, cronJobName), + ); + this.schedulerRegistry.addCronJob(cronJobName, cronJob); + cronJob.start(); + } + } + + stop(): void { + const jobs = this.schedulerRegistry.getCronJobs(); + jobs.forEach((job, jobName) => { + if (this.isListenerCronJon(jobName)) { + job.stop(); + this.schedulerRegistry.deleteCronJob(jobName); + } + }); + } + + private createCronJobName(): string { + return `${this.jobPrefix}_${randomUUID()}`; + } + + private isListenerCronJon(name: string): boolean { + return name.startsWith(`${this.jobPrefix}_`); + } + + private createLoader(task: Task, cronJobName: string): () => Promise { + return async () => { + this.logger.log( + `Execute task ${cronJobName} ` + + `by schedule ${task.schedule} ` + + `with ${task.csvLinks.length} queries`, + ); + this.logger.debug(`Queries - ${JSON.stringify(task.csvLinks)}`); + const csvIssuesStore = await this.loadCsv(task); + this.logger.debug( + `Loaded from csv issues count - ${Object.keys(csvIssuesStore).length}`, + ); + const existsIssuesStore = await this.getCachedIssues(csvIssuesStore); + const ids = this.filterIssueIdsForUpdate( + csvIssuesStore, + existsIssuesStore, + ); + this.logger.debug( + `Issues for update ${ids.length} - ${JSON.stringify(ids)}`, + ); + if (ids && ids.length > 0) { + this.issues.next(ids); + } + }; + } + + private async loadCsv(task: Task): Promise { + const res: CsvIssuesStore = {}; + for (let i = 0; i < task.csvLinks.length; i++) { + const csvLink = task.csvLinks[i]; + const csvData = await this.redmineDataLoader.loadCsv(csvLink); + if (!csvData || csvData.length <= 0) continue; + for (let j = 0; j < csvData.length; j++) { + const issue = csvData[j]; + const issueId = Object.values(issue)[0]; + issue.id = Number(issueId); + issue.updatedAt = DateTime.fromFormat( + issue[task.updatedAtFieldName], + task.dateTimeFormat, + ); + res[issueId] = issue; + } + } + return res; + } + + private async getCachedIssues( + store: CsvIssuesStore, + ): Promise> { + const ids = Object.keys(store).map((k) => Number(k)); + const issues = await this.issuesService.getIssues(ids); + return issues.reduce((acc, issue) => { + acc[issue.id] = issue; + return acc; + }, {}); + } + + private filterIssueIdsForUpdate( + csvIssues: CsvIssuesStore, + existsIssues: Record, + ): number[] { + const res: number[] = []; + for (const [issueId, issue] of Object.entries(csvIssues)) { + const id = Number(issueId); + if (!existsIssues[id]) { + res.push(id); + continue; + } + const existingIssue = existsIssues[id]; + const csvIssueTimestamp = issue.updatedAt?.isValid + ? issue.updatedAt.toMillis() + : null; + const existingIssueTimestamp = + this.getTimestampFromCachedIssue(existingIssue); + if (!existingIssueTimestamp) { + res.push(id); + continue; + } + if (existingIssueTimestamp < csvIssueTimestamp) { + res.push(id); + continue; + } + } + return res; + } + + private getTimestampFromCachedIssue( + issue: RedmineTypes.ExtendedIssue, + ): number | null { + if ( + typeof issue.updated_on_timestamp === 'number' && + issue.updated_on_timestamp > 0 + ) { + return issue.updated_on_timestamp; + } + + let dt = DateTime.fromISO(issue.updated_on); + if (dt.isValid) { + return dt.toMillis(); + } + + if ( + typeof issue.created_on_timestamp === 'number' && + issue.created_on_timestamp > 0 + ) { + return issue.created_on_timestamp; + } + + dt = DateTime.fromISO(issue.created_on); + if (dt.isValid) { + return dt.toMillis(); + } + + return null; + } +} diff --git a/libs/event-emitter/src/event-emitter.module.ts b/libs/event-emitter/src/event-emitter.module.ts index 2e69e41..0ddf58e 100644 --- a/libs/event-emitter/src/event-emitter.module.ts +++ b/libs/event-emitter/src/event-emitter.module.ts @@ -37,6 +37,9 @@ import { ListIssuesByFieldsWidgetDataLoaderService } from './dashboards/widget-d import { WidgetsCollectionService } from './dashboards/widgets-collection.service'; import { DashboardsController } from './dashboards/dashboards.controller'; import { CalendarWidgetDataLoaderService } from './dashboards/widget-data-loader/calendar.widget-data-loader.service'; +import { ScheduleModule } from '@nestjs/schedule'; +import { CsvListenerFactory } from './csvlistener/csv-listener'; +import { RootIssueListenerFactory } from './rootissuelistener/root-issue-listener'; @Module({}) export class EventEmitterModule implements OnModuleInit { @@ -45,6 +48,7 @@ export class EventEmitterModule implements OnModuleInit { module: EventEmitterModule, imports: [ ConfigModule.forRoot({ load: [() => params?.config || MainConfig()] }), + ScheduleModule.forRoot(), ], providers: [ EventEmitterService, @@ -106,6 +110,8 @@ export class EventEmitterModule implements OnModuleInit { DashboardsDataService, WidgetsCollectionService, CalendarWidgetDataLoaderService, + CsvListenerFactory, + RootIssueListenerFactory, ], exports: [ EventEmitterService, @@ -162,9 +168,16 @@ export class EventEmitterModule implements OnModuleInit { constructor( private redmineEventsGateway: RedmineEventsGateway, private redmineIssuesCacheWriterService: RedmineIssuesCacheWriterService, + private csvListenerFactory: CsvListenerFactory, + private rootIssueListenerFactory: RootIssueListenerFactory, ) {} onModuleInit() { + const csvListener = this.csvListenerFactory.getCsvListener(); + this.redmineEventsGateway.appendAndInitListener(csvListener); + const rootIssueListener = + this.rootIssueListenerFactory.getRootIssueListener(); + this.redmineEventsGateway.appendAndInitListener(rootIssueListener); const queue = this.redmineEventsGateway.getIssuesChangesQueue(); const subj = queue.queue; subj.subscribe(async (issues: RedmineTypes.Issue[]) => { diff --git a/libs/event-emitter/src/events/redmine-events.gateway.ts b/libs/event-emitter/src/events/redmine-events.gateway.ts index fd05823..932bfbb 100644 --- a/libs/event-emitter/src/events/redmine-events.gateway.ts +++ b/libs/event-emitter/src/events/redmine-events.gateway.ts @@ -134,25 +134,29 @@ export class RedmineEventsGateway { return this.rssListener; } - private listener: EventsListener | null | undefined; - private getMainListener(): EventsListener | null { - if (typeof this.listener !== 'undefined') { - return this.listener; + private listeners: EventsListener[]; + private getMainListener(): EventsListener[] { + if (!this.listeners) { + this.listeners = [ + this.getMailListener(), + this.getRssListener(), + // this.getCsvListener(), + ]; + this.listeners.forEach((l) => l && l.start && l.start()); } + return this.listeners; + } - const mailListener = this.getMailListener(); - const rssListener = this.getRssListener(); - if (mailListener) { - this.listener = mailListener; - } else if (rssListener) { - this.listener = rssListener; - } else { - this.listener = null; + appendAndInitListener(eventListener: EventsListener): void { + const listeners = this.getMainListener(); + if (listeners.indexOf(eventListener) < 0) { + this.listeners.push(eventListener); + eventListener.start(); + const issuesChangesQueue = this.getIssuesChangesQueue(); + eventListener.issues.subscribe((issues) => { + issuesChangesQueue.add(issues); + }); } - if (this.listener) { - this.listener.start(); - } - return this.listener; } private initWebSocketsSendData(): void { @@ -170,7 +174,7 @@ export class RedmineEventsGateway { } private initChangesLogging(): void { - if (this.listener) { + if (this.listeners && this.listeners.length > 0) { this.getIssuesChangesQueue().queue.subscribe((data) => { const issues = data.map((issue) => { return `${issue['id']} - ${issue['subject']}`; @@ -181,14 +185,18 @@ export class RedmineEventsGateway { } private initRedmineEventsGateway(): boolean { - const listener = this.getMainListener(); - if (!listener) { + const listeners = this.getMainListener(); + if (!listeners || listeners.length <= 0) { this.logger.error('Listener not created'); return false; } const issuesChangesQueue = this.getIssuesChangesQueue(); - listener.issues.subscribe((issues) => { - issuesChangesQueue.add(issues); + listeners.forEach((l) => { + l && + l.issues && + l.issues.subscribe((issues) => { + issuesChangesQueue.add(issues); + }); }); this.initWebSocketsSendData(); this.initWebHooksSendData(); diff --git a/libs/event-emitter/src/redmine-data-loader/redmine-data-loader.ts b/libs/event-emitter/src/redmine-data-loader/redmine-data-loader.ts index 4eef4cf..2ebe001 100644 --- a/libs/event-emitter/src/redmine-data-loader/redmine-data-loader.ts +++ b/libs/event-emitter/src/redmine-data-loader/redmine-data-loader.ts @@ -3,10 +3,13 @@ import { Injectable, Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { RedmineTypes } from '../models/redmine-types'; import { EnhancerService } from '../issue-enhancers/enhancer.service'; +import { parse as csvParse } from 'csv/sync'; @Injectable() export class RedmineDataLoader { urlPrefix: string; + redmineUrl: string; + redmineToken: string; private logger = new Logger(RedmineDataLoader.name); @@ -15,6 +18,8 @@ export class RedmineDataLoader { private enhancerService: EnhancerService, ) { this.urlPrefix = this.configService.get('redmineUrlPrefix'); + this.redmineUrl = this.configService.get('redmineUrlPublic'); + this.redmineToken = this.configService.get('redmineToken'); } async loadIssues(issues: number[]): Promise<(RedmineTypes.Issue | null)[]> { @@ -22,7 +27,10 @@ export class RedmineDataLoader { return Promise.all(promises); } - async loadIssue(issueNumber: number): Promise { + async loadIssue( + issueNumber: number, + skipEnhancers = false, + ): Promise { const url = this.getIssueUrl(issueNumber); let resp; try { @@ -41,6 +49,7 @@ export class RedmineDataLoader { this.logger.debug( `Loaded issue, issueNumber = ${issueNumber}, subject = ${resp.data.issue.subject}`, ); + if (skipEnhancers) return resp.data.issue; let enhancedIssue; try { enhancedIssue = await this.enhancerService.enhanceIssue(resp.data.issue); @@ -58,7 +67,7 @@ export class RedmineDataLoader { } async loadUser(userNumber: number): Promise { - if (userNumber <= 0) { + if (typeof userNumber !== 'number' || userNumber <= 0) { this.logger.warn(`Invalid userNumber = ${userNumber}`); return null; } @@ -94,4 +103,34 @@ export class RedmineDataLoader { private getUserUrl(userNumber: number): string { return `${this.urlPrefix}/users/${userNumber}.json`; } + + async loadCsv( + urlQuery: string, + csvParserParams?: any, + ): Promise[]> { + if (!csvParserParams) { + csvParserParams = { + delimiter: ';', + quote: '"', + columns: true, + skip_empty_lines: true, + }; + } + const resp = await fetch(urlQuery, { + headers: { + 'X-Redmine-API-Key': this.redmineToken, + }, + }); + const rawData = await resp.text(); + let res; + try { + res = csvParse(rawData, csvParserParams); + } catch (ex) { + this.logger.error( + `Error at loading csv from redmine, query - ${urlQuery}, ex - ${ex}`, + ); + return null; + } + return res; + } } diff --git a/libs/event-emitter/src/rootissuelistener/root-issue-listener.ts b/libs/event-emitter/src/rootissuelistener/root-issue-listener.ts new file mode 100644 index 0000000..e6896aa --- /dev/null +++ b/libs/event-emitter/src/rootissuelistener/root-issue-listener.ts @@ -0,0 +1,149 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { EventsListener } from '../events/events-listener'; +import { BehaviorSubject } from 'rxjs'; +import { SchedulerRegistry } from '@nestjs/schedule'; +import { RedmineDataLoader } from '../redmine-data-loader/redmine-data-loader'; +import { randomUUID } from 'crypto'; +import { RedmineTypes } from '../models/redmine-types'; +import { CronJob } from 'cron'; +import { ConfigService } from '@nestjs/config'; + +export type Task = { + schedule: string; + rootIssues: number[]; +}; + +export type RootIssueListenerParams = { + tasks: Task[]; +}; + +@Injectable() +export class RootIssueListenerFactory { + private rootIssueListener: RootIssueListener; + + constructor( + private configService: ConfigService, + private schedulerRegistry: SchedulerRegistry, + private redmineDataLoader: RedmineDataLoader, + ) {} + + getRootIssueListener(): RootIssueListener { + if (!this.rootIssueListener) { + const params = this.configService.get('rootIssueListener'); + this.rootIssueListener = new RootIssueListener( + params, + this.schedulerRegistry, + 'default_root_issue_listener', + this.redmineDataLoader, + ); + } + return this.rootIssueListener; + } +} + +@Injectable() +export class RootIssueListener implements EventsListener { + private logger = new Logger(RootIssueListener.name); + + issues: BehaviorSubject = new BehaviorSubject([]); + + constructor( + private params: RootIssueListenerParams, + private schedulerRegistry: SchedulerRegistry, + private jobPrefix: string, + private redmineDataLoader: RedmineDataLoader, + ) { + this.logger.log( + `Root issue listener created with params - ${JSON.stringify(params)}`, + ); + } + + start(): void { + const tasks = this?.params?.tasks || []; + this.logger.debug(`Scheduling ${tasks.length} tasks`); + for (let i = 0; i < tasks.length; i++) { + const task = tasks[i]; + const cronJobName = this.createCronJobName(); + const cronJob = new CronJob( + task.schedule, + this.createLoader(task, cronJobName), + ); + this.schedulerRegistry.addCronJob(cronJobName, cronJob); + cronJob.start(); + } + } + + stop(): void { + const jobs = this.schedulerRegistry.getCronJobs(); + jobs.forEach((job, jobName) => { + if (this.isListenerCronJon(jobName)) { + job.stop(); + this.schedulerRegistry.deleteCronJob(jobName); + } + }); + } + + private createCronJobName(): string { + return `${this.jobPrefix}_${randomUUID()}`; + } + + private isListenerCronJon(name: string): boolean { + return name.startsWith(`${this.jobPrefix}_`); + } + + private createLoader(task: Task, cronJobName: string): () => Promise { + return async () => { + this.logger.log( + `Execute task ${cronJobName} ` + + `by schedule ${task.schedule} ` + + `with ${task.rootIssues.length} root issues`, + ); + const issuesStore = await this.getRootIssuesFromRedmine(task); + this.logger.debug( + `Loaded root issues ` + + `${Object.keys(issuesStore).length} - ` + + `${JSON.stringify(Object.keys(issuesStore).map((i) => Number(i)))}`, + ); + const ids = this.getAllRootIssueIds(Object.values(issuesStore), []); + this.logger.debug( + `Issues for update ${ids.length} - ${JSON.stringify(ids)}`, + ); + if (ids && ids.length > 0) { + this.issues.next(ids); + } + }; + } + + private async getRootIssuesFromRedmine( + task: Task, + ): Promise> { + const res = {}; + const SKIP_ENHANCERS = true; + for (let i = 0; i < task.rootIssues.length; i++) { + const issueId = task.rootIssues[i]; + if (typeof issueId !== 'number' || issueId <= 0) continue; + const issue = await this.redmineDataLoader.loadIssue( + issueId, + SKIP_ENHANCERS, + ); + if (issue) { + res[issueId] = issue; + } + } + return res; + } + + private getAllRootIssueIds( + issues: RedmineTypes.ExtendedIssue[] | RedmineTypes.Children, + res: number[], + ): number[] { + for (let i = 0; i < issues.length; i++) { + const issue = issues[i]; + if (issue.children && issue.children.length > 0) { + res.push(issue.id); + this.getAllRootIssueIds(issue.children, res); + } + } + return res; + } +}