package org.apache.pinot.query.mailbox.channel;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.class */
public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentStreamObserver.class);
    private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
    private static final long DEFAULT_MAILBOX_POLL_TIMEOUT = 1000;
    private final GrpcMailboxService _mailboxService;
    private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
    private final boolean _isEnabledFeedback;
    private final AtomicBoolean _isCompleted;
    private String _mailboxId;
    private ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;

    public MailboxContentStreamObserver(GrpcMailboxService grpcMailboxService, StreamObserver<Mailbox.MailboxStatus> streamObserver) {
        this(grpcMailboxService, streamObserver, false);
    }

    public MailboxContentStreamObserver(GrpcMailboxService grpcMailboxService, StreamObserver<Mailbox.MailboxStatus> streamObserver, boolean z) {
        this._isCompleted = new AtomicBoolean(false);
        this._mailboxService = grpcMailboxService;
        this._responseObserver = streamObserver;
        this._receivingBuffer = new ArrayBlockingQueue<>(DEFAULT_MAILBOX_QUEUE_CAPACITY);
        this._isEnabledFeedback = z;
    }

    public Mailbox.MailboxContent poll() {
        Mailbox.MailboxContent poll;
        while (!isCompleted()) {
            try {
                poll = this._receivingBuffer.poll(DEFAULT_MAILBOX_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOGGER.error("Interrupt occurred while waiting for mailbox content", e);
            }
            if (poll != null) {
                return poll;
            }
        }
        return null;
    }

    public boolean isCompleted() {
        return this._isCompleted.get() && this._receivingBuffer.isEmpty();
    }

    public void onNext(Mailbox.MailboxContent mailboxContent) {
        this._mailboxId = mailboxContent.getMailboxId();
        ((GrpcReceivingMailbox) this._mailboxService.getReceivingMailbox(this._mailboxId)).init(this);
        if (mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY)) {
            return;
        }
        this._receivingBuffer.offer(mailboxContent);
        if (this._isEnabledFeedback) {
            Mailbox.MailboxStatus.Builder putMetadata = Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId()).putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, String.valueOf(this._receivingBuffer.remainingCapacity() - 1));
            if (mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY) != null) {
                putMetadata.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true");
            }
            this._responseObserver.onNext(putMetadata.build());
        }
    }

    public void onError(Throwable th) {
        try {
            this._receivingBuffer.offer(Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(th)).toBytes())).putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build());
            throw new RuntimeException(th);
        } catch (IOException e) {
            throw new RuntimeException("Unable to encode exception for cascade reporting: " + th, e);
        }
    }

    public void onCompleted() {
        this._isCompleted.set(true);
        this._responseObserver.onCompleted();
    }
}
