/// <reference no-default-lib="true" />
/// <reference lib="webworker" />
/* eslint @typescript-eslint/no-explicit-any: 0 */
import * as messages from '@mirage/discovery/messages';
import { OriginEnum } from '@mirage/discovery/messages';
import * as errors from '@mirage/shared/errors/classes/discovery';
import * as rx from 'rxjs';
import * as op from 'rxjs/operators';
import { v4 as uuid } from 'uuid';

import type { ServiceId } from '@mirage/discovery/id';
import type {
  AddNode,
  AddService,
  Identity,
  Message,
  NodeMap,
  Relay,
  RemoveNode,
  RemoveService,
  Request,
  Response,
} from '@mirage/discovery/messages';
import type {
  MessagePort as NodeMessagePort,
  Worker as NodeWorker,
} from 'node:worker_threads';
import type { Observable, Subject } from 'rxjs';

export type Peerable =
  | MessagePort
  | Worker
  | NodeWorker
  | NodeMessagePort
  | DedicatedWorkerGlobalScope;

export const WILDCARD_FILTER = Symbol.for('wildcard');

export type OriginFilter = typeof WILDCARD_FILTER | ServiceId[];

//
// local state tracking
//------------------------------------------------------------------------------
const self = new MessageChannel();

// TODO (Matt): should probably figure out a way that we can mock this for
// different ids during testing
const IDENTITY = uuid();

// all of our neighboring nodes we have direct access to via a port
// XXX: we treat ourselves as a neighbor to make access easier for relays
const neighbors: Map<string, Peerable> = new Map([
  [IDENTITY, start(self.port2)],
]);

// all of the nodes that we know about in the graph, even if we do not have
// one-step access to them via a message port
type NodeInformation = {
  distance: number; // number of hops to get to peer (can be zero if loopback)
  next: string; // next hop to get to the peer (can be self if local)
  services: ServiceId[]; // services handle-able via peer
};
const nodes: Map<string, NodeInformation> = new Map([
  [
    IDENTITY,
    {
      distance: 0,
      next: IDENTITY,
      services: [],
    },
  ],
]);

// used to filter incoming values from peers for allowed service registrations
// in cases where we want to defensively register peer services
const whitelist: Map<Peerable, OriginFilter> = new Map();

// used to broadcast changes from ourselves to all neighbors...these are sent
// indiscriminately to all adjacent nodes (so be careful if you touch this!)
const broadcast$: Subject<Message> = new rx.Subject();

// used for managing locally exposed services to provide tx/rx pairs
type Connection = [Subject<any>, Observable<any>];
const exposed: Map<ServiceId, Subject<Connection>> = new Map();

//
// message generation helpers
//------------------------------------------------------------------------------
function normalize<T extends object>(payload: T | MessageEvent<T>): T {
  // depending upon the context, it may be the data itself or it may be an
  // instance of a MessageEvent and we need to extract the payload data
  return payload instanceof MessageEvent || 'data' in payload
    ? ((payload as any).data as T)
    : (payload as any as T);
}

function start(peer: Peerable): Peerable {
  if ('start' in peer) peer.start();
  return peer;
}

function peerid(peer: Peerable): string | undefined {
  for (const [id, port] of neighbors.entries()) {
    if (port === peer) return id;
  }

  // shouldn't happen but we'll use undefined as a sigil here
  return undefined;
}

function nmap(): NodeMap {
  return Array.from(nodes.entries()).reduce(
    (all: NodeMap, [id, information]) => {
      all[id] = {
        distance: information.distance,
        services: information.services,
      };
      return all;
    },
    {},
  );
}

function sowner(service: ServiceId): string | undefined {
  let match;
  for (const [id, ndata] of nodes.entries()) {
    if (ndata.services.includes(service)) match = id;
  }
  return match;
}

