import {Injectable} from '@angular/core';
import {EventSourcePolyfill} from 'event-source-polyfill/src/eventsource.min.js';
import {KeycloakService} from 'keycloak-angular';
import {BehaviorSubject, Observable, Subject} from 'rxjs';
import {filter, map} from 'rxjs/operators';
import {environment} from '../../../environments/environment';
import {EventMessageType} from '../models/event-message-type.enum';
import {EventMessage} from '../models/event-message.model';

/**
 * Application-wide service that opens a Server-Sent-Events connection to
 * `<environment.baseUrl>/events` and republishes every received message to
 * RxJS subscribers.
 *
 * Two streams are maintained:
 *  - a history stream (`_events$`) holding every message received so far,
 *    consumed through {@link on};
 *  - a live stream (`_event$`) that only forwards messages as they arrive,
 *    consumed through {@link onSubject} / {@link onSubjects}.
 *
 * NOTE(review): the history is never trimmed, so it grows for as long as the
 * connection delivers messages — confirm this is acceptable for long sessions.
 */
@Injectable({
  providedIn: 'root',
})
export class SseEventsService {
  /**
   * Value passed as the polyfill's `heartbeatTimeout` option (1e9 ms, roughly 11.5 days).
   * NOTE(review): presumably chosen to effectively disable the heartbeat-based
   * reconnect — confirm against the event-source-polyfill documentation.
   */
  private static readonly HEARTBEAT_TIMEOUT_MS = 1000000000;
  /** Value passed as the polyfill's `connectionTimeout` option. */
  private static readonly CONNECTION_TIMEOUT_MS = 5000;

  private readonly serviceUrl: string = environment.baseUrl + '/events';
  /** Currently open connection, or `null` when not listening. */
  private eventSource: EventSourcePolyfill | null = null;
  /** Every message received since the service was created, oldest first. */
  private readonly _events$: BehaviorSubject<EventMessage[]> = new BehaviorSubject<
    EventMessage[]
  >([]);

  /** Live feed: emits each message once, at the moment it is received. */
  private readonly _event$: Subject<EventMessage> = new Subject<EventMessage>();

  constructor(private readonly keycloak: KeycloakService) {
  }

  // ------------------------------------------------- INIT ---------------------------------------------------- //

  /**
   * Creates a new event source for {@link serviceUrl}, stores it in
   * `this.eventSource` and returns it.
   *
   * The bearer token is read from the Keycloak instance at the moment of the
   * call; it is not refreshed afterwards by this service.
   */
  private initEventSource(): EventSourcePolyfill {
    const source = new EventSourcePolyfill(this.serviceUrl, {
      headers: {
        'Request-Origin': 'et-adm',
        Authorization: 'Bearer ' + this.keycloak.getKeycloakInstance().token,
      },
      heartbeatTimeout: SseEventsService.HEARTBEAT_TIMEOUT_MS,
      connectionTimeout: SseEventsService.CONNECTION_TIMEOUT_MS,
    });
    this.eventSource = source;

    return source;
  }

  // -------------------------------------------------- SSE CONTROLS ------------------------------------------------------ //

  /**
   * Opens the SSE connection and starts forwarding messages to subscribers.
   *
   * Calling this while a connection is already open closes the previous one
   * first, so that messages are never delivered twice by parallel connections.
   */
  public startListening(): void {
    // Drop any previous connection; otherwise it would stay open and keep
    // feeding the subjects alongside the new one.
    this.stopListening();

    const source = this.initEventSource();

    source.onmessage = (event) => {
      let message: EventMessage;
      try {
        message = JSON.parse(event.data);
      } catch (error) {
        // A malformed frame must not throw out of the event handler; log and skip it.
        console.error('Ignoring SSE message with invalid JSON payload', error);
        return;
      }

      // Emit a fresh array instead of mutating the one subscribers already hold.
      this._events$.next([...this._events$.value, message]);

      this._event$.next(message);
    };

    source.onerror = (event) => {
      console.error(event);
    };
  }

  /**
   * Closes the SSE connection, if one is open. Safe to call repeatedly.
   * Subscribers stay subscribed; they simply stop receiving new messages.
   */
  public stopListening(): void {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }

  // ------------------------------------------------- EVENT ACCESSOR ---------------------------------------------------- //

  /**
   * History-based accessor: emits the most recent message whose type is one
   * of `types`.
   *
   * Because it is derived from the history `BehaviorSubject`, it emits
   * immediately on subscription and again every time *any* message is
   * received — including messages of other types, in which case the same
   * last matching message is emitted again.
   *
   * @param types event types to look for
   * @returns a stream of the latest matching message; the emitted value is
   *          `undefined` while no message of the requested types has been received
   */
  public on(...types: EventMessageType[]): Observable<EventMessage> {
    return this._events$.asObservable().pipe(
      map((events: EventMessage[]) => {
        const typedEvents = events.filter((e) => types.includes(e.type));

        return typedEvents[typedEvents.length - 1];
      })
    );
  }

  /**
   * Live accessor for a single event type; shorthand for
   * {@link onSubjects} with a one-element type list.
   *
   * @param type event type to listen for
   * @param dataType when provided (and non-empty), only messages whose
   *                 `dataType` equals this value are forwarded
   */
  public onSubject(type: EventMessageType, dataType?: string): Observable<EventMessage> {
    return this.onSubjects([type], dataType);
  }

  /**
   * Live accessor: forwards each incoming message whose type is one of
   * `types`. Messages received before subscription are not replayed.
   *
   * @param types event types to listen for
   * @param dataType when provided (and non-empty), only messages whose
   *                 `dataType` equals this value are forwarded
   */
  public onSubjects(types: EventMessageType[], dataType?: string): Observable<EventMessage> {
    return this._event$
      .asObservable()
      .pipe(
        filter((event: EventMessage) => types.includes(event.type) && (dataType ? dataType === event.dataType : true))
      );
  }
}
