import { MessageEnvelope, MessageType } from '@cds/push';
import { flatten } from 'lodash-es';
import { EMPTY, Observable, isObservable } from 'rxjs';
import { catchError, first } from 'rxjs/operators';

import { CdsPushService, CdsSubscription } from '../cds/cds-push.service';
import { BaseEventModel } from '../event-model/model/event.model';

/**
 * Identifiers for subscription topics.
 *
 * The numeric values are written out explicitly; they are the same values the
 * compiler would assign through implicit auto-numbering (starting at 0), so
 * both the forward (`SubscriptionTopic.Grid`) and the reverse
 * (`SubscriptionTopic[0]`) lookups are unchanged.
 *
 * NOTE(review): the meaning of each topic is defined by the code that consumes
 * this enum, which is not shown here — confirm before documenting members.
 */
export enum SubscriptionTopic {
    Grid = 0,
    NonGridable = 1,
    Specials = 2,
    Outrights = 3,
    All = 4,
}

/**
 * Consumer callback that receives an event model together with the message
 * envelope that was processed for it.
 */
export type EventCallback<T extends BaseEventModel> = (event: T, message: MessageEnvelope) => void;
/** Callback that receives a raw push message envelope. */
export type MessageCallback = (message: MessageEnvelope) => void;
/**
 * Function that takes an event model and a message payload and produces an
 * event model, either directly, through a Promise, or through an Observable.
 */
export type MessageHandler<T extends BaseEventModel> = (event: T, payload: any) => T | Promise<T> | Observable<T>;
/** One entry of a subscription request: the event to subscribe and an optional callback. */
export interface EventSubscriptionRequest<T extends BaseEventModel> {
    /** Event model to subscribe; its `id` is used as the grouping key for subscriptions. */
    event: T;
    /** Optional callback for this event; when omitted nothing is called back. */
    callback?: EventCallback<T>;
}

/**
 * Handle over a set of push subscriptions, grouped by the id of the event they
 * were created for.
 *
 * The handle does not talk to the push layer itself: cancelling is delegated to
 * the `teardown` function supplied at construction time.
 */
export class EventSubscription<T extends BaseEventModel> {
    /**
     * @param subscriptions Push subscriptions keyed by event id.
     * @param teardown Function that is handed the subscriptions to cancel.
     */
    constructor(
        private subscriptions: Map<string, CdsSubscription[]>,
        private teardown: (subscriptions: CdsSubscription[]) => void,
    ) {}

    /**
     * Cancels every subscription currently held by this handle.
     * Does nothing when the handle holds no event ids.
     */
    unsubscribe(): void {
        if (this.subscriptions.size) {
            this.teardown(flatten([...this.subscriptions.values()]));
        }
    }

    /**
     * Combines this handle with another one.
     *
     * @param other Handle whose subscriptions should be included.
     * @returns A new handle containing the subscriptions of both; it reuses this
     * handle's teardown function. Neither input handle is modified.
     */
    add(other: EventSubscription<T>): EventSubscription<T> {
        return new EventSubscription(this.merge(this.subscriptions, other.subscriptions), this.teardown);
    }

    /**
     * Cancels and forgets the subscriptions of the given events.
     *
     * @param events Event models or plain event ids; ids that are not held by
     * this handle are ignored.
     */
    remove(events: (T | string)[]): void {
        const subscriptions: CdsSubscription[] = [];

        for (const event of events) {
            const id = typeof event === 'string' ? event : event.id;
            const current = this.subscriptions.get(id);

            if (current) {
                this.subscriptions.delete(id);
                subscriptions.push(...current);
            }
        }

        this.teardown(subscriptions);
    }

    /**
     * Builds a new map holding the subscriptions of both inputs; lists for ids
     * present in both maps are concatenated (`actual` first).
     *
     * Every list in the result is a fresh array. Storing the input arrays
     * directly would make a later append for a shared id write into the array
     * owned by one of the input handles.
     */
    private merge(actual: Map<string, CdsSubscription[]>, other: Map<string, CdsSubscription[]>): Map<string, CdsSubscription[]> {
        const merged = new Map<string, CdsSubscription[]>();

        for (const map of [actual, other]) {
            for (const [id, subscriptions] of map) {
                const current = merged.get(id);

                merged.set(id, current ? [...current, ...subscriptions] : [...subscriptions]);
            }
        }

        return merged;
    }
}

