Добавлены дополнительные стратегии для синхронизации с redmine

* через запрос данных из csv
* через прогрузку суммирующих задач
This commit is contained in:
Pavel Gnedov 2024-02-22 13:07:54 +07:00
parent 9d476a43fa
commit 18873337b1
6 changed files with 474 additions and 23 deletions

View file

@ -24,10 +24,32 @@
], ],
"updateInterval": 600000 // 10 min "updateInterval": 600000 // 10 min
}, },
"csvListener": {
"tasks": [
{
"schedule": "", // cron schedule syntax
"updatedAtFieldName": "",
"dateTimeFormat": "",
"csvLinks": [
"",
""
]
}
]
},
"rootIssueListener": {
"tasks": [
{
"schedule": "", // cron schedule syntax
"rootIssues": [] // number[]
}
]
},
"issueChangesQueue": { "issueChangesQueue": {
"updateInterval": 5000, // 5 sec "updateInterval": 5000, // 5 sec
"itemsLimit": 3 "itemsLimit": 3
}, },
"redmineToken": "",
"redmineUrlPrefix": "", "redmineUrlPrefix": "",
"redmineUrlPublic": "", "redmineUrlPublic": "",
"webhooks": [ "webhooks": [

View file

@ -0,0 +1,220 @@
import { BehaviorSubject } from 'rxjs';
import { EventsListener } from '../events/events-listener';
import { Injectable, Logger } from '@nestjs/common';
import { SchedulerRegistry } from '@nestjs/schedule';
import { CronJob } from 'cron';
import { randomUUID } from 'crypto';
import { ConfigService } from '@nestjs/config';
import { IssuesService } from '../issues/issues.service';
import { RedmineDataLoader } from '../redmine-data-loader/redmine-data-loader';
import { DateTime } from 'luxon';
import { RedmineTypes } from '../models/redmine-types';
export type CsvIssue = Record<string, any>;
export type Task = {
schedule: string;
updatedAtFieldName: string;
dateTimeFormat: string;
csvLinks: string[];
};
export type CsvListenerParams = {
tasks: Task[];
};
export type CsvIssuesStore = Record<number, CsvIssue>;
@Injectable()
export class CsvListenerFactory {
private csvListener: CsvListener;
constructor(
private configService: ConfigService,
private schedulerRegistry: SchedulerRegistry,
private issuesService: IssuesService,
private redmineDataLoader: RedmineDataLoader,
) {}
getCsvListener(): CsvListener {
if (!this.csvListener) {
const params = this.configService.get('csvListener');
this.csvListener = new CsvListener(
params,
this.schedulerRegistry,
'default_csv_listener',
this.issuesService,
this.redmineDataLoader,
);
}
return this.csvListener;
}
}
@Injectable()
export class CsvListener implements EventsListener {
private logger = new Logger(CsvListener.name);
issues: BehaviorSubject<number[]> = new BehaviorSubject([]);
constructor(
private params: CsvListenerParams,
private schedulerRegistry: SchedulerRegistry,
private jobPrefix: string,
private issuesService: IssuesService,
private redmineDataLoader: RedmineDataLoader,
) {
this.logger.log(
`Csv listener created with params - ${JSON.stringify(params)}`,
);
}
start(): void {
const tasks = this?.params?.tasks || [];
this.logger.debug(`Scheduling ${tasks.length} tasks`);
for (let i = 0; i < tasks.length; i++) {
const task = this.params.tasks[i];
const cronJobName = this.createCronJobName();
const cronJob = new CronJob(
task.schedule,
this.createLoader(task, cronJobName),
);
this.schedulerRegistry.addCronJob(cronJobName, cronJob);
cronJob.start();
}
}
stop(): void {
const jobs = this.schedulerRegistry.getCronJobs();
jobs.forEach((job, jobName) => {
if (this.isListenerCronJon(jobName)) {
job.stop();
this.schedulerRegistry.deleteCronJob(jobName);
}
});
}
private createCronJobName(): string {
return `${this.jobPrefix}_${randomUUID()}`;
}
private isListenerCronJon(name: string): boolean {
return name.startsWith(`${this.jobPrefix}_`);
}
private createLoader(task: Task, cronJobName: string): () => Promise<void> {
return async () => {
this.logger.log(
`Execute task ${cronJobName} ` +
`by schedule ${task.schedule} ` +
`with ${task.csvLinks.length} queries`,
);
this.logger.debug(`Queries - ${JSON.stringify(task.csvLinks)}`);
const csvIssuesStore = await this.loadCsv(task);
this.logger.debug(
`Loaded from csv issues count - ${Object.keys(csvIssuesStore).length}`,
);
const existsIssuesStore = await this.getCachedIssues(csvIssuesStore);
const ids = this.filterIssueIdsForUpdate(
csvIssuesStore,
existsIssuesStore,
);
this.logger.debug(
`Issues for update ${ids.length} - ${JSON.stringify(ids)}`,
);
if (ids && ids.length > 0) {
this.issues.next(ids);
}
};
}
private async loadCsv(task: Task): Promise<CsvIssuesStore> {
const res: CsvIssuesStore = {};
for (let i = 0; i < task.csvLinks.length; i++) {
const csvLink = task.csvLinks[i];
const csvData = await this.redmineDataLoader.loadCsv(csvLink);
if (!csvData || csvData.length <= 0) continue;
for (let j = 0; j < csvData.length; j++) {
const issue = csvData[j];
const issueId = Object.values(issue)[0];
issue.id = Number(issueId);
issue.updatedAt = DateTime.fromFormat(
issue[task.updatedAtFieldName],
task.dateTimeFormat,
);
res[issueId] = issue;
}
}
return res;
}
private async getCachedIssues(
store: CsvIssuesStore,
): Promise<Record<number, RedmineTypes.ExtendedIssue>> {
const ids = Object.keys(store).map((k) => Number(k));
const issues = await this.issuesService.getIssues(ids);
return issues.reduce((acc, issue) => {
acc[issue.id] = issue;
return acc;
}, {});
}
private filterIssueIdsForUpdate(
csvIssues: CsvIssuesStore,
existsIssues: Record<number, RedmineTypes.ExtendedIssue>,
): number[] {
const res: number[] = [];
for (const [issueId, issue] of Object.entries(csvIssues)) {
const id = Number(issueId);
if (!existsIssues[id]) {
res.push(id);
continue;
}
const existingIssue = existsIssues[id];
const csvIssueTimestamp = issue.updatedAt?.isValid
? issue.updatedAt.toMillis()
: null;
const existingIssueTimestamp =
this.getTimestampFromCachedIssue(existingIssue);
if (!existingIssueTimestamp) {
res.push(id);
continue;
}
if (existingIssueTimestamp < csvIssueTimestamp) {
res.push(id);
continue;
}
}
return res;
}
private getTimestampFromCachedIssue(
issue: RedmineTypes.ExtendedIssue,
): number | null {
if (
typeof issue.updated_on_timestamp === 'number' &&
issue.updated_on_timestamp > 0
) {
return issue.updated_on_timestamp;
}
let dt = DateTime.fromISO(issue.updated_on);
if (dt.isValid) {
return dt.toMillis();
}
if (
typeof issue.created_on_timestamp === 'number' &&
issue.created_on_timestamp > 0
) {
return issue.created_on_timestamp;
}
dt = DateTime.fromISO(issue.created_on);
if (dt.isValid) {
return dt.toMillis();
}
return null;
}
}

View file

@ -37,6 +37,9 @@ import { ListIssuesByFieldsWidgetDataLoaderService } from './dashboards/widget-d
import { WidgetsCollectionService } from './dashboards/widgets-collection.service'; import { WidgetsCollectionService } from './dashboards/widgets-collection.service';
import { DashboardsController } from './dashboards/dashboards.controller'; import { DashboardsController } from './dashboards/dashboards.controller';
import { CalendarWidgetDataLoaderService } from './dashboards/widget-data-loader/calendar.widget-data-loader.service'; import { CalendarWidgetDataLoaderService } from './dashboards/widget-data-loader/calendar.widget-data-loader.service';
import { ScheduleModule } from '@nestjs/schedule';
import { CsvListenerFactory } from './csvlistener/csv-listener';
import { RootIssueListenerFactory } from './rootissuelistener/root-issue-listener';
@Module({}) @Module({})
export class EventEmitterModule implements OnModuleInit { export class EventEmitterModule implements OnModuleInit {
@ -45,6 +48,7 @@ export class EventEmitterModule implements OnModuleInit {
module: EventEmitterModule, module: EventEmitterModule,
imports: [ imports: [
ConfigModule.forRoot({ load: [() => params?.config || MainConfig()] }), ConfigModule.forRoot({ load: [() => params?.config || MainConfig()] }),
ScheduleModule.forRoot(),
], ],
providers: [ providers: [
EventEmitterService, EventEmitterService,
@ -106,6 +110,8 @@ export class EventEmitterModule implements OnModuleInit {
DashboardsDataService, DashboardsDataService,
WidgetsCollectionService, WidgetsCollectionService,
CalendarWidgetDataLoaderService, CalendarWidgetDataLoaderService,
CsvListenerFactory,
RootIssueListenerFactory,
], ],
exports: [ exports: [
EventEmitterService, EventEmitterService,
@ -162,9 +168,16 @@ export class EventEmitterModule implements OnModuleInit {
constructor( constructor(
private redmineEventsGateway: RedmineEventsGateway, private redmineEventsGateway: RedmineEventsGateway,
private redmineIssuesCacheWriterService: RedmineIssuesCacheWriterService, private redmineIssuesCacheWriterService: RedmineIssuesCacheWriterService,
private csvListenerFactory: CsvListenerFactory,
private rootIssueListenerFactory: RootIssueListenerFactory,
) {} ) {}
onModuleInit() { onModuleInit() {
const csvListener = this.csvListenerFactory.getCsvListener();
this.redmineEventsGateway.appendAndInitListener(csvListener);
const rootIssueListener =
this.rootIssueListenerFactory.getRootIssueListener();
this.redmineEventsGateway.appendAndInitListener(rootIssueListener);
const queue = this.redmineEventsGateway.getIssuesChangesQueue(); const queue = this.redmineEventsGateway.getIssuesChangesQueue();
const subj = queue.queue; const subj = queue.queue;
subj.subscribe(async (issues: RedmineTypes.Issue[]) => { subj.subscribe(async (issues: RedmineTypes.Issue[]) => {

View file

@ -134,25 +134,29 @@ export class RedmineEventsGateway {
return this.rssListener; return this.rssListener;
} }
private listener: EventsListener | null | undefined; private listeners: EventsListener[];
private getMainListener(): EventsListener | null { private getMainListener(): EventsListener[] {
if (typeof this.listener !== 'undefined') { if (!this.listeners) {
return this.listener; this.listeners = [
this.getMailListener(),
this.getRssListener(),
// this.getCsvListener(),
];
this.listeners.forEach((l) => l && l.start && l.start());
}
return this.listeners;
} }
const mailListener = this.getMailListener(); appendAndInitListener(eventListener: EventsListener): void {
const rssListener = this.getRssListener(); const listeners = this.getMainListener();
if (mailListener) { if (listeners.indexOf(eventListener) < 0) {
this.listener = mailListener; this.listeners.push(eventListener);
} else if (rssListener) { eventListener.start();
this.listener = rssListener; const issuesChangesQueue = this.getIssuesChangesQueue();
} else { eventListener.issues.subscribe((issues) => {
this.listener = null; issuesChangesQueue.add(issues);
});
} }
if (this.listener) {
this.listener.start();
}
return this.listener;
} }
private initWebSocketsSendData(): void { private initWebSocketsSendData(): void {
@ -170,7 +174,7 @@ export class RedmineEventsGateway {
} }
private initChangesLogging(): void { private initChangesLogging(): void {
if (this.listener) { if (this.listeners && this.listeners.length > 0) {
this.getIssuesChangesQueue().queue.subscribe((data) => { this.getIssuesChangesQueue().queue.subscribe((data) => {
const issues = data.map((issue) => { const issues = data.map((issue) => {
return `${issue['id']} - ${issue['subject']}`; return `${issue['id']} - ${issue['subject']}`;
@ -181,15 +185,19 @@ export class RedmineEventsGateway {
} }
private initRedmineEventsGateway(): boolean { private initRedmineEventsGateway(): boolean {
const listener = this.getMainListener(); const listeners = this.getMainListener();
if (!listener) { if (!listeners || listeners.length <= 0) {
this.logger.error('Listener not created'); this.logger.error('Listener not created');
return false; return false;
} }
const issuesChangesQueue = this.getIssuesChangesQueue(); const issuesChangesQueue = this.getIssuesChangesQueue();
listener.issues.subscribe((issues) => { listeners.forEach((l) => {
l &&
l.issues &&
l.issues.subscribe((issues) => {
issuesChangesQueue.add(issues); issuesChangesQueue.add(issues);
}); });
});
this.initWebSocketsSendData(); this.initWebSocketsSendData();
this.initWebHooksSendData(); this.initWebHooksSendData();
this.initChangesLogging(); this.initChangesLogging();

View file

@ -3,10 +3,13 @@ import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import { RedmineTypes } from '../models/redmine-types'; import { RedmineTypes } from '../models/redmine-types';
import { EnhancerService } from '../issue-enhancers/enhancer.service'; import { EnhancerService } from '../issue-enhancers/enhancer.service';
import { parse as csvParse } from 'csv/sync';
@Injectable() @Injectable()
export class RedmineDataLoader { export class RedmineDataLoader {
urlPrefix: string; urlPrefix: string;
redmineUrl: string;
redmineToken: string;
private logger = new Logger(RedmineDataLoader.name); private logger = new Logger(RedmineDataLoader.name);
@ -15,6 +18,8 @@ export class RedmineDataLoader {
private enhancerService: EnhancerService, private enhancerService: EnhancerService,
) { ) {
this.urlPrefix = this.configService.get<string>('redmineUrlPrefix'); this.urlPrefix = this.configService.get<string>('redmineUrlPrefix');
this.redmineUrl = this.configService.get<string>('redmineUrlPublic');
this.redmineToken = this.configService.get<string>('redmineToken');
} }
async loadIssues(issues: number[]): Promise<(RedmineTypes.Issue | null)[]> { async loadIssues(issues: number[]): Promise<(RedmineTypes.Issue | null)[]> {
@ -22,7 +27,10 @@ export class RedmineDataLoader {
return Promise.all(promises); return Promise.all(promises);
} }
async loadIssue(issueNumber: number): Promise<RedmineTypes.Issue | null> { async loadIssue(
issueNumber: number,
skipEnhancers = false,
): Promise<RedmineTypes.Issue | null> {
const url = this.getIssueUrl(issueNumber); const url = this.getIssueUrl(issueNumber);
let resp; let resp;
try { try {
@ -41,6 +49,7 @@ export class RedmineDataLoader {
this.logger.debug( this.logger.debug(
`Loaded issue, issueNumber = ${issueNumber}, subject = ${resp.data.issue.subject}`, `Loaded issue, issueNumber = ${issueNumber}, subject = ${resp.data.issue.subject}`,
); );
if (skipEnhancers) return resp.data.issue;
let enhancedIssue; let enhancedIssue;
try { try {
enhancedIssue = await this.enhancerService.enhanceIssue(resp.data.issue); enhancedIssue = await this.enhancerService.enhanceIssue(resp.data.issue);
@ -58,7 +67,7 @@ export class RedmineDataLoader {
} }
async loadUser(userNumber: number): Promise<RedmineTypes.User | null> { async loadUser(userNumber: number): Promise<RedmineTypes.User | null> {
if (userNumber <= 0) { if (typeof userNumber !== 'number' || userNumber <= 0) {
this.logger.warn(`Invalid userNumber = ${userNumber}`); this.logger.warn(`Invalid userNumber = ${userNumber}`);
return null; return null;
} }
@ -94,4 +103,34 @@ export class RedmineDataLoader {
private getUserUrl(userNumber: number): string { private getUserUrl(userNumber: number): string {
return `${this.urlPrefix}/users/${userNumber}.json`; return `${this.urlPrefix}/users/${userNumber}.json`;
} }
async loadCsv(
urlQuery: string,
csvParserParams?: any,
): Promise<Record<string, any>[]> {
if (!csvParserParams) {
csvParserParams = {
delimiter: ';',
quote: '"',
columns: true,
skip_empty_lines: true,
};
}
const resp = await fetch(urlQuery, {
headers: {
'X-Redmine-API-Key': this.redmineToken,
},
});
const rawData = await resp.text();
let res;
try {
res = csvParse(rawData, csvParserParams);
} catch (ex) {
this.logger.error(
`Error at loading csv from redmine, query - ${urlQuery}, ex - ${ex}`,
);
return null;
}
return res;
}
} }

View file

@ -0,0 +1,149 @@
import { Injectable, Logger } from '@nestjs/common';
import { EventsListener } from '../events/events-listener';
import { BehaviorSubject } from 'rxjs';
import { SchedulerRegistry } from '@nestjs/schedule';
import { RedmineDataLoader } from '../redmine-data-loader/redmine-data-loader';
import { randomUUID } from 'crypto';
import { RedmineTypes } from '../models/redmine-types';
import { CronJob } from 'cron';
import { ConfigService } from '@nestjs/config';
export type Task = {
schedule: string;
rootIssues: number[];
};
export type RootIssueListenerParams = {
tasks: Task[];
};
@Injectable()
export class RootIssueListenerFactory {
private rootIssueListener: RootIssueListener;
constructor(
private configService: ConfigService,
private schedulerRegistry: SchedulerRegistry,
private redmineDataLoader: RedmineDataLoader,
) {}
getRootIssueListener(): RootIssueListener {
if (!this.rootIssueListener) {
const params = this.configService.get('rootIssueListener');
this.rootIssueListener = new RootIssueListener(
params,
this.schedulerRegistry,
'default_root_issue_listener',
this.redmineDataLoader,
);
}
return this.rootIssueListener;
}
}
@Injectable()
export class RootIssueListener implements EventsListener {
private logger = new Logger(RootIssueListener.name);
issues: BehaviorSubject<number[]> = new BehaviorSubject([]);
constructor(
private params: RootIssueListenerParams,
private schedulerRegistry: SchedulerRegistry,
private jobPrefix: string,
private redmineDataLoader: RedmineDataLoader,
) {
this.logger.log(
`Root issue listener created with params - ${JSON.stringify(params)}`,
);
}
start(): void {
const tasks = this?.params?.tasks || [];
this.logger.debug(`Scheduling ${tasks.length} tasks`);
for (let i = 0; i < tasks.length; i++) {
const task = tasks[i];
const cronJobName = this.createCronJobName();
const cronJob = new CronJob(
task.schedule,
this.createLoader(task, cronJobName),
);
this.schedulerRegistry.addCronJob(cronJobName, cronJob);
cronJob.start();
}
}
stop(): void {
const jobs = this.schedulerRegistry.getCronJobs();
jobs.forEach((job, jobName) => {
if (this.isListenerCronJon(jobName)) {
job.stop();
this.schedulerRegistry.deleteCronJob(jobName);
}
});
}
private createCronJobName(): string {
return `${this.jobPrefix}_${randomUUID()}`;
}
private isListenerCronJon(name: string): boolean {
return name.startsWith(`${this.jobPrefix}_`);
}
private createLoader(task: Task, cronJobName: string): () => Promise<void> {
return async () => {
this.logger.log(
`Execute task ${cronJobName} ` +
`by schedule ${task.schedule} ` +
`with ${task.rootIssues.length} root issues`,
);
const issuesStore = await this.getRootIssuesFromRedmine(task);
this.logger.debug(
`Loaded root issues ` +
`${Object.keys(issuesStore).length} - ` +
`${JSON.stringify(Object.keys(issuesStore).map((i) => Number(i)))}`,
);
const ids = this.getAllRootIssueIds(Object.values(issuesStore), []);
this.logger.debug(
`Issues for update ${ids.length} - ${JSON.stringify(ids)}`,
);
if (ids && ids.length > 0) {
this.issues.next(ids);
}
};
}
private async getRootIssuesFromRedmine(
task: Task,
): Promise<Record<number, RedmineTypes.ExtendedIssue>> {
const res = {};
const SKIP_ENHANCERS = true;
for (let i = 0; i < task.rootIssues.length; i++) {
const issueId = task.rootIssues[i];
if (typeof issueId !== 'number' || issueId <= 0) continue;
const issue = await this.redmineDataLoader.loadIssue(
issueId,
SKIP_ENHANCERS,
);
if (issue) {
res[issueId] = issue;
}
}
return res;
}
private getAllRootIssueIds(
issues: RedmineTypes.ExtendedIssue[] | RedmineTypes.Children,
res: number[],
): number[] {
for (let i = 0; i < issues.length; i++) {
const issue = issues[i];
if (issue.children && issue.children.length > 0) {
res.push(issue.id);
this.getAllRootIssueIds(issue.children, res);
}
}
return res;
}
}