function filterServicesByPeer(
  peer: Peerable,
  services: ServiceId[],
): ServiceId[] {
  const allowed = whitelist.get(peer);

  // should not happen but invariant here if we do not have a lookup
  if (!allowed) throw new Error('whitelist missing for peer');

  // if we are in a wildcard context, allow all services to pass
  if (allowed === WILDCARD_FILTER) return services;

  // otherwise filter by itemized services allowed over this peer
  return services.filter((service) => allowed.includes(service));
}

//
// message handling helpers
//------------------------------------------------------------------------------
const handlers = {
  identity(peer: Peerable, message: Identity) {
    const { payload } = message;
    // for identity messages, even if duplicative we'll update our local cache
    neighbors.set(payload.id, peer);

    let changed = false;
    for (const [id, info] of Object.entries(payload.nodes)) {
      // ignore node information about ourselves
      if (id === IDENTITY) continue;

      // defaults here presume they will be updated with the most accurate info
      const cnode = nodes.get(id);
      const node = cnode || {
        distance: Infinity, // presumed to be updated
        next: payload.id, // obviously the neighbor knows about it
        services: [], // to be populated
      };

      // it should be safe for us to blindly merge since we will receive
      // separate messages for service removal
      const slength = node.services.length;
      const filtered = filterServicesByPeer(peer, info.services);
      node.services = Array.from(new Set([...node.services, ...filtered]));
      if (node.services.length !== slength) changed = true;

      // only update our routing with next-hop if distance decreases
      // XXX: distances are perceived by the peer, need to add a hop
      if (info.distance + 1 < node.distance) {
        node.distance = info.distance + 1;
        node.next = payload.id;
        changed = true;
      }

      nodes.set(id, node);

      // if this was a new node, broadcast to peers
      if (cnode) continue;
      broadcast$.next(messages.builder.addNode(id, node.distance));
    }

    if (!changed) return;

    // XXX: we must re-send our identity instead of proxying the message
    // directly as the distances will be incorrect
    broadcast$.next(messages.builder.identity(IDENTITY, nmap()));
  },

  addService(peer: Peerable, message: AddService) {
    const { payload } = message;
    // XXX: this may be objectively "wrong" but if we have no associated node
    // information we cannot concretely say whether or not to propagate this
    // message and we cannot update our local state since we have no routing
    // information about unknown nodes
    const node = nodes.get(payload.id);
    if (!node) return;

    const slength = node.services.length;
    const filtered = filterServicesByPeer(peer, [payload.service]);
    node.services = Array.from(new Set([...node.services, ...filtered]));
    nodes.set(payload.id, node);

    // propagate service additions when it results in a change to our known
    // service state for a given node
    if (node.services.length !== slength) broadcast$.next(message);
  },

  removeService(peer: Peerable, message: RemoveService) {
    const { payload } = message;

    // determine if we are honoring incoming messages regarding service removal
    // from this peer and ignore this message if not
    const filtered = filterServicesByPeer(peer, [payload.service]);
    if (!filtered.length) return;

    // XXX: this may be objectively "wrong" but if we have no associated node
    // information we cannot concretely say whether or not to propagate this
    // message and we cannot update our local state since we have no routing
    // information about unknown nodes
    const node = nodes.get(payload.id);
    if (!node) return;

    const slength = node.services.length;
    node.services = node.services.filter((service) => {
      return service !== payload.service;
    });
    nodes.set(payload.id, node);

    // propagate service removals when it results in a change to our known
    // service state for a given node
    if (node.services.length === slength) return;
    broadcast$.next(
      messages.builder.removeService(payload.id, payload.service),
    );
  },

  addNode(peer: Peerable, message: AddNode) {
    // figure out what peer we got this from so that we can track next hop
    const next = peerid(peer);
    if (!next) {
      // we could probably throw? but we have no easy way to handle that here
      // eslint-disable-next-line no-console
      console.error('received message from peer not in local state!');
      return;
    }

    const { payload } = message;
    let broadcast = !nodes.has(payload.id);
    const node = nodes.get(payload.id) || {
      distance: payload.distance + 1, // add one since we got it from a peer
      services: [],
      next,
    };

    // check to see if our distance has decreased compared to our state
    if (payload.distance + 1 < node.distance) {
      broadcast = true;
      node.distance = payload.distance + 1;
      node.next = next;
    }

    nodes.set(payload.id, node);

    // ensure that we adjust distance and do not send the message as-is since
    // the receiving end will consider us a relay node for distance tracking
    if (!broadcast) return;
    broadcast$.next(messages.builder.addNode(payload.id, payload.distance + 1));
  },

  removeNode(message: RemoveNode) {
    const { payload } = message;
    // XXX: this may be objectively "wrong" but if we have no associated node
    // information we cannot concretely say whether or not to propagate this
    // message and we cannot update our local state since we have no routing
    // information about unknown nodes
    const node = nodes.get(payload.id);
    if (!node) return;

    // remove the node associated with this message and all nodes that we know
    // about that traverse via the node being removed
    nodes.delete(payload.id);
    for (const [id, ninfo] of nodes) {
      if (ninfo.next === payload.id) nodes.delete(id);
    }

    broadcast$.next(message);
  },

  relay(message: Relay | Request | Response) {
    // check to see if we have knowledge of a node in the graph that corresponds
    // to this relayable message, if we cannot direct it we will drop it
    //
    // TODO (Matt): we should notify the origin node that we do not have a route
    // and then attempt remediation (probably via an identity broadcast?)
    // the origin should remove the node and associated services
    const { payload } = message;
    const node = nodes.get(payload.destination);
    if (!node) {
      // TODO: remove this console if not needed
      //eslint-disable-next-line no-console
      console.error('received relay message for a peer with no route!');
      return;
    }

    // this should not happen assuming our identity tracking and service code
    // works as expected, but guard against a case where we lose the next hop
    const neighbor = neighbors.get(node.next);
    if (!neighbor) {
      // TODO: remove this console if not needed
      // eslint-disable-next-line no-console
      console.error('failed to look up next hop in neighbor mapping!');
      return;
    }

    // propagate this message to the next neighbor
    // XXX: because of our port setup code and the mapping we maintain, _we_ are
    // a neighbor to ourself and can directly look up and use the port to send
    // the message in either context. the difference is that we will handle the
    // message locally when reflecting it onto our local context
    (neighbor as unknown as MessagePort).postMessage(message);
  },
};

