import { getArray } from '../API/axios';
import {
  Observable,
  Subject,
  createSubject,
  emitterToPromise,
  once,
} from '../utils/observable';
import { UntrustedEvent, untrustedEventSchema } from './types';

/**
 * The sole purpose of the Receiver is to receive comment events from the
 * remote endpoint.
 */
export class Receiver {
  /** Set by `close()`; once true, `connect()` returns without opening a socket. */
  private isClosed = false;
  /** The socket created by the most recent `connect()` call, if any. */
  private ws: WebSocket | null = null;
  /**
   * IDs of events already forwarded to subscribers. `emit()` consults it so
   * that an event is delivered at most once.
   * NOTE(review): entries are never removed, so this grows for the lifetime
   * of the receiver.
   */
  private readonly acknowledgedEvents = new Set<string>();
  /** Subject backing the public `eventsStream` observable. */
  private readonly eventsSubject: Subject<UntrustedEvent> = createSubject();

  /**
   * Starts connecting immediately; a failure of the initial connection
   * attempt is logged with `console.error`.
   *
   * @param params.objectRequestUrl URL passed to `getArray` to request events.
   * @param params.eventStreamUrl URL the WebSocket is opened against.
   */
  constructor(
    private readonly params: {
      readonly objectRequestUrl: string;
      readonly eventStreamUrl: string;
    }
  ) {
    this.connect().catch(console.error);
  }

  /**
   * Opens a WebSocket to `eventStreamUrl` and, concurrently, requests
   * `objectRequestUrl` via `getArray`. Socket messages that arrive before both
   * the request has resolved and the socket has opened are buffered; afterwards
   * the buffer and the requested events are emitted together and later messages
   * are emitted as they arrive. Payloads that fail `untrustedEventSchema`
   * validation are dropped.
   *
   * When the socket closes, `connect()` is invoked again unless the receiver
   * has been closed.
   * NOTE(review): reconnection is immediate, with no delay or backoff.
   */
  private async connect(): Promise<void> {
    if (this.isClosed) return;

    const onOpen = once<void>();

    // False while the initial request is pending: messages are buffered.
    // True afterwards: messages are emitted directly.
    let shouldSignal = false;

    const ws = new WebSocket(this.params.eventStreamUrl);
    this.ws = ws;

    ws.addEventListener('close', () => {
      // Log rejections the same way the constructor does instead of leaving
      // the reconnect promise unhandled.
      this.connect().catch(console.error);
    });
    ws.addEventListener('open', () => {
      onOpen.emit();
    });

    const events: UntrustedEvent[] = [];
    ws.addEventListener('message', (event: MessageEvent<unknown>) => {
      try {
        // JSON.parse coerces its argument to a string anyway; doing it
        // explicitly keeps the behaviour while avoiding an `any` cast.
        const parsed: unknown = JSON.parse(String(event.data));
        const validation = untrustedEventSchema.validate(parsed);
        if (!validation.isValid) return;

        if (shouldSignal) {
          this.emit(validation.value);
        } else {
          events.push(validation.value);
        }
      } catch (e) {
        console.error(e);
      }
    });

    // NOTE(review): if `getArray` rejects, this method rejects while the socket
    // stays open with `shouldSignal` still false, so its messages keep being
    // buffered and are never emitted. Confirm the desired recovery.
    const [receivedEvents] = await Promise.all([
      getArray(this.params.objectRequestUrl),
      emitterToPromise(onOpen),
    ]);

    // `close()` may have been called while waiting; do not deliver anything
    // to subscribers of a closed receiver.
    if (this.isClosed) return;

    for (const event of receivedEvents) {
      const validation = untrustedEventSchema.validate(event);
      if (!validation.isValid) continue;
      events.push(validation.value);
    }

    shouldSignal = true;

    // NOTE(review): this compares the event values themselves. If
    // `UntrustedEvent` is a plain object, `<` and `>` are both false for every
    // pair and the sort leaves the order unchanged. Confirm the intended key.
    events.sort((a, b) => (a < b ? -1 : a > b ? 1 : 0));
    for (const event of events) {
      this.emit(event);
    }
  }

  /** Observable view of the received events; exposes only `subscribe`. */
  get eventsStream(): Observable<UntrustedEvent> {
    return { subscribe: this.eventsSubject.subscribe };
  }

  /**
   * Forwards `event` to subscribers unless an event with the same `eventId`
   * has already been forwarded.
   */
  private emit(event: UntrustedEvent): void {
    if (this.acknowledgedEvents.has(event.eventId)) return;
    this.acknowledgedEvents.add(event.eventId);
    this.eventsSubject.emit(event);
  }

  /**
   * Marks the receiver as closed and closes the current socket, if any.
   * The `close` listener still calls `connect()`, which returns immediately
   * because `isClosed` is set.
   */
  close(): void {
    this.isClosed = true;
    this.ws?.close();
  }
}
