import { Client, IMessage, StompSubscription } from '@stomp/stompjs';
import { Observable, Subject, Subscriber, Subscription } from 'rxjs';
import { first, filter, takeUntil } from 'rxjs/operators';

/**
 * JMS subscription wrapper class. Requires a connection supplied by another service.
 * Destinations can be acquired from the JMS destination factory. JSON parsing of the
 * message body is active by default and can be turned off with the `skipParsingBody`
 * option. If parsing is not active, the raw STOMP message is emitted instead of the
 * parsed body. The connection and destination observables are ignored until they
 * emit a non-null value.
 */
export class JmsSubscription {
  /** Numeric `readyState` value of a WebSocket that is in the `OPEN` state. */
  private static readonly WEB_SOCKET_OPEN = 1;

  /** True while a STOMP subscription created by this instance is considered active. */
  private subscribed = false;

  /** Handle of the most recent STOMP subscription; kept after release so `subscriptionId` stays readable. */
  private destinationSubscription: StompSubscription;
  /** Pending resolution of the destination when it was supplied as an observable. */
  private loadingDestinationSubscription: Subscription;
  /** Subscription to the connection observable. */
  private loadingConnectionSubscription: Subscription;

  /**
   * Emitting on this subject completes every stream returned by `startSub`
   * (each of them is piped through `takeUntil(this.destroy$)`).
   */
  private destroy$ = new Subject<boolean>();

  /**
   * @param connection Stream of STOMP clients; emissions that are null or have no `webSocket` are ignored.
   * @param _destination Destination name, or an observable whose first non-null value is used as the name.
   * @param options `skipParsingBody` emits the raw STOMP message instead of the JSON-parsed body;
   *   `headers` are passed along with the STOMP subscribe call.
   */
  constructor(
    private connection: Observable<Client>,
    private _destination: string | Observable<string>,
    private options: {
      skipParsingBody: boolean;
      headers: Record<string, string>;
    } = { skipParsingBody: false, headers: {} },
  ) {}

  /** The destination exactly as it was passed to the constructor. */
  get destination() {
    return this._destination;
  }

  /** Id of the most recent STOMP subscription, or `undefined` if none has been created yet. */
  get subscriptionId(): string {
    return this.destinationSubscription?.id;
  }

  /**
   * Returns a stream of messages from the configured destination. Message bodies are
   * JSON parsed unless `skipParsingBody` is set.
   *
   * The stream ends when `unsubscribe()` is called. Unsubscribing from the returned
   * stream directly releases the underlying STOMP subscription as well. Errors of the
   * connection or destination observables are delivered through the stream's error channel.
   *
   * @throws Error if this instance already holds an active STOMP subscription.
   */
  startSub<T = any>(): Observable<T> {
    if (this.subscribed) throw new Error('Jms socket object is already in use.');
    this.cancelPendingLookups();

    return new Observable((subscriber: Subscriber<T>) => {
      this.handleSubscription(subscriber);
      // Teardown: runs on consumer unsubscribe, on error, and when destroy$ fires.
      return () => this.release();
    }).pipe(takeUntil(this.destroy$));
  }

  /**
   * Releases the STOMP subscription and every pending lookup, then completes the
   * streams returned by `startSub`. Safe to call repeatedly and before any
   * subscription has been established.
   */
  unsubscribe(): void {
    try {
      this.release();
    } finally {
      // Always signal completion, even when nothing was subscribed yet or the STOMP
      // unsubscribe threw; otherwise consumers are left with a stream that never ends.
      this.destroy$.next(true);
    }
  }

  /**
   * Subscribes to the connection stream and (re)creates the STOMP subscription for
   * every usable client it emits. Connection errors are forwarded to `subscriber`.
   */
  private handleSubscription(subscriber: Subscriber<any>): void {
    this.loadingConnectionSubscription = this.connection
      .pipe(filter((connectionObject) => connectionObject != null && connectionObject.webSocket != null))
      .subscribe({
        next: (connectionObject) => this.onConnection(connectionObject, subscriber),
        error: (err) =>
          subscriber.error(new Error('Jms socket connection is not available/error. Error message: ' + err)),
      });
  }

  /** Handles one usable client emitted by the connection stream. */
  private onConnection(connectionObject: Client, subscriber: Subscriber<any>): void {
    // A destination lookup started for an earlier client is stale now; left running it
    // would create a second STOMP subscription once the destination resolves.
    this.loadingDestinationSubscription?.unsubscribe();

    if (this.subscribed) {
      // The previous STOMP subscription is explicitly released only when the incoming
      // client's socket is not open.
      // NOTE(review): presumably an open socket means a fresh (re)connect on which the old
      // subscription no longer exists - confirm against the connection service.
      if (connectionObject.webSocket.readyState !== JmsSubscription.WEB_SOCKET_OPEN) {
        this.destinationSubscription?.unsubscribe();
      }
      this.subscribed = false;
    }

    if (typeof this._destination === 'string') {
      this.attach(connectionObject, this._destination, subscriber);
      return;
    }

    this.loadingDestinationSubscription = this._destination
      .pipe(
        filter((resolvedDestination) => resolvedDestination != null),
        first(),
      )
      .subscribe({
        next: (resolvedDestination) => this.attach(connectionObject, resolvedDestination, subscriber),
        error: (err) =>
          subscriber.error(new Error('Jms socket destination observable error. Error message: ' + err)),
      });
  }

  /** Creates the STOMP subscription, stores its handle and marks this instance as subscribed. */
  private attach(connectionObject: Client, destination: string, subscriber: Subscriber<any>): void {
    this.destinationSubscription = this.subscribeToDestination(connectionObject, destination, subscriber);
    this.subscribed = true;
  }

  /**
   * Subscribes to `destination` on the given client and forwards every incoming
   * message (raw or JSON-parsed, depending on the options) to `subscriber`.
   */
  private subscribeToDestination(
    connectionObject: Client,
    destination: string,
    subscriber: Subscriber<any>,
  ): StompSubscription {
    return connectionObject.subscribe(
      destination,
      (message: IMessage) => {
        subscriber.next(this.options.skipParsingBody ? message : this.parseBody(message));
      },
      this.options.headers ?? {},
    );
  }

  /** Cancels the connection subscription and any pending destination lookup. */
  private cancelPendingLookups(): void {
    this.loadingDestinationSubscription?.unsubscribe();
    this.loadingConnectionSubscription?.unsubscribe();
  }

  /**
   * Idempotent cleanup shared by `unsubscribe()` and the stream teardown: cancels
   * pending lookups and releases the STOMP subscription if one is active.
   */
  private release(): void {
    this.cancelPendingLookups();
    if (!this.subscribed) return;
    // Clear the flag first so a re-entrant call (teardown triggered by destroy$)
    // cannot unsubscribe the same STOMP subscription twice.
    this.subscribed = false;
    this.destinationSubscription?.unsubscribe();
  }

  /**
   * JSON-parses the message body.
   * @returns The parsed value, or `null` (after logging) when the body is not valid JSON.
   */
  private parseBody(message: IMessage): any {
    try {
      return JSON.parse(message.body);
    } catch {
      console.error('Error parsing body message body', message);
      return null;
    }
  }
}
