import { tagged } from '@mirage/service-logging';
import * as jobs from '@mirage/shared/util/jobs';
import { KVStorage } from '@mirage/storage';
import WithDefaults from '@mirage/storage/with-defaults';
import fetch from 'cross-fetch';
import pqueue from 'p-queue';

import type {
  LogMessage,
  SinkTransport,
} from '@mirage/service-logging/service';

// Logger for this transport's own diagnostics; the tag is this module's path.
const logger = tagged(`service-logging/service/transports/datadog.ts`);

/** Maximum number of serialized log lines POSTed to Datadog in one request. */
const LOG_BATCH_SIZE = 200;

/**
 * Period handed to `jobs.register` for the recurring flush job
 * (presumably milliseconds, i.e. 10s — confirm against `@mirage/shared/util/jobs`).
 */
const FLUSH_INTERVAL = 10 * 1000;

/**
 * Maximum number of log lines kept in storage while they wait to be flushed.
 *
 * Capping the backlog keeps memory and storage bounded when lines cannot be
 * delivered to Datadog, whether because the service is down or because the
 * host blocks outbound access to it.
 */
const BACKLOG_THRESHOLD = 10 * 1000;

/** Caller-supplied fields merged into every log line sent to Datadog. */
type ExternalMetadata = {
  version?: string;
  platform?: string;
  installId?: string;
  sessionId?: string;
  onboardingDbid?: string;
  user?: {
    dbid: string | undefined;
    id: number | undefined;
  };
};

/** External metadata plus the fields this transport always stamps on a line. */
type DatadogMetadata = ExternalMetadata & {
  service: string;
  environment: string;
};

/** A log message after decoration, i.e. the object that gets serialized. */
type DatadogLogMessage = LogMessage & DatadogMetadata;

/** Persisted transport state: JSON-serialized log lines awaiting delivery. */
export type PersistentLogData = {
  pending: string[];
};

/**
 * Creates a log sink that persists decorated, JSON-serialized log lines to
 * `persistence` and periodically POSTs them to the Datadog browser intake
 * endpoint in batches of at most `LOG_BATCH_SIZE` lines.
 *
 * @param clientToken - Datadog client token, embedded in the intake URL.
 * @param serviceId - Written to the `service` field of every log line.
 * @param environment - Written to the `environment` field of every log line.
 * @param metadata - Extra fields merged into every log line. They are spread
 *   last, so their keys take precedence over the message's own fields and over
 *   `service`/`environment`.
 * @param persistence - Storage holding serialized lines not yet delivered.
 * @returns A `SinkTransport` whose `log` appends messages to the persisted
 *   backlog; delivery happens on the registered flush job.
 */
export default function datadog(
  clientToken: string,
  serviceId: string,
  environment: string,
  metadata: ExternalMetadata,
  persistence: KVStorage<PersistentLogData>,
): SinkTransport {
  const host = `browser-http-intake.logs.datadoghq.com`;
  const endpoint = `https://${host}/v1/input/${clientToken}?ddsource=browser`;

  const storage = new WithDefaults(persistence, {
    pending: [],
  });

  // Every read-modify-write of `pending` (flushing and appending) runs through
  // this single-concurrency queue so the two never interleave.
  const queue = new pqueue({ concurrency: 1 });

  jobs.register(
    'service-logging/datadog/flush',
    FLUSH_INTERVAL,
    true,
    flush,
    false,
  );

  /** Sends batches until the backlog is empty or a send fails. */
  function flush() {
    return queue.add(async () => {
      let keepFlushing = true;
      while (keepFlushing) {
        keepFlushing = await attemptToFlushNextBatch();
      }
    });
  }

  // The individual page-sized flush is a best-effort mechanism to send the
  // next batch of log lines to Datadog. It assumes by default that the send
  // will succeed: the batch is removed from storage *before* the request, and
  // is re-inserted only on failure.
  //
  // This is not completely correct, but it targets short-running page loads
  // where we may leave the page before the request's promise resolves.
  //
  // Returns true on a successful send, and false when there is nothing left to
  // send or the send failed, i.e. whether the caller should keep flushing.
  async function attemptToFlushNextBatch(): Promise<boolean> {
    const pending = await storage.get('pending');

    // nothing to flush, indicate upwards that we should stop flushing
    if (!pending.length) return false;

    // capture the next batch and write the remaining lines back to storage
    const batch = pending.slice(0, LOG_BATCH_SIZE);
    const remaining = pending.slice(LOG_BATCH_SIZE);
    await storage.set('pending', remaining);

    try {
      const response = await fetch(endpoint, {
        method: 'POST',
        headers: { 'Content-Type': 'text/plain' },
        body: batch.join('\n'),
      });
      if (response.ok) return true;
    } catch {
      // fetch rejected (e.g. a network error); fall through and restore the
      // batch exactly as for a non-OK response
    }

    // on failure, put the batch back in front of the remaining lines
    await storage.set('pending', [...batch, ...remaining]);
    return false;
  }

  /** Stamps a message with the service, environment and external metadata. */
  function decorate(log: LogMessage): DatadogLogMessage {
    return {
      ...log,
      environment,
      service: serviceId,
      ...metadata,
    };
  }

  return {
    log: (logs: LogMessage[]) => {
      // nothing to record; skip the storage round-trip entirely
      if (!logs.length) return;

      void queue
        .add(async () => {
          const pending = await storage.get('pending');
          const next = pending.concat(
            logs.map((log) => JSON.stringify(decorate(log))),
          );

          // Lines can accumulate when we're in a context that prevents access
          // to Datadog (e.g. DNS blocking, a browser extension blocking access,
          // no network, or a development environment with harsh CORS
          // restrictions). Drop the oldest lines so the stored backlog never
          // exceeds BACKLOG_THRESHOLD, including when a single call carries
          // more lines than the threshold.
          const overflow = next.length - BACKLOG_THRESHOLD;
          if (overflow > 0) {
            // NOTE(review): if debug-level logs are routed back into this
            // sink, this message re-enters the backlog — confirm the logger
            // filters it.
            logger.debug('datadog queue exceeds threshold, dropping messages!');
            next.splice(0, overflow);
          }

          await storage.set('pending', next);
        })
        .catch(() => {
          // Best-effort: if serialization or storage fails, these lines are
          // dropped. Handling the rejection here avoids an unhandled promise
          // rejection from this fire-and-forget task. The failure is
          // deliberately not reported through `logger`, which may feed back
          // into this same sink.
        });
    },
  };
}
