package org.apache.pinot.query.mailbox;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Receiving end of a mailbox: a bounded buffer of {@link TransferableBlock}s plus a single "error
 * block" slot.
 *
 * <p>Blocks are added with {@link #offer(TransferableBlock, long)} and taken with {@link #poll()}.
 * Once an error block has been recorded (via {@link #setErrorBlock(TransferableBlock)},
 * {@link #cancel()}, or a failure inside {@code offer}), buffered blocks are dropped, further offers
 * are rejected and {@code poll()} keeps returning that error block.
 *
 * <p>State is kept in a {@link BlockingQueue}, an {@link AtomicReference} and a {@code volatile}
 * reader field.
 */
public class ReceivingMailbox {
    /** Capacity of the internal block buffer. */
    public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;

    private static final Logger LOGGER = LoggerFactory.getLogger(ReceivingMailbox.class);

    /** Sentinel error block recorded by {@link #cancel()}; always compared by identity. */
    private static final TransferableBlock CANCELLED_ERROR_BLOCK =
            TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by receiver"));

    private final String _id;
    // Sized from the public constant so the advertised capacity and the real one cannot diverge.
    private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
    // Null until the mailbox is cancelled or fails; only the first error block ever set is kept.
    private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();

    @Nullable
    private volatile Reader _reader;

    /** Callback invoked when a block has been enqueued or an error block has been recorded. */
    public interface Reader {
        void blockReadyToRead();
    }

    /** Outcome of {@link ReceivingMailbox#offer(TransferableBlock, long)}. */
    public enum ReceivingMailboxStatus {
        /** The block was enqueued. */
        SUCCESS,
        /** The mailbox holds a non-cancellation error block, or the offering thread was interrupted. */
        ERROR,
        /** The timeout was non-positive, or the block could not be enqueued within the timeout. */
        TIMEOUT,
        /** The mailbox had been cancelled via {@link ReceivingMailbox#cancel()}. */
        EARLY_TERMINATED
    }

    /**
     * Creates an empty mailbox with no reader registered.
     *
     * @param id identifier of this mailbox, returned by {@link #getId()} and used in log/error messages
     */
    public ReceivingMailbox(String id) {
        _id = id;
    }

    /**
     * Registers the reader to be notified by this mailbox.
     *
     * @param reader the reader to notify
     * @throws IllegalArgumentException if a reader has already been registered
     */
    public void registeredReader(Reader reader) {
        if (_reader != null) {
            throw new IllegalArgumentException("Only one reader is supported");
        }
        LOGGER.debug("==[MAILBOX]== Reader registered for mailbox: {}", _id);
        _reader = reader;
    }

    /** Returns the identifier given at construction time. */
    public String getId() {
        return _id;
    }

    /**
     * Tries to enqueue a block, waiting up to {@code timeoutMs} milliseconds for free capacity.
     *
     * <p>On timeout or interruption an error block is recorded through
     * {@link #setErrorBlock(TransferableBlock)}. On interruption the calling thread's interrupt status
     * is restored before returning.
     *
     * @param block the block to enqueue
     * @param timeoutMs maximum time to wait in milliseconds; a value {@code <= 0} is treated as already
     *     timed out
     * @return the outcome of the offer, see {@link ReceivingMailboxStatus}
     */
    public ReceivingMailboxStatus offer(TransferableBlock block, long timeoutMs) {
        TransferableBlock errorBlock = _errorBlock.get();
        if (errorBlock != null) {
            LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
            return statusForErrorBlock(errorBlock);
        }
        if (timeoutMs <= 0) {
            LOGGER.debug("Mailbox: {} is already timed out", _id);
            setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
                    new TimeoutException("Timed out while offering data to mailbox: " + _id)));
            return ReceivingMailboxStatus.TIMEOUT;
        }
        try {
            if (!_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
                LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", _id, timeoutMs);
                setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(new TimeoutException(
                        "Timed out while waiting for receive operator to consume data from mailbox: " + _id)));
                return ReceivingMailboxStatus.TIMEOUT;
            }
            // The error block may have been set while we were waiting for capacity; re-check so the
            // block that was just enqueued does not linger in the queue.
            errorBlock = _errorBlock.get();
            if (errorBlock != null) {
                LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
                _blocks.clear();
                return statusForErrorBlock(errorBlock);
            }
            LOGGER.debug("==[MAILBOX]== Block {} ready to read from mailbox: {}", block, _id);
            notifyReader();
            return ReceivingMailboxStatus.SUCCESS;
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted while offering block into mailbox: {}", _id);
            setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(e));
            // Restore the interrupt status so code further up the stack can observe the interruption.
            // Done after the error block is published so the reader callback does not run with the
            // flag already set.
            Thread.currentThread().interrupt();
            return ReceivingMailboxStatus.ERROR;
        }
    }

    /**
     * Records {@code transferableBlock} as this mailbox's error block if none has been recorded yet.
     * When it is recorded, buffered blocks are dropped and the reader (if any) is notified; otherwise
     * the call has no effect.
     *
     * @param transferableBlock the error block to record
     */
    public void setErrorBlock(TransferableBlock transferableBlock) {
        if (_errorBlock.compareAndSet(null, transferableBlock)) {
            _blocks.clear();
            notifyReader();
        }
    }

    /**
     * Returns the recorded error block if there is one, otherwise removes and returns the next
     * buffered block without waiting.
     *
     * @return the error block, the next buffered block, or {@code null} if nothing is available
     * @throws IllegalStateException if no reader has been registered
     */
    @Nullable
    public TransferableBlock poll() {
        Preconditions.checkState(_reader != null, "A reader must be registered");
        TransferableBlock errorBlock = _errorBlock.get();
        return errorBlock != null ? errorBlock : _blocks.poll();
    }

    /**
     * Marks the mailbox as cancelled (unless an error block is already recorded) and drops buffered
     * blocks. Unlike {@link #setErrorBlock(TransferableBlock)}, this does not notify the reader.
     */
    public void cancel() {
        LOGGER.debug("Cancelling mailbox: {}", _id);
        if (_errorBlock.compareAndSet(null, CANCELLED_ERROR_BLOCK)) {
            _blocks.clear();
        }
    }

    /** Returns the number of blocks currently buffered. */
    public int getNumPendingBlocks() {
        return _blocks.size();
    }

    /** Maps a recorded (non-null) error block to the status reported to the offering side. */
    private static ReceivingMailboxStatus statusForErrorBlock(TransferableBlock errorBlock) {
        // Identity comparison is intentional: only the shared cancellation sentinel means early termination.
        return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.EARLY_TERMINATED
                : ReceivingMailboxStatus.ERROR;
    }

    /** Invokes the registered reader's callback, if a reader is registered. */
    private void notifyReader() {
        // Read the volatile field once so the null check and the call see the same reference.
        Reader reader = _reader;
        if (reader == null) {
            LOGGER.debug("No reader to notify");
        } else {
            LOGGER.debug("Notifying reader");
            reader.blockReadyToRead();
        }
    }
}
