import { SearchResultSource } from '@mirage/analytics/events/enums/search_result_source';
import { ServiceId } from '@mirage/discovery/id';
import * as services from '@mirage/discovery/services';
import { LOCAL_FILE_CONNECTOR } from '@mirage/shared/connectors';
import { SearchTimeoutError } from '@mirage/shared/errors/classes/search';
import { scoreResult } from '@mirage/shared/search/scoring/score-result';
import {
  getConnectorFilters,
  isConnectorFilter,
  SearchFilterType,
} from '@mirage/shared/search/search-filters';
import { ONE_SECOND_IN_MILLIS } from '@mirage/shared/util/constants';
import * as rx from 'rxjs';
import * as op from 'rxjs/operators';

import type { Connection, SearchResult } from '@mirage/service-dbx-api/service';
import type { Namespace } from '@mirage/service-operational-metrics';
import type {
  ConnectorFilter,
  SearchFilter,
} from '@mirage/shared/search/search-filters';
import type { ConsolaInstance } from 'consola';
import type { Observable } from 'rxjs';

/**
 * Public contract of the search service. `searchService` registers an
 * implementation of this type under `ServiceId.SEARCH`.
 */
export type Service = {
  /**
   * Runs a search across the server, upstream connectors and any additional
   * sources.
   *
   * @param searchQuery the text to search for
   * @param connections the connections considered for upstream search
   * @param filters the search filters currently applied
   * @param options per-search tuning flags, see `SearchOptions`
   * @returns an observable that may emit several times as sources respond
   */
  performSearch(
    searchQuery: string,
    connections: Connection[],
    filters: SearchFilter[],
    options: SearchOptions,
  ): Observable<{
    /** server results first, followed by upstream / additional source results */
    results: ScoredResult[];
    /** set to the number of fixed-score server results when the server responds */
    localMixinPosition: number;
  }>;
};
/** Per-search options accepted by `performSearch`. */
export type SearchOptions = {
  /**
   * Relevance score at or above which a server result receives the pinned
   * score. A falsy value (including 0) switches to pinning the first
   * `searchConfig.defaultLocalMixinPosition` server results instead.
   */
  localMixinThreshold: number;
  /** when false, the additional source matching LOCAL_FILE_CONNECTOR is skipped */
  isLocalFilesEnabled: boolean;
  /** NOTE(review): not read anywhere in this file — presumably consumed elsewhere */
  useSlottedSearchRanking: boolean;
};

/**
 * Cache hook injected into `searchService`. `performSearch` calls
 * `cacheQuery` with the search query each time it emits a result set.
 */
export interface CachingStrategy {
  cacheQuery(query: string): void;
}
/**
 * An extra, caller-provided search source that is queried alongside the
 * server and upstream connectors.
 */
export type CustomSource = {
  /** the connection this source represents; its id/type are used for gating and filtering */
  connector: Connection;
  /** performs the search; it receives the query and the currently applied filters */
  search: (query: string, filters: SearchFilter[]) => Promise<SearchResult[]>;
};

/**
 * A search result with a client-side score attached: either the value
 * returned by `scoreResult` or `searchConfig.pinnedServerResultScore`.
 */
export type ScoredResult = SearchResult & {
  score: number;
};
/**
 * A scored result that also records how long the request that produced it
 * took, measured with `performance.now()` (milliseconds).
 */
export type ScoredResultWithLatency = ScoredResult & {
  latency: number;
};

/** The part of the dbx API service that this module calls. */
interface DbxApiServiceContract {
  /** fetches the main server-side results for a query and filter set */
  fetchSearchResults(
    searchQuery: string,
    filters: SearchFilter[],
  ): Promise<SearchResult[]>;
  /** fetches results for a single connection; called once per upstream-capable connection */
  fetchUpstreamForConnection(
    searchQuery: string,
    connection: Connection,
    filters: SearchFilter[],
  ): Promise<SearchResult[]>;
}

