import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, of, Subscriber, throwError, timer } from 'rxjs';
import { catchError, filter, first, switchMap, tap, withLatestFrom } from 'rxjs/operators';
import { Stomp, Client } from '@stomp/stompjs';
import { AuthServerProvider } from 'app/shared';
import { JmsSubscription } from './jms-socket-subscription';
import { Store } from '@ngrx/store';

import * as fromRoot from 'app/app.reducer';
import { MatSnackBar } from '@angular/material/snack-bar';
import { HttpErrorResponse } from '@angular/common/http';

const JWT_USER_LOGIN = 'jwtUser';
const WARN_MESSAGE_SIZE = 10 ** 6;
@Injectable({
  providedIn: 'root',
})
export class JmsWebSocketConnectionVendor {
  private errorMessageDisplay = true;
  private stompConnection: BehaviorSubject<Client> = new BehaviorSubject(null);
  private connectionStatus: BehaviorSubject<boolean> = new BehaviorSubject(false);

  constructor(
    private authService: AuthServerProvider,
    private store: Store,
    private matSnackbar: MatSnackBar,
  ) {}

  /**
   * Connection should be provided to subscriber class for listening.
   * Method createSub can be used for convinience
   * Method send should be used for sending messages
   * @return Null safe connected stomp client
   */
  getConnection(): Observable<Client> {
    // this.stompConnection.value.unsubscribe();
    return this.stompConnection.asObservable().pipe(
      tap((conn) => {
        if (!conn?.connected) console.warn('Connection is not active.', conn);
      }),
      filter((conn) => conn != null && conn?.connected),
    );
  }

  isConnected(): Observable<boolean> {
    return this.connectionStatus.asObservable();
  }

  send(destination: string, body: any, headers = { 'content-type': 'text/plain' }) {
    if (body == null) throw new Error('JMS message body cannot be null');
    this.getConnection()
      .pipe(first())
      .subscribe((conn) => {
        const messageContent = JSON.stringify(body);

        if (messageContent.length > WARN_MESSAGE_SIZE) {
          console.warn('Warning: Message is larger then: ', WARN_MESSAGE_SIZE);
        }

        conn.publish({ destination, headers, body: messageContent });
      });
  }

  createSub(
    destination: string | Observable<string>,
    /**/ options: {
      skipParsingBody: boolean;
      headers: Record<string, string>;
    } = { skipParsingBody: false, headers: {} },
  ) {
    return new JmsSubscription(this.getConnection(), destination, options);
  }

  disconnect() {
    if (!this.stompConnection?.value) return;
    // eslint-disable-next-line @typescript-eslint/no-empty-function
    this.stompConnection.value.onWebSocketClose = () => {};
    this.stompConnection.value?.deactivate();
  }

  establishConnection(): void {
    this.createConnection().subscribe((connection) => {
      connection.onWebSocketClose = () => {
        if (connection.connected) {
          connection.deactivate();
        }
        this.establishConnection();
      };
      
      this.stompConnection.next(connection);
    });
  }

  private createConnection(): Observable<Client> {
    return this.authService.fetchToken().pipe(
      withLatestFrom(this.store.select(fromRoot.getActiveMqAddress)),
      switchMap(([{ token }, activeMqAddress]: any) =>
        this.createStompConnection(activeMqAddress, {
          login: JWT_USER_LOGIN,
          passcode: token,
        }).pipe(
          catchError(() =>
            timer(2000).pipe(
              switchMap(() => {
                if (this.errorMessageDisplay) {
                  this.errorMessageDisplay = false;
                  this.connectionStatus.next(false);
                  this.matSnackbar.open('No Server connection. Attempting to connect.', 'OK', {
                    duration: 5000,
                    horizontalPosition: 'center',
                    verticalPosition: 'top',
                  });
                }
                return this.createConnection();
              }),
            ),
          ),
        ),
      ),
      catchError((err: HttpErrorResponse) => {
        this.connectionStatus.next(false);
        if (err.status === 401) {
          console.error('Cannot refresh token as it had expired.');
          return of(null);
        }

        return timer(2000).pipe(
          switchMap(() => {
            if (this.errorMessageDisplay) {
              this.errorMessageDisplay = false;
              this.connectionStatus.next(false);
              this.matSnackbar.open('No Server connection. Attempting to connect.', 'OK', {
                duration: 5000,
                horizontalPosition: 'center',
                verticalPosition: 'top',
              });
            }
            return this.createConnection();
          }),
        );
      }),
    );
  }

  private createStompConnection(address: string, credentials: { login: string; passcode: string }): Observable<Client> {
    try {
      const socket = new WebSocket(this.getWSAddress(address), 'stomp');

      const stompClient = Stomp.over(() => socket);
      // eslint-disable-next-line @typescript-eslint/no-empty-function
      stompClient.debug = () => {};
      stompClient.heartbeat = { incoming: 50000, outgoing: 50000 };

      return new Observable((observer: Subscriber<Client>) => {
        const onError = (event) => {
          observer.error(event);
          socket.removeEventListener('error', onError);
        };

        socket.addEventListener('error', onError);

        stompClient.connect(
          credentials,
          () => {
            if (!this.errorMessageDisplay)
              { this.matSnackbar.open('Server connection has been reestablished', 'OK', {
                duration: 5000,
                horizontalPosition: 'center',
                verticalPosition: 'top',
              }); }
            this.connectionStatus.next(true);
            this.errorMessageDisplay = true;
            observer.next(stompClient);
            observer.complete();
          },
          (err) => {
            console.error(err);
            observer.error(err);
          },
          () => {
            socket.removeEventListener('error', onError);
          },
        );
      });
    } catch (e) {
      this.connectionStatus.next(false);
      return throwError(() => e);
    }
  }

  private getWSAddress(address: string) {
    if (address.startsWith('ws://') || address.startsWith('wss://')) return address;

    const isHttps = location.href.startsWith('https://');

    return (isHttps ? 'wss://' : 'ws://') + location.host + (address.startsWith('/') ? '' : '/') + address;
  }
}
