import type {
  Message,
  MethodInfo,
  PartialMessage,
  ServiceType,
} from '@bufbuild/protobuf';
import type { Transport } from '@connectrpc/connect';
import { Observable } from 'rxjs';

import type { ReactiveTransportOptions } from './reactive-transport-options';

/**
 * Exposes a single unary RPC as an `Observable`.
 *
 * The request is issued through `transport.unary` each time the returned
 * observable is subscribed to. When the call resolves, the following happens
 * in order:
 *
 * 1. `onHeader` (if provided) receives the response header.
 * 2. The response message is emitted to the subscriber.
 * 3. `onTrailer` (if provided) receives the response trailer.
 * 4. The observable completes.
 *
 * If the call rejects, or if one of the steps above throws, the failure is
 * forwarded as an error notification. Neither `complete` nor `error` is sent
 * once the subscriber is already closed.
 *
 * Unsubscribing aborts the `AbortSignal` that was handed to the transport.
 *
 * @param transport - Transport used to perform the call.
 * @param service - Service definition the method belongs to.
 * @param method - Method descriptor of the RPC to invoke.
 * @param input - Request message (partial form) passed to the transport.
 * @param options - Optional request `headers` and `onHeader` / `onTrailer`
 *   callbacks for the response metadata.
 * @returns An observable that emits the response message and then completes.
 */
export function unary<I extends Message<I>, O extends Message<O>>(
  transport: Transport,
  service: ServiceType,
  method: MethodInfo<I, O>,
  input: PartialMessage<I>,
  { headers, onHeader, onTrailer }: ReactiveTransportOptions = {},
): Observable<O> {
  return new Observable<O>((observer) => {
    const abortController = new AbortController();

    // Terminal handlers: stay silent if the subscriber has already been closed
    // (for example because the consumer unsubscribed while the call was pending).
    const finish = (): void => {
      if (!observer.closed) {
        observer.complete();
      }
    };

    const fail = (reason: unknown): void => {
      if (!observer.closed) {
        observer.error(reason);
      }
    };

    // The fourth argument is passed as `undefined`, exactly as before.
    const pending = transport.unary(
      service,
      method,
      abortController.signal,
      undefined,
      headers,
      input,
    );

    pending
      .then((response) => {
        const { header, message, trailer } = response;

        onHeader?.(header);
        observer.next(message);
        onTrailer?.(trailer);
      })
      // Attached to the delivery step so that exceptions thrown while
      // delivering are reported through `fail` as well.
      .then(finish, fail);

    // Teardown: cancel the request via the abort signal.
    return () => {
      abortController.abort();
    };
  });
}