//
// local port setup code
//------------------------------------------------------------------------------
// port one is our local context port, messages coming across here are directed
// at our local context
const p1 = start(self.port1) as any as MessagePort;
whitelist.set(p1, WILDCARD_FILTER);
const incoming$ = rx
  .fromEvent<MessageEvent<Relay>>(p1, 'message')
  .pipe(op.map(normalize))
  .pipe(op.share());

incoming$.pipe(op.filter(messages.is.request)).subscribe((request: Request) => {
  const {
    payload: { source, id, service },
  } = request;

  // ensure that this service is exposed locally
  const exthandler = exposed.get(service);
  if (!exthandler) {
    // send an error response to the requesting peer indicating failure to setup
    // a channel with the given locally exposed service
    return handlers.relay(
      messages.builder.response(IDENTITY, source, id, undefined),
    );
  }

  // create a new bound pair over a generated context uuid
  const ctx = uuid();
  const [tx$, rx$] = connection(IDENTITY, source, ctx, OriginEnum.Service);

  // provide them to our locally exposed service handler
  exthandler.next([tx$, rx$]);

  // inform the peer that we are ready to rock and roll
  return handlers.relay(messages.builder.response(IDENTITY, source, id, ctx));
});

// port two is our local to outbound port and messages from here are being
// sent from the local context across the graph
const p2 = start(self.port2) as any as MessagePort;
rx.fromEvent<MessageEvent<Relay>>(p2, 'message')
  .pipe(op.map(normalize))
  .subscribe(handlers.relay);

