pinkmine/libs/event-emitter/src/events/redmine-events.gateway.ts

208 lines
6.4 KiB
TypeScript

import {
MessageBody,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
WsResponse,
} from '@nestjs/websockets';
import { Server } from 'socket.io';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { MailListener } from '../maillistener/maillistener';
import { Queue } from '../queue/queue';
import { RedmineDataLoader } from '../redmine-data-loader/redmine-data-loader';
import { RedmineIssueData } from '../models/RedmineIssueData';
import { ConfigService } from '@nestjs/config';
import axios from 'axios';
import { WebhookConfigItemModel } from '../models/webhook-config-item-model';
import { RssListener } from '../rsslistener/rsslistener';
import { EventsListener } from './events-listener';
import { Logger } from '@nestjs/common';
import { RedmineTypes } from '../models/redmine-types';
/**
 * Shape of the `issueChangesQueue` configuration section read by
 * `RedmineEventsGateway`.
 */
interface IssuesChangesQueueParams {
  /**
   * Passed as the first constructor argument of the issues `Queue`.
   * NOTE(review): presumably a polling period in milliseconds — confirm
   * against the `Queue` implementation.
   */
  updateInterval: number;
  /**
   * Passed as the second constructor argument of the issues `Queue`.
   * NOTE(review): presumably the maximum number of items handled per
   * update — confirm against the `Queue` implementation.
   */
  itemsLimit: number;
}
/**
 * WebSocket gateway (namespace `redmine-events`).
 *
 * Issue numbers emitted by the registered event listeners (mail, RSS and any
 * listener added through {@link RedmineEventsGateway.appendAndInitListener})
 * are pushed into a lazily created {@link Queue}. The queue resolves them to
 * issue data through `RedmineDataLoader.loadIssues`, and whatever the queue
 * emits is then:
 *  - exposed to WebSocket clients as `issues-changes` events,
 *  - POSTed to every webhook from the `webhooks` config section,
 *  - written to the debug log.
 */
@WebSocketGateway({ namespace: 'redmine-events' })
export class RedmineEventsGateway {
  @WebSocketServer()
  server: Server;

  /** Stream handed to every `issues-changes` subscriber. */
  issuesChangesObservable: Observable<WsResponse<RedmineIssueData[]>>;

  // NOTE(review): not referenced anywhere in this class — candidate for removal.
  private issueNumberParser: RegExp;
  private issuesChangesQueueParams: IssuesChangesQueueParams;
  private logger = new Logger(RedmineEventsGateway.name);

  /**
   * Reads the `issueChangesQueue` config section and wires listeners, queue
   * and outputs together.
   *
   * @param config application configuration
   * @param redmineDataLoader loader used to resolve issue numbers to issues
   */
  constructor(
    private config: ConfigService,
    private redmineDataLoader: RedmineDataLoader,
  ) {
    this.issuesChangesQueueParams =
      this.config.get<IssuesChangesQueueParams>('issueChangesQueue');
    this.initRedmineEventsGateway();
  }

  /**
   * Handler for the `issues-changes` WebSocket message.
   *
   * @param data message payload; ignored
   * @returns the shared observable of `issues-changes` events
   */
  @SubscribeMessage('issues-changes')
  issuesChanges(
    // eslint-disable-next-line @typescript-eslint/no-unused-vars
    @MessageBody() data: any,
  ): Observable<WsResponse<RedmineTypes.Issue[]>> {
    return this.issuesChangesObservable;
  }

  /**
   * Extracts a printable message from a caught value without assuming it is
   * an `Error` (anything can be thrown, including `null`/`undefined`).
   */
  private static describeError(err: unknown): string {
    if (err instanceof Error) {
      return err.message;
    }
    if (typeof err === 'object' && err !== null && 'message' in err) {
      return String((err as { message: unknown }).message);
    }
    return String(err);
  }

  private issuesChangesQueue: Queue<number, RedmineTypes.Issue>;

  /**
   * Returns the issues queue, creating and starting it on first use.
   *
   * The loader callback never rejects: a failed `loadIssues` call is logged
   * and reported to the queue as an empty result.
   *
   * @returns the single queue instance owned by this gateway
   * @throws Error when the `issueChangesQueue` config section is missing
   */
  getIssuesChangesQueue(): Queue<number, RedmineTypes.Issue> {
    if (!this.issuesChangesQueue) {
      const params = this.issuesChangesQueueParams;
      if (!params) {
        // Fail with a descriptive message instead of a bare
        // "cannot read properties of undefined" TypeError.
        throw new Error(
          "Config section 'issueChangesQueue' is required to create the issues queue",
        );
      }
      this.issuesChangesQueue = new Queue<number, RedmineTypes.Issue>(
        params.updateInterval,
        params.itemsLimit,
        async (issueNumbers) => {
          try {
            return await this.redmineDataLoader.loadIssues(issueNumbers);
          } catch (e) {
            const reason = RedmineEventsGateway.describeError(e);
            this.logger.error(
              `Error load issues: ${reason} ` +
                `for issues: ${JSON.stringify(issueNumbers)}`,
            );
            return [];
          }
        },
      );
      this.issuesChangesQueue.start();
    }
    return this.issuesChangesQueue;
  }

  /**
   * Adds the issue numbers that the queue does not already report as present.
   *
   * @param issues issue numbers to enqueue
   */
  addIssues(issues: number[]): void {
    const queue = this.getIssuesChangesQueue();
    for (const issue of issues) {
      if (!queue.isItemExists(issue)) {
        queue.add([issue]);
      }
    }
  }

  /** @returns the size reported by the issues queue */
  getQueueSize(): number {
    return this.getIssuesChangesQueue().getQueueSize();
  }