/** The part of the logging service that this module calls. */
interface LoggingServiceContract {
  /** returns a logger instance for the given tag */
  tagged: (tag: string) => ConsolaInstance;
}

/** The part of the operational metrics service that this module calls. */
interface OperationalMetricsServiceContract {
  /** returns the metrics namespace object for `ns` */
  namespace(ns: string): Namespace;
  /**
   * Runs `fn` synchronously and returns its result.
   * NOTE(review): presumably records the elapsed time under `ns`/`name` —
   * confirm against the metrics service implementation.
   */
  measureLatencySync<T>(ns: string, name: string, fn: () => T): T;
}

// Used both as the logger tag and as the operational metrics namespace.
const METRIC_NAMESPACE = 'search-service';
// validateProvidedAtMs only inspects timestamps strictly greater than this
// value; anything at or below it is passed through untouched.
const MAX_DATE_NUM = 9999999999999;
// Duration handed to the overall `timeout` operator in performSearch
// (20 x ONE_SECOND_IN_MILLIS).
const SEARCH_TIMEOUT = ONE_SECOND_IN_MILLIS * 20;

/** Tunable constants for the search pipeline. */
export const searchConfig = {
  /** the client score value used when we pin some results to the top */
  pinnedServerResultScore: 1,
  /** the default position that all results will be "mixed in" at. This is only used if localMixinThreshold isn't set */
  defaultLocalMixinPosition: 5,
  /** A max limiter for the number of results that can be pinned to the top */
  maxPinnedResults: 5,
  /** minimum number of pinned results to show before we start mixing and displaying "everything" */
  minPinnedResults: 10,
  /** when we stop waiting on latent upstream calls and show all results (milliseconds) */
  upstreamSearchTimeout: 8000,
  /** when we stop waiting on additional sources and show all results (milliseconds) */
  additionalSourcesSearchTimeout: 8000,
  /** delay before showing second batch of results (if necessary) */
  secondBatchResultDelayMs: 1500,
  /** max number of results to consider as candidates from each upstream connection */
  maxUpstreamResultsPerConnection: 10,
  /** Max number of total results to display on the page */
  maxResultsToDisplay: 100,
} as const;

/**
 * Builds the search service and registers it under `ServiceId.SEARCH`.
 * Nothing is returned; consumers obtain the service through the registry.
 *
 * @param cache receives every query for which a result set was emitted
 * @param additionalSources extra sources searched alongside server/upstream
 * @param dbxApiService API used for server and upstream searches
 * @param loggingService only `tagged` is used, to create this module's logger
 * @param metricsService only `namespace` and `measureLatencySync` are used
 */
