import { Observable, Operator, OperatorFunction, Subscriber } from 'rxjs';

class BufferWithSizeSubscriber<T> extends Subscriber<T> {
    private buffer: T[] = [];

    constructor(
        public override destination: Subscriber<T[]>,
        private bufferSize: number,
        closingNotifier: Observable<any>,
    ) {
        super(destination);
        closingNotifier.subscribe(() => {
            this.emitAndClearBuffer(this.buffer);
        });
    }

    private emitAndClearBuffer(buffer: T[]): void {
        this.destination.next(buffer);
        this.buffer = [];
    }

    protected override _next(value: T): void {
        const buffer = this.buffer;
        buffer.push(value);

        if (buffer.length === this.bufferSize) {
            this.emitAndClearBuffer(buffer);
        }
    }

    protected override _complete(): void {
        const buffer = this.buffer;
        if (buffer.length > 0) {
            this.destination.next(buffer);
        }
        super._complete();
    }
}

class BufferWithSizeOperator<T> implements Operator<T, T[]> {
    constructor(
        private bufferSize: number,
        private closingNotifier: Observable<any>,
    ) {}

    call(subscriber: Subscriber<T[]>, source: any): any {
        return source.subscribe(new BufferWithSizeSubscriber(subscriber, this.bufferSize, this.closingNotifier));
    }
}

/**
 * Combination of buffer() and bufferCount()
 *
 * @param bufferSize Argument of bufferCount()
 * @param closingNotifier Argument of buffer()
 */
export function bufferWithSize<T>(bufferSize: number, closingNotifier: Observable<any>): OperatorFunction<T, T[]> {
    return function bufferWithSizeOperatorFunction(source: Observable<T>): Observable<T[]> {
        return source.lift(new BufferWithSizeOperator<T>(bufferSize, closingNotifier));
    };
}
