import type { DescMethod, MessageShape } from '@bufbuild/protobuf';
import { fromBinary, toBinary } from '@bufbuild/protobuf';
import type { Observable } from 'rxjs';
import { concatMap, startWith } from 'rxjs';

import type { GrpcContext, GrpcValue } from '@up/grpc';
import {
  Code,
  decodeTrailer,
  Envelope,
  EnvelopeDecodeStream,
  GRPC_TRAILER,
  GrpcError,
  GrpcMessageValue,
  GrpcTrailerError,
  GrpcTransport,
  MESSAGE_FLAGS,
  toEnvelopeChunk,
  TRAILER_FLAGS,
} from '@up/grpc';
import { deferReadableStream } from '@up/rx';
import { flat } from '@up/utils';

import { FETCH_ORIGIN } from './fetch-origin.js';
import type { FetchTransportOptions } from './fetch-transport-options.js';
import { GrpcStatusHttpError } from './grpc-status-http-error.js';

const DEFAULT_HEADERS = new Headers({
  // eslint-disable-next-line ts/naming-convention
  Accept: 'application/grpc-web+proto',
  // eslint-disable-next-line ts/naming-convention
  'Content-Type': 'application/grpc-web+proto',
});

export class FetchTransport extends GrpcTransport {
  private readonly origin: string;
  private readonly headers: Headers;
  private readonly fetch: typeof globalThis.fetch;

  constructor(
    origin: string,
    {
      headers = new Headers(),
      fetch = globalThis.fetch,
    }: FetchTransportOptions = {},
  ) {
    super();
    this.origin = origin;
    this.headers = headers;
    this.fetch = fetch.bind(globalThis);
  }

  lowLevelSummon<M extends DescMethod>(
    method: M,
    input: NoInfer<MessageShape<M['input']>>,
    context: GrpcContext,
  ): Observable<GrpcValue<M['output']>> {
    return deferReadableStream<Envelope[]>(async (signal) => {
      const url = new URL(
        `${method.parent.typeName}/${method.name}`,
        this.origin,
      );

      const headers = new Headers([
        ...DEFAULT_HEADERS,
        ...this.headers,
        ...flat<[string, string]>(context.getValues(GRPC_TRAILER)),
      ]);

      const body = toEnvelopeChunk(
        new Envelope(MESSAGE_FLAGS, toBinary(method.input, input)),
      );

      const response = await this.fetch(url, {
        method: 'POST',
        headers,
        body,
        signal,
      });

      GrpcTrailerError.assert(response.headers);
      GrpcStatusHttpError.assert(response.status);

      if (response.body === null) {
        throw new GrpcError(Code.UNKNOWN, 'response.body is empty');
      }

      return response.body.pipeThrough(new EnvelopeDecodeStream());
    }).pipe(
      concatMap((envelops) =>
        envelops.map<GrpcValue<M['output']>>(({ flags, bytes }) => {
          switch (flags) {
            case MESSAGE_FLAGS:
              return new GrpcMessageValue<M['output']>(
                fromBinary<M['output']>(method.output, bytes),
              );
            case TRAILER_FLAGS:
              return GRPC_TRAILER.provide(decodeTrailer(bytes));
            default:
              throw new Error('Not implemented');
          }
        }),
      ),
      startWith(FETCH_ORIGIN.provide(this.origin)),
    );
  }
}