export default function searchService(
  cache: CachingStrategy,
  additionalSources: Array<CustomSource>,
  dbxApiService: DbxApiServiceContract,
  { tagged }: LoggingServiceContract,
  { namespace, measureLatencySync }: OperationalMetricsServiceContract,
) {
  const logger = tagged(METRIC_NAMESPACE);
  const metrics = namespace(METRIC_NAMESPACE);

  /**
   * Returns a copy of every result with a `score` computed by `scoreResult`.
   * The whole pass is wrapped in `measureLatencySync` as 'score-results'.
   */
  function scoreResults(
    query: string,
    results: SearchResult[],
  ): ScoredResult[] {
    const perform = () => {
      return results.map((result) => ({
        ...result,
        score: scoreResult(query, result),
      }));
    };
    return measureLatencySync(METRIC_NAMESPACE, 'score-results', perform);
  }

  /**
   * Searches the server, upstream connectors and additional sources and
   * merges them into one stream of `{ results, localMixinPosition }`.
   *
   * Nothing is emitted until the server has responded; after that the stream
   * re-emits whenever upstream / additional sources deliver. Note that
   * `options.useSlottedSearchRanking` is not read here.
   */
  function performSearch(
    searchQuery: string,
    connections: Connection[],
    filters: SearchFilter[],
    options: SearchOptions,
  ) {
    logger.debug('starting search');

    // overwritten with the number of fixed-score results once the server emits
    let localMixinPosition: number = searchConfig.defaultLocalMixinPosition;
    const { localMixinThreshold, isLocalFilesEnabled = false } = options;

    // connector ids of applied connector filters
    const filterConnectorIds = getConnectorFilters(filters).map(
      ({ parameters }) => parameters.connectorId,
    );

    // setup the observables for the various search sources
    // these are all deferred so that they don't start until the search is
    // subscribed to
    const server$ = fetchServerResults({
      searchQuery,
      filters,
      localMixinThreshold,
      dbxApiService,
      logger,
      scoreResults,
    });
    const upstream$ = fetchUpstreamResults({
      searchQuery,
      filters,
      connections,
      filterConnectorIds,
      dbxApiService,
      logger,
      metrics,
      scoreResults,
    });
    const additionalSources$ = fetchAdditionalSourceResults({
      searchQuery,
      filters,
      additionalSources,
      filterConnectorIds,
      isLocalFilesEnabled,
      logger,
      scoreResults,
    });

    // then start combining them, buckle up it's a bit of a brain bender
    const results$ = server$.pipe(
      // this multicasts the observable so that it can be subscribed to
      // multiple times inside the callback while the underlying server
      // request is only made once and its results are shared
      op.connect((multicastedServer$) => {
        // merge the upstream and additional sources into one; combineLatest
        // re-emits the latest value of both sources whenever either of them
        // emits, so every emission here is one flat array
        const upstreamAndAddlResults$ = rx
          .combineLatest([upstream$, additionalSources$])
          .pipe(
            op.map(([upstreamResults, additionalSourceResults]) => [
              ...upstreamResults,
              ...additionalSourceResults,
            ]),
          )
          .pipe(
            op.connect((multicastedUpstreamAndAddl$) => {
              // this ensures that we only emit results once the server results
              // have come back, even if upstream or additional sources come
              // back first. This is done by buffering the upstream and
              // additional source results until the server results come back,
              // then it will emit all the buffered results followed by hooking
              // the observable back into the pipeline so if it emits results
              // after they'll still be piped through
              const buffered$ = multicastedUpstreamAndAddl$
                .pipe(op.buffer(multicastedServer$))
                .pipe(op.take(1))
                .pipe(
                  // unpack the buffer (an array of result arrays) back into
                  // individual emissions, in their original order
                  op.mergeMap((bufferedResultsArrArr) =>
                    rx.from(bufferedResultsArrArr),
                  ),
                );
              // concat will only subscribe to one of the observables at a
              // time, so in this case the first will be subscribed and then
              // we will wait till it completes, then the second is subscribed.
              return rx.concat(buffered$, multicastedUpstreamAndAddl$);
            }),
          );

        // flatten the server results into a single array and set the localMixinPosition
        const flattenedServerResults$ = multicastedServer$.pipe(
          op.map(([fixedResults, scoredResults]) => {
            // side effect: later read by the final map that builds the emission
            localMixinPosition = fixedResults.length;
            return [...fixedResults, ...scoredResults];
          }),
        );

        return (
          rx
            // combineLatest acts like forkJoin, but it doesn't wait for all of
            // the observables to complete, it instead just emits the latest
            // value from each observable whenever any of them emit a value
            .combineLatest([flattenedServerResults$, upstreamAndAddlResults$])
            .pipe(
              // we (ab)use combineLatest here to ensure that no matter what
              // order the observables emit in, we will always put the server
              // results first. They'll be somewhat re-sorted in useSearch, but
              // we still want to ensure that server results are always first
              // here and rely on useSearch to read the localMixinPosition and
              // re-sort the results as necessary
              op.map(([serverResults, upstreamAndAddlResults]) => [
                ...serverResults,
                ...upstreamAndAddlResults,
              ]),
            )
        );
      }),
    );

    return (
      results$
        // an overall timeout, will throw if anything takes too long; it is
        // configured through `each`, so the limit applies to the wait for
        // every individual emission rather than to the search as a whole
        .pipe(
          op.timeout({
            each: SEARCH_TIMEOUT,
            with: () =>
              rx.throwError(
                () =>
                  new SearchTimeoutError(
                    `Search operation timed out after ${
                      SEARCH_TIMEOUT / 1000
                    } seconds`,
                  ),
              ),
          }),
          // attach the mixin position captured from the latest server emission
          op.map((results) => {
            return {
              results,
              localMixinPosition,
            };
          }),
        )
        // cache the query; this tap runs on every emission, not just the first
        .pipe(
          op.tap(() => {
            measureLatencySync(METRIC_NAMESPACE, 'cache-results', () => {
              cache.cacheQuery(searchQuery);
            });
          }),
        )
        // and log completion
        .pipe(
          op.tap({
            error: (err) => {
              logger.error('Search error', err);
            },
            complete: () => {
              logger.debug('Search complete');
            },
          }),
        )
    );
  }
  // NOTE(review): the third argument is presumably the list of services this
  // one depends on — confirm against services.provide
  services.provide<Service>(
    ServiceId.SEARCH,
    {
      performSearch,
    },
    [ServiceId.DBX_API],
  );
}

