import { Observable } from 'rxjs';

/**
 * Adapts a lazily created `ReadableStream` into a cold `Observable`.
 *
 * For every subscription, `handler` is invoked with a fresh `AbortSignal`.
 * The stream it resolves to is read chunk by chunk; each chunk is emitted
 * through `next`, and the observable completes when the stream is done.
 *
 * Teardown (run by RxJS on unsubscribe, completion, or error) aborts the
 * signal and cancels the reader, so both the pending `handler` call and the
 * underlying stream source are told to stop.
 *
 * @param handler - Factory that produces the stream to read. It receives an
 *   `AbortSignal` that is aborted when the subscription is torn down.
 * @returns An observable that emits each chunk read from the stream. It
 *   errors if `handler` rejects or if reading from the stream fails.
 */
export function deferReadableStream<T>(
  handler: (signal: AbortSignal) => Promise<ReadableStream<T>>,
): Observable<T> {
  return new Observable<T>((observer) => {
    const controller = new AbortController();

    observer.add(() => {
      controller.abort();
    });

    const pump = async (): Promise<void> => {
      const stream = await handler(controller.signal);
      const reader = stream.getReader();

      // If the subscriber already closed while `handler` was pending, RxJS
      // runs this teardown immediately, which cancels the stream right away.
      observer.add(() => {
        // `cancel()` rejects when the stream has already errored (it returns
        // the stored error). That error was already delivered through
        // `observer.error`, so swallow it here instead of leaking an
        // unhandled promise rejection.
        reader.cancel().catch(() => {
          // Intentionally ignored: best-effort cleanup.
        });
      });

      // Stop pulling as soon as the subscriber is closed, even if the
      // underlying source is slow to honour the cancellation.
      while (!observer.closed) {
        const result = await reader.read();

        if (result.done) {
          break;
        }

        observer.next(result.value);
      }

      // No-op when the subscriber has already been closed.
      observer.complete();
    };

    pump().catch((err: unknown) => {
      observer.error(err);
    });
  });
}
