import { tagged } from '@mirage/service-logging';
import * as rx from 'rxjs';
import { createAPIv2GRPCWebPromiseClient } from '../api_v2';
import { ConversationsUserRequest } from './gen/context_engine_conversations_service_pb';
import { ConversationsUserMessage } from './gen/conversations_pb';
import { DocSummarizationApiV2 } from './gen/doc_summarization_connectweb';

import type { GetAnswersForQueryResult } from './gen/context_engine_answers_service_pb';
import type { ConversationsAssistantResponse } from './gen/context_engine_conversations_service_pb';
import type { ConversationsChatMessage } from './gen/conversations_pb';
import type { DocumentId } from './gen/doc_summarization_pb';
import type { DocSummaryQnaResponse } from './types';

const logger = tagged('context_Observables');

export function createDocSummarizationObservable(
  resultId: DocumentId,
  fileType: string = '',
  connectorName: string = '',
  docSummarySetting: string = '',
): rx.Observable<DocSummaryQnaResponse> {
  return new rx.Observable<DocSummaryQnaResponse>((subscriber) => {
    const summarizationQnaClient = createAPIv2GRPCWebPromiseClient(
      DocSummarizationApiV2,
    );
    if (resultId) {
      (async () => {
        try {
          for await (const res of summarizationQnaClient.getDocSummary({
            id: resultId,
            fileType: fileType,
            connectorName: connectorName,
            setting: docSummarySetting,
          })) {
            if (res?.summary) {
              subscriber.next({
                answer: res.summary,
                requestId: res.requestId,
              });
            }
          }
          subscriber.complete();
        } catch (error) {
          logger.error('createDocSummarizationObservable error', error);
          subscriber.error(error);
        }
      })();
    }
  });
}

export function createDocAnswerObservable(
  resultId: DocumentId,
  question: string,
  fileType: string = '',
  connectorName: string = '',
  docQnaSetting: string = '',
): rx.Observable<DocSummaryQnaResponse> {
  return new rx.Observable<DocSummaryQnaResponse>((subscriber) => {
    const summarizationQnaClient = createAPIv2GRPCWebPromiseClient(
      DocSummarizationApiV2,
    );
    if (resultId && question) {
      (async () => {
        try {
          for await (const res of summarizationQnaClient.getDocAnswer({
            id: resultId,
            question: question,
            fileType: fileType,
            connectorName: connectorName,
            setting: docQnaSetting,
          })) {
            if (res?.answer) {
              subscriber.next({
                answer: res.answer,
                requestId: res.requestId,
              });
            }
          }
          subscriber.complete();
        } catch (error) {
          logger.error('createDocAnswerObservable error', error);
          subscriber.error(error);
        }
      })();
    }
  });
}

export function createGenAnswersForQueryObservable(
  userQuery: string,
  experimentSetting: string = '',
  source?: string,
): rx.Observable<GetAnswersForQueryResult> {
  // If no userQuery, just return an empty Observable that completes immediately
  if (!userQuery) {
    return rx.EMPTY;
  }

  const answersGrpcProxyClient = createAPIv2GRPCWebPromiseClient(
    DocSummarizationApiV2,
  );

  async function* generator() {
    try {
      const req = answersGrpcProxyClient.genAnswersForQueryProxy({
        userQuery,
        experimentSetting,
        source: source ?? 'dash',
      });
      for await (const res of req) {
        yield res;
      }
    } catch (error) {
      // Let RxJS observe the error
      logger.error('createGenAnswersForQueryObservable error', error);
      throw error;
    }
  }

  return rx.from(generator());
}

export function createConversationResponseObservable(
  userMessage: string,
  conversationId: string,
  history?: ConversationsChatMessage[],
  documentIds?: string[],
  experimentSetting?: string,
  source?: string,
): rx.Observable<ConversationsAssistantResponse> {
  const conversationsClient = createAPIv2GRPCWebPromiseClient(
    DocSummarizationApiV2,
  );

  async function* generator() {
    try {
      const userMessageObj = new ConversationsUserMessage({
        text: userMessage,
      });

      const request = new ConversationsUserRequest({
        conversationId: conversationId,
        history: history,
        userMessage: userMessageObj,
        documentIds: documentIds,
        experimentSetting: experimentSetting,
        source: source,
      });

      for await (const res of conversationsClient.conversationsGetResponse(
        request,
      )) {
        if (res?.messages?.length > 0) {
          yield res;
        }
      }
    } catch (error) {
      logger.error('createConversationResponseObservable error', error);
      throw error;
    }
  }

  return rx.from(generator());
}