/**
 * Queries the server for results and splits them for ranking.
 *
 * The returned observable emits one tuple `[pinned, scored]`:
 *  - `pinned` holds the results that receive
 *    `searchConfig.pinnedServerResultScore`
 *  - `scored` holds every other result, scored through `scoreResults`
 *
 * When the only connector filter targets local files no request is made and
 * both arrays are empty. The request itself is deferred until subscription.
 */
function fetchServerResults({
  searchQuery,
  filters,
  localMixinThreshold,
  dbxApiService,
  logger,
  scoreResults,
}: {
  searchQuery: string;
  filters: SearchFilter[];
  localMixinThreshold: number;
  dbxApiService: DbxApiServiceContract;
  logger: ConsolaInstance;
  scoreResults: (query: string, results: SearchResult[]) => ScoredResult[];
}) {
  // factory handed to defer, so nothing below runs before subscription
  const requestServerResults = () => {
    if (isFilteredToOnlyLocalFiles(filters.filter(isConnectorFilter))) {
      logger.debug(
        'Filtered to only local files, not returning server results',
      );
      return rx.of([]);
    }

    const requestStartedAt = performance.now();
    const pendingResults = dbxApiService
      .fetchSearchResults(searchQuery, filters)
      .then(filterOutContacts)
      .then(validateProvidedAtMs)
      .then((results) => tagSearchResultSource(results, 'server'))
      .then((results) => {
        // every result of one response shares the same request latency
        const latency = performance.now() - requestStartedAt;
        return results.map((result) => ({ ...result, latency }));
      });
    return rx.from(pendingResults);
  };

  return rx.defer(requestServerResults).pipe(
    op.map((results) => {
      const pinnedScore = searchConfig.pinnedServerResultScore;

      // no threshold configured: pin the leading results by position
      if (!localMixinThreshold) {
        logger.debug('Using fixed score for local mixin');

        const cutoff = searchConfig.defaultLocalMixinPosition;
        const pinned = results.slice(0, cutoff).map((result) => ({
          ...result,
          score: pinnedScore,
        }));
        const scored = scoreResults(searchQuery, results.slice(cutoff));

        return [pinned, scored] as [
          ScoredResultWithLatency[],
          ScoredResultWithLatency[],
        ];
      }

      logger.debug('Using threshold score for local mixin');

      const pinned: ScoredResult[] = [];
      const remainder: SearchResult[] = [];

      for (const result of results) {
        // a result is pinned only while there is still room for pinned results
        const shouldPin =
          result.relevanceScore >= localMixinThreshold &&
          pinned.length < searchConfig.maxPinnedResults;

        if (shouldPin) {
          pinned.push({ ...result, score: pinnedScore });
        } else {
          remainder.push(result);
        }
      }

      // everything that was not pinned gets a regular client-side score
      const scored = scoreResults(searchQuery, remainder);

      return [pinned, scored] as [
        ScoredResultWithLatency[],
        ScoredResultWithLatency[],
      ];
    }),
    op.tap({
      error: (err) => {
        logger.error('Server search error', err);
      },
      complete: () => {
        logger.debug('Server search complete');
      },
    }),
  );
}

