import * as rx from 'rxjs';
import * as op from 'rxjs/operators';

import type { Observable } from 'rxjs';

export type Stats = {
  completed: boolean;
  duration: number;
  emissions: number;
  failed: boolean;
  first: number;
  last: number;
  cancelled: boolean;
};

export default function instrument<R>(
  wrapping: Observable<R>,
  callback: (stats: Stats) => void,
): Observable<R> {
  return rx.defer(() => {
    const start = performance.now();
    let emissions = 0;
    let first = -1;
    let last = -1;
    let failed = false;
    let completed = false;

    return wrapping
      .pipe(
        op.tap(
          () => {
            const delta = performance.now() - start;
            emissions++;
            last = delta;
            if (first === -1) first = delta;
          },
          () => (failed = true),
          () => (completed = true),
        ),
      )
      .pipe(
        op.finalize(() => {
          const end = performance.now();
          callback({
            completed,
            failed,
            duration: end - start,
            emissions,
            first,
            last,
            cancelled: !completed && !failed,
          });
        }),
      );
  });
}
