package org.apache.pinot.query.mailbox.channel;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.class */
public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) MailboxContentStreamObserver.class);
    private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
    private final GrpcMailboxService _mailboxService;
    private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
    private final boolean _isEnabledFeedback;
    private final AtomicBoolean _isCompleted;
    private final ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
    ReadWriteLock _errorLock;
    private Mailbox.MailboxContent _errorContent;
    private StringMailboxIdentifier _mailboxId;
    private Consumer<MailboxIdentifier> _gotMailCallback;

    private static Mailbox.MailboxContent createErrorContent(Throwable th) throws IOException {
        return Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(th)).getDataBlock().toBytes())).putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build();
    }

    public MailboxContentStreamObserver(GrpcMailboxService grpcMailboxService, StreamObserver<Mailbox.MailboxStatus> streamObserver) {
        this(grpcMailboxService, streamObserver, false);
    }

    public MailboxContentStreamObserver(GrpcMailboxService grpcMailboxService, StreamObserver<Mailbox.MailboxStatus> streamObserver, boolean z) {
        this._isCompleted = new AtomicBoolean(false);
        this._errorLock = new ReentrantReadWriteLock();
        this._errorContent = null;
        this._mailboxService = grpcMailboxService;
        this._responseObserver = streamObserver;
        this._receivingBuffer = new ArrayBlockingQueue<>(5);
        this._isEnabledFeedback = z;
    }

    public Mailbox.MailboxContent poll() {
        try {
            this._errorLock.readLock().lock();
            if (this._errorContent != null) {
                return this._errorContent;
            }
            if (isCompleted()) {
                return null;
            }
            return this._receivingBuffer.poll();
        } finally {
            this._errorLock.readLock().unlock();
        }
    }

    public boolean isCompleted() {
        return this._isCompleted.get() && this._receivingBuffer.isEmpty();
    }

    @Override // io.grpc.stub.StreamObserver
    public void onNext(Mailbox.MailboxContent mailboxContent) {
        this._mailboxId = new StringMailboxIdentifier(mailboxContent.getMailboxId());
        this._gotMailCallback = ((GrpcReceivingMailbox) this._mailboxService.getReceivingMailbox(this._mailboxId)).init(this);
        if (mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY)) {
            return;
        }
        if (!this._receivingBuffer.offer(mailboxContent)) {
            RuntimeException runtimeException = new RuntimeException("Mailbox receivingBuffer is full:" + this._mailboxId);
            LOGGER.error(runtimeException.getMessage());
            try {
                try {
                    this._errorLock.writeLock().lock();
                    this._errorContent = createErrorContent(runtimeException);
                    this._errorLock.writeLock().unlock();
                } catch (IOException e) {
                    RuntimeException runtimeException2 = new RuntimeException("Unable to encode exception for cascade reporting: " + runtimeException, e);
                    LOGGER.error(runtimeException2.getMessage());
                    throw runtimeException2;
                }
            } catch (Throwable th) {
                this._errorLock.writeLock().unlock();
                throw th;
            }
        }
        this._gotMailCallback.accept(this._mailboxId);
        if (this._isEnabledFeedback) {
            Mailbox.MailboxStatus.Builder putMetadata = Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId()).putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, String.valueOf(this._receivingBuffer.remainingCapacity() - 1));
            if (mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY) != null) {
                putMetadata.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true");
            }
            this._responseObserver.onNext(putMetadata.build());
        }
    }

    @Override // io.grpc.stub.StreamObserver
    public void onError(Throwable th) {
        try {
            try {
                this._errorLock.writeLock().lock();
                this._errorContent = createErrorContent(th);
                this._gotMailCallback.accept(this._mailboxId);
                throw new RuntimeException(th);
            } catch (IOException e) {
                throw new RuntimeException("Unable to encode exception for cascade reporting: " + th, e);
            }
        } catch (Throwable th2) {
            this._errorLock.writeLock().unlock();
            throw th2;
        }
    }

    @Override // io.grpc.stub.StreamObserver
    public void onCompleted() {
        this._isCompleted.set(true);
        this._responseObserver.onCompleted();
    }
}