/**
 * Fetches upstream results for all supported connectors.
 *
 * A connection is searched only when it supports upstream search, has a
 * connector type, is in the 'connected' state and (when connector filters are
 * applied) its type is one of `filterConnectorIds`.
 *
 * The returned observable starts with an empty array and then emits the
 * results accumulated across every connector that has responded so far, so
 * the latest emission always contains all upstream results received. It has a
 * built-in timeout (`searchConfig.upstreamSearchTimeout`) and silences any
 * per-connector error by treating that connector as having no results.
 */
function fetchUpstreamResults({
  searchQuery,
  filters,
  connections,
  filterConnectorIds,
  dbxApiService,
  logger,
  metrics,
  scoreResults,
}: {
  searchQuery: string;
  filters: SearchFilter[];
  connections: Connection[];
  filterConnectorIds: string[];
  dbxApiService: DbxApiServiceContract;
  logger: ConsolaInstance;
  metrics: Namespace;
  scoreResults: (query: string, results: SearchResult[]) => ScoredResult[];
}) {
  logger.debug('Starting upstream search');

  // connections to use for upstream search
  const upstreamConnectors = connections.filter((connection) => {
    // Connection doesn't support upstream search
    if (!connection?.connector?.details?.supports_upstream) return false;
    // Connection doesn't have a type
    if (!connection?.connector?.id_attrs?.type) return false;
    // Connection is expired or not connected
    if (!(connection?.connection_status?.['.tag'] === 'connected'))
      return false;

    if (filterConnectorIds.length === 0) return true;

    return filterConnectorIds.includes(connection.connector.id_attrs?.type);
  });

  return rx.defer(() =>
    rx
      .from(upstreamConnectors)
      .pipe(
        op.mergeMap((connector) => {
          const t0 = performance.now();
          // strip ALL whitespace (a string pattern would only replace the
          // first space) and fall back to the connector type so the metric
          // name never ends up as "upstream-undefined"
          const connectorSafeName =
            connector.branding?.display_name
              ?.toLocaleLowerCase()
              .replace(/\s+/g, '') ||
            connector.connector?.id_attrs?.type ||
            'unknown';
          logger.debug(`Starting upstream search for ${connectorSafeName}`);

          return rx.from(
            dbxApiService
              .fetchUpstreamForConnection(searchQuery, connector, filters)
              .then(filterOutContacts)
              .then(validateProvidedAtMs)
              .then((results) =>
                results.slice(0, searchConfig.maxUpstreamResultsPerConnection),
              )
              .then((results) => {
                const latency = performance.now() - t0;
                metrics.stats(`upstream-${connectorSafeName}`, latency);
                return tagSearchResultSource(results, 'upstream');
              })
              .then((results) => scoreResults(searchQuery, results))
              .catch((err: unknown) => {
                // a rejection value is not guaranteed to be an Error (it can
                // even be null); reading `.message` unguarded would throw
                // here and turn a silenced error into a failed search
                const detail =
                  (err as { message?: unknown } | null | undefined)?.message ??
                  err;
                logger.error(
                  `Error fetching upstream search results for ${connectorSafeName}`,
                  detail,
                );
                // silence errors
                return [] as ScoredResult[];
              }),
          );
        }),
      )
      // Each connector resolves independently. Accumulate the batches so
      // that consumers which only look at the latest emission (combineLatest
      // in performSearch) see every connector's results instead of only the
      // most recent connector's.
      .pipe(
        op.scan(
          (accumulated: ScoredResult[], batch: ScoredResult[]) => [
            ...accumulated,
            ...batch,
          ],
          [] as ScoredResult[],
        ),
      )
      // emit immediately so combineLatest is never blocked on upstream
      .pipe(op.startWith([] as ScoredResult[]))
      .pipe(
        // stop waiting for slow connectors once the timeout elapses
        op.takeUntil(
          rx.timer(searchConfig.upstreamSearchTimeout).pipe(
            op.tap(() => {
              logger.warn('Upstream search timed out');
            }),
          ),
        ),
      )
      .pipe(
        op.tap({
          error: (err) => {
            logger.error('Upstream search error', err);
          },
          complete: () => {
            logger.debug('Upstream search complete');
          },
        }),
      ),
  );
}

