package org.apache.pinot.query.mailbox.channel;

import io.grpc.Context;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/mailbox/channel/MailboxContentObserver.class */
public class MailboxContentObserver implements StreamObserver<Mailbox.MailboxContent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentObserver.class);
    private final MailboxService _mailboxService;
    private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
    private transient ReceivingMailbox _mailbox;

    public MailboxContentObserver(MailboxService mailboxService, StreamObserver<Mailbox.MailboxStatus> streamObserver) {
        this._mailboxService = mailboxService;
        this._responseObserver = streamObserver;
    }

    public void onNext(Mailbox.MailboxContent mailboxContent) {
        TransferableBlock transferableBlock;
        String mailboxId = mailboxContent.getMailboxId();
        if (this._mailbox == null) {
            this._mailbox = this._mailboxService.getReceivingMailbox(mailboxId);
        }
        try {
            MetadataBlock dataBlock = DataBlockUtils.getDataBlock(mailboxContent.getPayload().asReadOnlyByteBuffer());
            if (dataBlock instanceof MetadataBlock) {
                Map exceptions = dataBlock.getExceptions();
                if (!exceptions.isEmpty()) {
                    this._mailbox.setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock((Map<Integer, String>) exceptions));
                    return;
                }
                transferableBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock(dataBlock.getStats());
            } else {
                transferableBlock = new TransferableBlock(dataBlock);
            }
            long timeRemaining = Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
            ReceivingMailbox.ReceivingMailboxStatus offer = this._mailbox.offer(transferableBlock, timeRemaining);
            switch (offer) {
                case SUCCESS:
                    this._responseObserver.onNext(Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxId).putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, Integer.toString(this._mailbox.getNumPendingBlocks())).build());
                    break;
                case ERROR:
                    LOGGER.warn("Mailbox: {} already errored out (received error block before)", mailboxId);
                    cancelStream();
                    break;
                case TIMEOUT:
                    LOGGER.warn("Timed out adding block into mailbox: {} with timeout: {}ms", mailboxId, Long.valueOf(timeRemaining));
                    cancelStream();
                    break;
                case EARLY_TERMINATED:
                    LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
                    onCompleted();
                    break;
                default:
                    throw new IllegalStateException("Unsupported mailbox status: " + offer);
            }
        } catch (Exception e) {
            String str = "Caught exception while processing blocks for mailbox: " + mailboxId;
            LOGGER.error(str, e);
            this._mailbox.setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(str, e)));
            cancelStream();
        }
    }

    private void cancelStream() {
        try {
            this._responseObserver.onCompleted();
        } catch (Exception e) {
            LOGGER.debug("Caught exception cancelling mailbox: {}", this._mailbox != null ? this._mailbox.getId() : "unknown", e);
        }
    }

    public void onError(Throwable th) {
        LOGGER.warn("Error on receiver side", th);
        if (this._mailbox != null) {
            this._mailbox.setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by sender", th)));
        } else {
            LOGGER.error("Got error before mailbox is set up", th);
        }
    }

    public void onCompleted() {
        try {
            this._responseObserver.onCompleted();
        } catch (Exception e) {
            LOGGER.debug("Caught exception sending complete to mailbox: {}", this._mailbox != null ? this._mailbox.getId() : "unknown", e);
        }
    }
}