function connection(
  source: string,
  destination: string,
  context: string,
  origin: OriginEnum,
): Connection {
  const tx$ = new rx.Subject<any>();
  const rx$ = incoming$
    .pipe(op.filter(messages.is.relay))
    .pipe(op.filter(messages.is.context(context)))
    .pipe(op.filter(messages.is.norigin(origin)))
    .pipe(op.map(messages.get.payload));

  // TODO (Matt): need to capture this subscription so that it completes when
  // the externally provided handler subject completes so we do not have
  // lingering subjects
  tx$
    .pipe(op.map(messages.wrap.relay(source, destination, context, origin)))
    .subscribe(handlers.relay);

  return [tx$, rx$];
}

//
// external api
//------------------------------------------------------------------------------
/**
 * This is the primary entry point that consumers of discovery will interact
 * with. Calling `listen` sets up a bridge between different contexts so that
 * peers and services can be discovered and interacted with.
 *
 * The `filters` parameter itemizes services which the local context should know
 * about and be able to discover. These filters are one-way, filtering incoming
 * service registration-related message and lean on the calling context to
 * specify which services it needs to be able to access from the provided
 * `peer`.
 *
 * In situations where the `peer` provides the same services as the current
 * context, it is important to itemize services the calling context needs access
 * to using an array parameter of `ServiceId`s. In consumption contexts where
 * there are no duplicate service registrations, you can use the
 * `WILDCARD_FILTER` to allow all service registration-related messages to pass
 * through. If you want to be able to provide services to the `peer` but consume
 * no services from the peer, you can use an empty array (`[]`) to specify that
 * you want to drop all service-related messages from the peer but expose your
 * registered services.
 *
 * NOTE: these filters are indiscriminate of the origin of any service
 * registration-related messages. The caller of `listen` must itemize all
 * allowed dependencies regardless of hop distance from the `peer`.
 */
export function listen(peer: Peerable, filters: OriginFilter) {
  // rx is a bit pissy with variants of message port
  const port = start(peer) as any as MessagePort;

  const incoming$ = rx
    .fromEvent<MessageEvent<Message>>(port, 'message')
    .pipe(op.map(normalize))
    .pipe(op.share());

  // track provided filters for handling informational filtering
  whitelist.set(peer, filters);

  // TODO: remove node if we can capture msg port closures
  // TODO: capture port error? recoverable?
  //  -> does the port close on error or just a signal that there was one?

  // create an observable that captures the first identity message from our peer
  // so we have a neighbor identity and then segment based upon message type so
  // that we can most accurately manage message handling
  const messages$ = incoming$
    .pipe(
      op.connect((shared$) => {
        return shared$
          .pipe(op.filter(messages.is.identity))
          .pipe(op.take(1))
          .pipe(
            op.mergeMap((payload: Identity) => {
              handlers.identity(peer, payload);
              return shared$;
            }),
          );
      }),
    )
    .pipe(op.share());

  // listen for identity messages from peers so that we can update our cache
  messages$
    .pipe(op.filter(messages.is.identity))
    .subscribe(handlers.identity.bind(handlers.identity, peer));

  // listen for node additions
  messages$
    .pipe(op.filter(messages.is.addNode))
    .subscribe(handlers.addNode.bind(handlers.addNode, peer));

  // listen for node removals
  messages$
    .pipe(op.filter(messages.is.removeNode))
    .subscribe(handlers.removeNode);

  // listen for service additions
  messages$
    .pipe(op.filter(messages.is.addService))
    .subscribe(handlers.addService.bind(handlers.addService, peer));

  // listen for service removals
  messages$
    .pipe(op.filter(messages.is.removeService))
    .subscribe(handlers.removeService.bind(handlers.removeService, peer));

  // listen for source/destination type messages
  const local = messages.or(
    messages.is.relay,
    messages.is.request,
    messages.is.response,
  );

  const relayable$ = messages$.pipe(op.filter(local)) as Observable<
    Relay | Request | Response
  >;
  relayable$.subscribe(handlers.relay);

  // on listen, emit an identity payload that contains our peers and exposed
  // services for the other side to consume
  const message = messages.builder.identity(IDENTITY, nmap());
  (peer as any as MessagePort).postMessage(message);

  // subscribe this neighbor to broadcast messages
  broadcast$.subscribe((message) => {
    (peer as any as MessagePort).postMessage(message);
  });
}