/**
 * Fetches any provided "additional sources" (eg local files in hornet).
 *
 * Sources are skipped when:
 *  - they are the local files source and `isLocalFilesEnabled` is false
 *  - the applied filters contain a type additional sources cannot honour
 *    (see `containsUnsupportedFilters`)
 *  - connector filters are applied and the source's type is not among them
 *
 * The returned observable starts with an empty array and then emits the
 * results accumulated across every source that has responded so far, so the
 * latest emission always contains all additional results received. It has a
 * built-in timeout (`searchConfig.additionalSourcesSearchTimeout`) and
 * silences any per-source error by treating that source as having no results.
 */
function fetchAdditionalSourceResults({
  searchQuery,
  filters,
  additionalSources,
  filterConnectorIds,
  isLocalFilesEnabled,
  logger,
  scoreResults,
}: {
  searchQuery: string;
  filters: SearchFilter[];
  additionalSources: CustomSource[];
  filterConnectorIds: string[];
  isLocalFilesEnabled: boolean;
  logger: ConsolaInstance;
  scoreResults: (query: string, results: SearchResult[]) => ScoredResult[];
}) {
  logger.debug('Starting additional source search');

  // both values are the same for every source, so compute them once
  const localFileConnectionId = LOCAL_FILE_CONNECTOR?.id_attrs?.id;
  const hasUnsupportedFilters = containsUnsupportedFilters(filters);

  // Incorporate results from additional search sources here, such as local files
  const filteredAdditionalSources = additionalSources
    .filter((source) => {
      // filter gated sources (ie local files)
      const sourceConnectionId = source.connector?.connector?.id_attrs?.id;
      if (
        sourceConnectionId === localFileConnectionId &&
        !isLocalFilesEnabled
      ) {
        return false;
      }
      return true;
    })
    // handle applied search filters
    .filter((source) => {
      // additional sources do not support every filter type yet
      if (hasUnsupportedFilters) return false;

      if (filterConnectorIds.length === 0) return true;
      // remove connections without type
      if (!source.connector.connector?.id_attrs?.type) return false;

      return filterConnectorIds.includes(
        source.connector.connector?.id_attrs?.type,
      );
    });

  return rx.defer(() =>
    rx
      .from(filteredAdditionalSources)
      .pipe(
        op.mergeMap((customSource) => {
          const t0 = performance.now();
          return rx.from(
            customSource
              .search(searchQuery, filters)
              .then(
                (results) => tagSearchResultSource(results, 'local_cache'), // TODO: new source for local files?
              )
              .then((results) => scoreResults(searchQuery, results))
              .then((scored) => {
                const latency = performance.now() - t0;
                return scored.map((result) => ({
                  ...result,
                  latency,
                })) as ScoredResultWithLatency[];
              })
              .catch((err: unknown) => {
                // a rejection value is not guaranteed to be an Error (it can
                // even be null); reading `.message` unguarded would throw
                // here and turn a silenced error into a failed search
                const detail =
                  (err as { message?: unknown } | null | undefined)?.message ??
                  err;
                logger.error(
                  `Error fetching additional source results for ${customSource.connector.connector?.id_attrs?.type}`,
                  detail,
                );
                // silence errors
                return [] as ScoredResult[];
              }),
          );
        }),
      )
      // Each source resolves independently. Accumulate the batches so that
      // consumers which only look at the latest emission (combineLatest in
      // performSearch) see every source's results instead of only the most
      // recent source's.
      .pipe(
        op.scan(
          (accumulated: ScoredResult[], batch: ScoredResult[]) => [
            ...accumulated,
            ...batch,
          ],
          [] as ScoredResult[],
        ),
      )
      // emit immediately so combineLatest is never blocked on these sources
      .pipe(op.startWith([] as ScoredResult[]))
      .pipe(
        // stop waiting for slow sources once the timeout elapses
        op.takeUntil(
          rx.timer(searchConfig.additionalSourcesSearchTimeout).pipe(
            op.tap(() => {
              logger.warn('Additional sources search timed out');
            }),
          ),
        ),
      )
      .pipe(
        op.tap({
          error: (err) => {
            logger.error('Additional Sources search error', err);
          },
          complete: () => {
            logger.debug('Additional Sources search complete');
          },
        }),
      ),
  );
}

