pinkmine/libs/event-emitter/src/queue/queue.ts
Pavel Gnedov 3876a60eb2 init
2022-04-07 10:01:56 +07:00

45 lines
No EOL
953 B
TypeScript

import { Subject } from "rxjs";
/**
 * Buffers items and drains them in batches on a timer.
 *
 * On every tick up to `itemsLimit` items are removed from the front of the
 * buffer, passed to `transformationFn`, and the transformed batch is
 * published on {@link Queue.queue}. Ticks are chained with `setTimeout`, so
 * the next tick is scheduled only after the current batch has been
 * transformed and published.
 *
 * Caveat: if `transformationFn` rejects, the batch that was taken from the
 * buffer is not put back, nothing is published for it, and no further tick
 * is scheduled. The rejection is not handled by this class.
 *
 * @typeParam T - type of the buffered (raw) items
 * @typeParam NT - type of the items produced by `transformationFn`
 */
export class Queue<T, NT> {
  /** Items waiting to be processed, oldest first. */
  private items: T[] = [];

  /** Emits one array of transformed items per processed batch. */
  queue: Subject<NT[]> = new Subject<NT[]>();

  /** Handle of the pending tick, or `null` when no tick is scheduled. */
  private updateTimeout: ReturnType<typeof setTimeout> | null = null;

  /** `true` between `start()` and `stop()`. */
  private running = false;

  /**
   * Identifies the current polling loop. It is bumped on every effective
   * `start()` so that a loop suspended on `await` can detect that it has
   * been superseded by a stop/start cycle and must not re-arm the timer.
   */
  private runId = 0;

  /**
   * @param updateInterval - delay in milliseconds between the end of one tick and the start of the next
   * @param itemsLimit - maximum number of items taken from the buffer per tick
   * @param transformationFn - asynchronous mapping applied to every batch before it is published
   */
  constructor(
    private readonly updateInterval: number,
    private readonly itemsLimit: number,
    private readonly transformationFn: (arg: Array<T>) => Promise<Array<NT>>
  ) {}

  /**
   * Appends items to the end of the buffer.
   *
   * @param values - items to enqueue, in processing order
   */
  add(values: T[]): void {
    // Push one by one: `push(...values)` passes every element as a call
    // argument and throws RangeError for very large arrays.
    for (const value of values) {
      this.items.push(value);
    }
  }

  /**
   * Tells whether a value is still waiting in the buffer. Items already
   * taken for a batch are no longer reported.
   *
   * @param value - value to look for (`Array.prototype.includes` semantics)
   * @returns `true` if the value is currently buffered
   */
  isItemExists(value: T): boolean {
    return this.items.includes(value);
  }

  /**
   * Starts periodic processing; the first tick runs immediately.
   * Calling it while already started is a no-op, so only one polling loop
   * can exist at a time.
   */
  start(): void {
    if (this.running) {
      return;
    }
    this.running = true;
    this.runId += 1;
    void this.update(this.runId);
  }

  /**
   * Stops periodic processing. A batch that is already being transformed is
   * still published when it completes, but no further tick is scheduled.
   */
  stop(): void {
    this.running = false;
    if (this.updateTimeout !== null) {
      clearTimeout(this.updateTimeout);
      this.updateTimeout = null;
    }
  }

  /**
   * Runs one tick: processes at most one batch, then schedules the next tick
   * if the loop identified by `runId` is still the active one.
   *
   * @param runId - identifier of the polling loop this tick belongs to
   */
  private async update(runId: number): Promise<void> {
    if (this.items.length > 0) {
      const batch = this.items.splice(0, this.itemsLimit);
      const transformedItems = await this.transformationFn(batch);
      this.queue.next(transformedItems);
    }

    // `stop()` (or a stop/start cycle) may have happened while awaiting the
    // transformation; in that case this loop must end here.
    if (!this.running || runId !== this.runId) {
      return;
    }

    this.updateTimeout = setTimeout(() => {
      void this.update(runId);
    }, this.updateInterval);
  }
}