/* eslint @typescript-eslint/no-explicit-any: 0 */

import * as error from '@mirage/shared/errors';
import * as rx from 'rxjs';
import * as op from 'rxjs/operators';

import type { Observable, Subject } from 'rxjs';

// Implements observables across IPC by exposing subscribe, next, error,
// unsubscribe, and complete through messages.

type ControlPayload = { type: 'subscribe' | 'unsubscribe' | 'complete' };
type ValuePayload<T> = { type: 'next'; payload: T };
type ErrorPayload<E> = { type: 'error'; payload: InternalErrorPayload<E> };
type Payload<T, E> = ControlPayload | ValuePayload<T> | ErrorPayload<E>;

type InternalErrorPayload<E> =
  | { serialized: true; value: { [key: string]: any } }
  | { serialized: false; value: E };

function wrap<E>(raw: E): InternalErrorPayload<E> {
  if (raw instanceof Error) {
    return {
      serialized: true,
      value: error.serialize(raw),
    };
  }
  return { serialized: false, value: raw };
}

function unwrap<E>(payload: InternalErrorPayload<E>): E {
  const err = payload.serialized
    ? error.deserialize(payload.value)
    : payload.value;
  return err as E;
}

// sends observable over tx/rx pair when subscribed to
export function send<T, E>(
  tx$: Subject<Payload<T, E>>,
  rx$: Observable<Payload<T, E>>,
  observable$: Observable<T>,
) {
  // setup an observable to look for unsubscribe messages coming back across
  const sub$ = rx$.pipe(op.filter(({ type }) => type === 'subscribe'));
  const unsub$: Observable<ControlPayload> = rx$.pipe(
    op.filter(
      (message): message is ControlPayload => message.type === 'unsubscribe',
    ),
  );

  // wrap emissions from the observable in control data
  // XXX: the error capture placement is important in this context. if we
  // capture inside the concat we will emit both a completion event as well
  // as an error event which is not to be expected
  const mapped$ = rx
    .concat(
      observable$.pipe(op.map((payload) => ({ type: 'next', payload }))),
      rx.of({ type: 'complete' }),
    )
    .pipe(
      op.catchError((payload) => {
        return rx.of({
          type: 'error',
          payload: wrap(payload),
        });
      }),
    )
    // kill our subscription by listening for an unsubscription
    .pipe(op.takeUntil(unsub$));

  // use the concat + skip to wait for a subscribe before subscribing to the
  // provided observable and sending the values across to the receiver
  rx.concat(sub$.pipe(op.take(1)), mapped$)
    .pipe(op.skip(1)) // skip subscribe emission
    .subscribe((value) => tx$.next(value as Payload<T, E>));
}

export function receive<T, E>(
  tx$: Subject<Payload<T, E>>,
  rx$: Observable<Payload<T, E>>,
): Observable<T> {
  let _runOnceGuard = false;
  return (
    new rx.Observable<T>((subscriber) => {
      if (_runOnceGuard) {
        throw new Error('Observables are not re-usable when sent across IPC!');
      }
      _runOnceGuard = true;

      const complete$ = rx$
        .pipe(op.filter(({ type }) => type === 'complete'))
        .pipe(op.share());

      const values$ = rx$
        .pipe(op.takeUntil(complete$))
        .pipe(
          op.filter(
            (message): message is ValuePayload<T> => message.type === 'next',
          ),
        )
        .pipe(op.map(({ payload }) => payload));

      const errors$ = rx$
        .pipe(op.takeUntil(complete$))
        .pipe(
          op.filter(
            (message): message is ErrorPayload<E> => message.type === 'error',
          ),
        )
        .pipe(
          op.map(({ payload }) => {
            throw unwrap(payload);
          }),
        );

      // subscribe the subscriber, and also track it to clean up
      // $FlowFixMe - flow-typed dep broken
      const subscription = rx.merge(values$, errors$).subscribe(subscriber);
      subscriber.add(subscription);

      tx$.next({ type: 'subscribe' });

      // TODO (Matt): to be somewhat pedantic, this should not be sent across the
      // wire in cases where we know the observable on the other end completed
      return () => tx$.next({ type: 'unsubscribe' });
    })
      // at least let them share the same observable while actively running
      .pipe(op.share())
  );
}