/**
 * Drops results whose record type tag is 'contact' or 'user'. Results
 * without a record type are kept. The input array is not modified.
 */
function filterOutContacts(results: SearchResult[]) {
  return results.filter((result) => {
    const recordTag = result?.recordType?.['.tag'];
    return !(recordTag === 'contact' || recordTag === 'user');
  });
}

/**
 * Makes sure `providerUpdateAtMs` can be turned into a valid `Date`.
 *
 * Only timestamps greater than MAX_DATE_NUM are inspected. If such a value
 * does not produce a valid Date it is divided by 1000 (truncated); if that
 * still is not a valid Date the field is reset to 0. Results are updated in
 * place and returned in a new array.
 */
function validateProvidedAtMs(results: SearchResult[]) {
  const yieldsValidDate = (timestamp: number) =>
    !isNaN(new Date(timestamp).getTime());

  return results.map((result) => {
    const rawTimestamp = result.providerUpdateAtMs;

    // missing or plausibly sized timestamps are left alone
    if (!rawTimestamp || !(rawTimestamp > MAX_DATE_NUM)) return result;
    if (yieldsValidDate(rawTimestamp)) return result;

    // retry with the value scaled down by a factor of 1000
    const scaledTimestamp = Math.trunc(rawTimestamp / 1000);
    result.providerUpdateAtMs = yieldsValidDate(scaledTimestamp)
      ? scaledTimestamp
      : 0; // Invalid date provided
    return result;
  });
}

/**
 * Returns shallow copies of `results`, each stamped with the given
 * `searchResultSource`. The input objects are left untouched.
 */
function tagSearchResultSource(
  results: SearchResult[],
  searchResultSource: SearchResultSource,
) {
  return results.map((entry) => ({ ...entry, searchResultSource }));
}

// filter types that additional (custom) sources cannot handle
const unsupportedFilterTypes = [SearchFilterType.Person];

/**
 * Reports whether at least one of the given filters has a type listed in
 * `unsupportedFilterTypes`.
 */
export function containsUnsupportedFilters(filters: SearchFilter[]) {
  for (const filter of filters) {
    if (unsupportedFilterTypes.includes(filter.type)) return true;
  }
  return false;
}

/**
 * True only when exactly one connector filter is applied and its id matches
 * the id of LOCAL_FILE_CONNECTOR.
 */
export function isFilteredToOnlyLocalFiles(filters: ConnectorFilter[]) {
  if (filters.length !== 1) return false;

  const soleFilterId = filters[0]?.id;
  if (!soleFilterId) return false;

  return soleFilterId === LOCAL_FILE_CONNECTOR.id_attrs?.id;
}
