import { ServiceId } from '@mirage/discovery/id';
import * as services from '@mirage/discovery/services';
import { APIv2Callable } from '@mirage/service-dbx-api/service';
import { tagged } from '@mirage/service-logging';
import Sentry from '@mirage/shared/sentry';
import { isErrorWithMessage } from '@mirage/shared/util/error';
import { KVStorage } from '@mirage/storage';
import WithDefaults from '@mirage/storage/with-defaults';
import { EMPTY, from, Observable, Subject } from 'rxjs';
import { catchError, concatMap } from 'rxjs/operators';
import { ConversationMessage, NewUserMessage } from '../types';
import { getResponse } from './api';

export type Service = ReturnType<typeof conversationService>;

const logger = tagged('conversation-service');

export type StoredConversation = {
  conversations: { [key: string]: ConversationMessage[] };
};

type DbxApiServiceContract = {
  callApiV2: APIv2Callable;
};

export default function conversationService(
  rawStorage: KVStorage<StoredConversation>,
  { callApiV2 }: DbxApiServiceContract,
) {
  const adapter = new WithDefaults(rawStorage, { conversations: {} });
  const userMessages$ = new Subject<NewUserMessage>();
  const conversationUpdates = new Map<string, Subject<ConversationMessage[]>>();

  // Process user messages as they come in
  userMessages$
    .pipe(
      // Ensure only one message is processed at a time
      concatMap((newMessage) => {
        return from(processUserMessage(newMessage)).pipe(
          catchError((error) => {
            logger.error('Failed to process user message:', error);
            Sentry.withScope((scope) => {
              scope.setTag(
                'errorMessage',
                isErrorWithMessage(error) ? error.message : 'unknown',
              );
              Sentry.captureException(error, {}, scope);
              Sentry.captureMessage(
                '[service-conversation] error processing user message',
                'error',
                {},
                scope,
              );
            });
            return EMPTY;
          }),
        );
      }),
    )
    .subscribe();

  async function processUserMessage(newMessage: NewUserMessage): Promise<void> {
    const { conversationId, documentIds, message } = newMessage;
    const history = await getConversation(conversationId);

    // Create and emit pending message immediately
    const pendingMessage: ConversationMessage = {
      id: `pending-${Date.now()}`,
      message_type: {
        '.tag': 'user_message',
        text: message,
        rephrased: '',
      },
      timestamp: new Date().toISOString(),
      status: 'pending',
    };

    // Add pending message to history and emit
    const historyWithPending = [...history, pendingMessage];
    await setConversation(conversationId, historyWithPending);

    try {
      // Filter out any non-success messages from history before sending to API
      const historyWithoutPending = history
        .filter((msg) => msg.status === 'success')
        .map(({ status: _status, ...msg }) => msg); // Strip out status field for API

      // Get response from API
      const response = await getResponse(
        callApiV2,
        { text: message },
        conversationId,
        historyWithoutPending,
        documentIds,
      );

      // Add success status to all messages in the response
      const responseWithStatus: ConversationMessage[] = response.map((msg) => ({
        ...msg,
        status: 'success',
      }));

      // Update with real response from server
      await setConversation(conversationId, responseWithStatus);
    } catch (error) {
      // On error, update pending message to error status
      const historyWithError: ConversationMessage[] = [
        ...history,
        {
          ...pendingMessage,
          status: 'error',
        },
      ];
      await setConversation(conversationId, historyWithError);
      throw error;
    }
  }

  function getOrCreateConversationSubject(
    conversationId: string,
  ): Subject<ConversationMessage[]> {
    if (!conversationUpdates.has(conversationId)) {
      conversationUpdates.set(conversationId, new Subject());
    }
    return conversationUpdates.get(conversationId)!;
  }

  /**
   * Observe a conversation. This will emit the full conversation history as it changes.
   * @param conversationId The ID of the conversation to observe.
   * @returns An observable that emits the full conversation history.
   */
  function observeConversation(
    conversationId: string,
  ): Observable<ConversationMessage[]> {
    const subject = getOrCreateConversationSubject(conversationId);
    return subject.asObservable();
  }

  // TODO: remove after server-side persistence is implemented
  async function getConversation(
    conversationId: string,
  ): Promise<ConversationMessage[]> {
    const conversations = await adapter.get('conversations');
    return conversations[conversationId] ?? [];
  }

  // TODO: remove after server-side persistence is implemented
  // move subject emit to processUserMessage
  async function setConversation(
    conversationId: string,
    messages: ConversationMessage[],
  ): Promise<void> {
    const conversations = await adapter.get('conversations');
    conversations[conversationId] = messages;
    await adapter.set('conversations', conversations);

    // Emit update for this specific conversation
    const subject = getOrCreateConversationSubject(conversationId);
    subject.next(messages);
  }

  /**
   * Post a new user message to the conversation.
   * @param newMessage The new user message to post.
   */
  function postUserMessage(newMessage: NewUserMessage): void {
    userMessages$.next(newMessage);
  }

  return services.provide(
    ServiceId.CONVERSATION,
    {
      postUserMessage,
      observeConversation,
    },
    [ServiceId.DBX_API],
  );
}