  /** @returns the items reported by the issues queue */
  getQueueIssues(): number[] {
    return this.getIssuesChangesQueue().getItems();
  }

  /**
   * POSTs `data` to every webhook from the `webhooks` config section.
   *
   * Requests are fire-and-forget; a failed request is logged and does not
   * affect the other webhooks. When a webhook defines both `apiKeyName` and
   * `apiKeyValue`, they are sent as a request header.
   *
   * @param data payload sent as the request body
   */
  private sendWebHookFullDataEvents(data: RedmineIssueData[]): void {
    // A missing `webhooks` section means "no webhooks", not a crash inside
    // the queue subscription.
    const webhooks =
      this.config.get<WebhookConfigItemModel[]>('webhooks') || [];
    webhooks.forEach((webhook) => {
      const requestConfig =
        webhook.apiKeyName && webhook.apiKeyValue
          ? { headers: { [webhook.apiKeyName]: webhook.apiKeyValue } }
          : undefined;
      axios.post(webhook.url, data, requestConfig).catch((err) => {
        this.logger.error(
          `Error at webhook send request: ${RedmineEventsGateway.describeError(err)}`,
        );
      });
    });
  }

  // `undefined` = not resolved yet, `null` = resolved, not configured.
  private mailListener: MailListener | null | undefined;

  /**
   * Returns the mail listener built from the `mailListener` config section,
   * or `null` when that section is absent. The result is cached.
   */
  private getMailListener(): MailListener | null {
    if (typeof this.mailListener === 'undefined') {
      const mailListenerParams = this.config.get<any>('mailListener');
      this.mailListener = mailListenerParams
        ? new MailListener(mailListenerParams)
        : null;
    }
    return this.mailListener;
  }

  // `undefined` = not resolved yet, `null` = resolved, not configured.
  private rssListener: RssListener | null | undefined;

  /**
   * Returns the RSS listener built from the `rssListener` config section,
   * or `null` when that section is absent. The result is cached.
   */
  private getRssListener(): RssListener | null {
    if (typeof this.rssListener === 'undefined') {
      const rssListenerParams = this.config.get<any>('rssListener');
      this.rssListener = rssListenerParams
        ? new RssListener(rssListenerParams)
        : null;
    }
    return this.rssListener;
  }

  private listeners: EventsListener[];

  /**
   * Returns the listener list, building it (and starting its members) on
   * first use.
   *
   * The list always starts with two slots — mail and RSS — and a slot holds
   * `null` when the corresponding listener is not configured, so consumers
   * must null-check entries.
   */
  private getMainListener(): EventsListener[] {
    if (!this.listeners) {
      this.listeners = [
        this.getMailListener(),
        this.getRssListener(),
        // this.getCsvListener(),
      ];
      for (const listener of this.listeners) {
        if (listener && listener.start) {
          listener.start();
        }
      }
    }
    return this.listeners;
  }

  /** Feeds every batch of issue numbers emitted by `listener` into the queue. */
  private forwardIssuesToQueue(listener: EventsListener): void {
    const queue = this.getIssuesChangesQueue();
    listener.issues.subscribe((issues) => {
      queue.add(issues);
    });
  }

  /**
   * Registers an additional listener: it is started and its issue numbers
   * are forwarded to the queue. A listener instance that is already
   * registered is ignored.
   *
   * @param eventListener listener to register
   */
  appendAndInitListener(eventListener: EventsListener): void {
    const listeners = this.getMainListener();
    if (listeners.indexOf(eventListener) >= 0) {
      return;
    }
    listeners.push(eventListener);
    eventListener.start();
    this.forwardIssuesToQueue(eventListener);
  }

  /** Builds `issuesChangesObservable` from the queue output. */
  private initWebSocketsSendData(): void {
    const queue: Queue<number, RedmineIssueData> = this.getIssuesChangesQueue();
    this.issuesChangesObservable = queue.queue.pipe(
      map((data) => ({ event: 'issues-changes', data: data })),
    );
  }

  /** Sends every queue emission to the configured webhooks. */
  private initWebHooksSendData(): void {
    const queue: Queue<number, RedmineIssueData> = this.getIssuesChangesQueue();
    queue.queue.subscribe((data) => this.sendWebHookFullDataEvents(data));
  }

  /**
   * Logs `"<id> - <subject>"` for every issue emitted by the queue; an issue
   * lacking either field is logged as an empty string.
   *
   * The guard checks only the length of the listener list, which holds its
   * two (possibly `null`) slots once `getMainListener()` has run.
   */
  private initChangesLogging(): void {
    if (!this.listeners || this.listeners.length <= 0) {
      return;
    }
    this.getIssuesChangesQueue().queue.subscribe((data) => {
      const issues = data.map((issue) =>
        issue && issue.id && issue.subject
          ? `${issue.id} - ${issue.subject}`
          : '',
      );
      this.logger.debug('Changed issues: ' + JSON.stringify(issues));
    });
  }

  /**
   * Connects listeners to the queue and the queue to its three outputs
   * (WebSocket stream, webhooks, debug log).
   *
   * @returns `false` when the listener list is empty, `true` otherwise.
   *   NOTE(review): `getMainListener()` always yields a two-slot list, so the
   *   `false` branch is not reachable with the current implementation.
   */
  private initRedmineEventsGateway(): boolean {
    const listeners = this.getMainListener();
    if (!listeners || listeners.length <= 0) {
      this.logger.error('Listener not created');
      return false;
    }
    for (const listener of listeners) {
      // Slots of unconfigured listeners are `null`.
      if (listener && listener.issues) {
        this.forwardIssuesToQueue(listener);
      }
    }
    this.initWebSocketsSendData();
    this.initWebHooksSendData();
    this.initChangesLogging();
    return true;
  }
}