pinkmine/libs/event-emitter/src/queue/queue.ts

56 lines
1.2 KiB
TypeScript

import { Subject } from 'rxjs';
export class Queue<T, NT> {
private items: T[] = [];
queue: Subject<NT[]> = new Subject<NT[]>();
finished: Subject<boolean> = new Subject();
constructor(
private updateInterval: number,
private itemsLimit: number,
private transformationFn: (arg: Array<T>) => Promise<Array<NT>>,
) {}
add(values: T[]): void {
this.items.push(...values);
}
isItemExists(value: T): boolean {
return this.items.indexOf(value) >= 0;
}
start(): void {
this.update();
}
stop(): void {
clearTimeout(this.updateTimeout);
this.updateTimeout = null;
}
getQueueSize(): number {
return this.items.length;
}
getItems(): T[] {
return this.items;
}
private updateTimeout;
private async update(): Promise<void> {
if (this.items.length > 0) {
const items = this.items.splice(0, this.itemsLimit);
const transformedItems = await this.transformationFn(items);
this.queue.next(transformedItems);
if (this.items.length <= 0) {
this.finished.next(true);
}
}
this.updateTimeout = setTimeout(() => {
this.update();
}, this.updateInterval);
}
}