export function expose(service: ServiceId): Subject<Connection> {
  // add service to our local state pointing at ourselves
  const self = nodes.get(IDENTITY);
  if (!self) throw new errors.GraphStateError('Self lookup failed');

  // invariant if we have already exposed this service from out local context
  if (self.services.includes(service)) {
    throw new errors.DuplicateService(service);
  }

  // register service locally
  const incoming = new rx.Subject<Connection>();
  self.services.push(service);
  exposed.set(service, incoming);

  // notify externally that the service has been added
  broadcast$.next(messages.builder.addService(IDENTITY, service));
  return incoming;
}

export function remove(service: ServiceId) {
  // add service to our local state pointing at ourselves
  const self = nodes.get(IDENTITY);
  if (!self) throw new errors.GraphStateError('Self lookup failed');

  // invariant if we have logic errors in external code
  if (!self.services.includes(service)) {
    throw new errors.ServiceRemovalError(service);
  }

  // preempt local cleanup by issuing a removal message
  broadcast$.next(messages.builder.removeService(IDENTITY, service));

  // clean up local exposed service state
  const incoming = exposed.get(service) as Subject<Connection>;
  incoming.complete();
  exposed.delete(service);
  self.services = self.services.filter((lservice) => {
    return lservice !== service;
  });
}

export function has(service: ServiceId) {
  return sowner(service) !== undefined;
}

export async function get(
  service: ServiceId,
  timeout: number = 30_000,
): Promise<Connection> {
  // XXX: use a defer here so we do a re-lookup on retry
  const connection$ = rx
    .defer(() => rx.of(sowner(service)))
    .pipe(
      op.mergeMap((nid) => {
        // escape hatch if we do not know about this service (yet)
        if (!nid) {
          return rx.throwError(() => new errors.ServiceNotFoundError(service));
        }

        // create and direct the message using our source/destination relay
        const rid = uuid();
        handlers.relay(messages.builder.request(IDENTITY, nid, rid, service));

        return incoming$
          .pipe(op.filter(messages.is.response))
          .pipe(op.filter(messages.is.rid(rid)))
          .pipe(op.take(1))
          .pipe(
            op.mergeMap(({ payload: { context } }) => {
              // it's possible we made a request before we had state information
              // and the node no longer provides this service, exit and error
              if (!context) {
                handlers.removeService(
                  p1,
                  messages.builder.removeService(nid, service),
                );
                return rx.throwError(
                  () => new errors.ServiceNotFoundError(service),
                );
              }

              // create a wrapped communication channel and return it
              return rx.of(
                connection(IDENTITY, nid, context, OriginEnum.Consumer),
              );
            }),
          );
      }),
    )
    .pipe(op.retry({ delay: 10 }))
    .pipe(
      op.timeout({
        first: timeout,
        with: () =>
          rx.throwError(() => new errors.NoConnectionResponseError(service)),
      }),
    );

  return rx.firstValueFrom(connection$);
}

// XXX: use at your own peril, will hopefully resolve ... eventually
export async function wait(services: ServiceId[]): Promise<undefined> {
  const available = (service: ServiceId): Observable<void> => {
    return rx
      .defer(() => rx.of(sowner(service)))
      .pipe(
        op.mergeMap((nid) => {
          if (!nid) return rx.throwError(() => new Error());
          return rx.of(undefined);
        }),
      )
      .pipe(op.retry({ delay: 10 }));
  };
  const mapped: Observable<void>[] = services.map(available);
  return rx.firstValueFrom(rx.forkJoin(mapped).pipe(op.map(() => undefined)));
}

export function available(): Set<ServiceId> {
  const known: Set<ServiceId> = new Set();
  for (const node of nodes.values()) {
    for (const service of node.services) known.add(service);
  }
  return known;
}