/**
 * Base class for services that connect event models to push messages.
 *
 * Subclasses provide the message handlers (one per message type) and the push
 * subscriptions to open for an event. This class routes each incoming message
 * to the matching handler and forwards the handler's result to the callback
 * supplied with the subscription request.
 */
export abstract class BaseSubscriptionService<T extends BaseEventModel> {
    private handlers = new Map<MessageType, MessageHandler<T>>();

    /**
     * @param push Push service used to open and cancel subscriptions.
     * @throws Error when `getHandlers()` returns nothing or an empty map.
     */
    protected constructor(private push: CdsPushService) {
        // NOTE: getHandlers() runs before the subclass constructor body and field
        // initializers, so implementations must not rely on subclass instance state.
        this.handlers = this.getHandlers();

        if (!this.handlers || this.handlers.size === 0) {
            throw new Error('No handler registered');
        }
    }

    /**
     * Opens push subscriptions for every requested event.
     *
     * Requests that share an event id are collected under the same key. The
     * push service is only called when at least one event id was collected.
     *
     * @param request Events to subscribe, each with an optional callback.
     * @returns Handle that can cancel the created subscriptions through the push service.
     */
    subscribe(request: EventSubscriptionRequest<T>[]): EventSubscription<T> {
        const subscriptions = new Map<string, CdsSubscription[]>();

        for (const current of request) {
            let source = subscriptions.get(current.event.id);

            if (!source) {
                source = [];
                subscriptions.set(current.event.id, source);
            }

            source.push(...this.getEventSubscriptions(current));
        }

        if (subscriptions.size) {
            this.push.subscribe(flatten([...subscriptions.values()]));
        }

        return new EventSubscription(subscriptions, this.push.unsubscribe.bind(this.push));
    }

    /** Returns the handler to run for each supported message type; must not be empty. */
    protected abstract getHandlers(): Map<MessageType, MessageHandler<T>>;
    /** Returns the push subscriptions for `event`; `callback` is the function to run for their messages. */
    protected abstract getContextSubscriptions(event: T, callback: MessageCallback): CdsSubscription[];

    /**
     * Tells whether `value` exposes a callable `then`, i.e. can be adopted by
     * `Promise.resolve`. Unlike `instanceof Promise` this also recognises
     * promises created by another implementation or realm.
     */
    private static isThenable<V>(value: unknown): value is PromiseLike<V> {
        return value != null && typeof (value as { then?: unknown }).then === 'function';
    }

    /**
     * Creates the push subscriptions for one request and wires them to a message
     * callback that:
     *  - looks up the handler for the message type (warns when there is none),
     *  - runs it with the event and the message payload,
     *  - passes the produced event model to the request callback, taking the
     *    first emission of an Observable or the fulfilment value of a thenable.
     *
     * Failures of the handler or of the request callback are logged to the
     * console and never propagate to the push layer.
     */
    private getEventSubscriptions({ event, callback }: EventSubscriptionRequest<T>): CdsSubscription[] {
        // Kept `async` so the callback keeps returning a promise as before; the body itself never awaits.
        const subscriptionHandler = async (message: MessageEnvelope) => {
            const handler = this.handlers.get(message.messageType);

            if (!handler) {
                console.warn('No handler registered for the command', message);

                return;
            }

            const report = (reason: unknown): void => console.error('Can not apply update to eventModel', reason);
            const apply = (updated: T): void => {
                callback?.(updated, message);
            };

            try {
                const result = handler(event, message.payload);

                if (isObservable(result)) {
                    result
                        .pipe(
                            first(),
                            catchError((reason) => {
                                report(reason);

                                return EMPTY;
                            }),
                        )
                        .subscribe((updated) => {
                            // catchError above only covers the source; guard the consumer
                            // callback too so this path logs like the other two.
                            try {
                                apply(updated);
                            } catch (reason) {
                                report(reason);
                            }
                        });
                } else if (BaseSubscriptionService.isThenable<T>(result)) {
                    // Promise.resolve adopts any thenable, so the callback always
                    // receives the settled event model and never the pending wrapper.
                    Promise.resolve<T>(result).then(apply).catch(report);
                } else {
                    apply(result);
                }
            } catch (failure) {
                console.group('Can not apply update to eventModel');
                console.error(message);
                console.error(failure);
                console.groupEnd();
            }
        };

        return this.getContextSubscriptions(event, subscriptionHandler);
    }
}
