package org.apache.pinot.query.runtime.operator.utils;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.utils.AsyncStream;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.class */
public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable {
    // Class-wide logger; assigned in the static initializer at the bottom of the class.
    private static final Logger LOGGER;
    // Identifier of this consumer. Within this class it is only used to build log messages.
    private final Object _id;
    // Streams this consumer reads from. The list is mutated: a stream is removed once it delivers an
    // end-of-stream block (see readDroppingSuccessEos).
    protected final List<? extends AsyncStream<E>> _mailboxes;
    // Absolute deadline in milliseconds; compared against System.currentTimeMillis().
    private final long _deadlineMs;
    // Index in _mailboxes of the stream that produced the most recently read block. Starts at size - 1 and is
    // decremented when the stream at that index is removed, so it can be -1.
    protected int _lastRead;
    // Decompiler artifact backing the assertion check in readDroppingSuccessEos; set in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Capacity-1 queue used as a wake-up signal: onData() offers a token, readBlockBlocking() polls for it.
    protected final ArrayBlockingQueue<Boolean> _newDataReady = new ArrayBlockingQueue<>(1);
    // Last timeout or error block handed out by the read methods. It is written in this class but never read here.
    private E _errorBlock = null;

    /* loaded from: input_file:org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer$OfTransferableBlock.class */
    public static class OfTransferableBlock extends BlockingMultiStreamConsumer<TransferableBlock> {
        private final int _stageId;

        @Nullable
        private MultiStageQueryStats _stats;

        public OfTransferableBlock(OpChainExecutionContext opChainExecutionContext, List<? extends AsyncStream<TransferableBlock>> list) {
            super(opChainExecutionContext.getId(), opChainExecutionContext.getDeadlineMs(), list);
            this._stageId = opChainExecutionContext.getStageId();
            this._stats = MultiStageQueryStats.emptyStats(opChainExecutionContext.getStageId());
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer
        public boolean isError(TransferableBlock transferableBlock) {
            return transferableBlock.isErrorBlock();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer
        public boolean isEos(TransferableBlock transferableBlock) {
            return transferableBlock.isSuccessfulEndOfStreamBlock();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer
        public void onMailboxEnd(TransferableBlock transferableBlock) {
            try {
                MultiStageQueryStats multiStageQueryStats = this._stats;
                if (multiStageQueryStats != null) {
                    if (transferableBlock.getQueryStats() != null) {
                        multiStageQueryStats.mergeUpstream(transferableBlock.getQueryStats(), true);
                    } else {
                        multiStageQueryStats.mergeUpstream(transferableBlock.getSerializedStatsByStage(), true);
                    }
                }
            } catch (Exception e) {
                BlockingMultiStreamConsumer.LOGGER.warn("Error merging stats", e);
                this._stats = null;
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer
        public TransferableBlock onTimeout() {
            String str = "Timed out on stage " + this._stageId + " waiting for data sent by a child stage";
            BlockingMultiStreamConsumer.LOGGER.debug(str);
            return TransferableBlockUtils.getErrorTransferableBlock((Exception) QueryErrorCode.EXECUTION_TIMEOUT.asException(str));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer
        public TransferableBlock onException(Exception exc) {
            String str = "Found an error on stage " + this._stageId + " while reading from a child stage";
            BlockingMultiStreamConsumer.LOGGER.warn(str, exc);
            return TransferableBlockUtils.getErrorTransferableBlock((Exception) QueryErrorCode.INTERNAL.asException(str));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer
        public TransferableBlock onEos() {
            MultiStageQueryStats multiStageQueryStats = this._stats;
            if (multiStageQueryStats == null) {
                multiStageQueryStats = MultiStageQueryStats.emptyStats(this._stageId);
            }
            return TransferableBlockUtils.getEndOfStreamTransferableBlock(multiStageQueryStats);
        }
    }

    /**
     * Creates a consumer over the given streams and registers {@link #onData()} as the new-data listener of each.
     *
     * @param obj identifier of this consumer; used in log messages
     * @param j absolute deadline in milliseconds, compared against {@link System#currentTimeMillis()}
     * @param list streams to read from; the list is kept by reference and streams are removed from it once they
     *     deliver an end-of-stream block
     */
    public BlockingMultiStreamConsumer(Object obj, long j, List<? extends AsyncStream<E>> list) {
        _id = obj;
        _deadlineMs = j;
        _mailboxes = list;
        AsyncStream.OnNewData listener = this::onData;
        for (AsyncStream<E> mailbox : list) {
            mailbox.addOnNewDataListener(listener);
        }
        // Start "before" the first mailbox in circular order, so the first read begins at index 0.
        _lastRead = list.size() - 1;
    }

    /** Returns whether the given block is an error block. Error blocks are recorded and returned to the caller. */
    protected abstract boolean isError(E e);

    /**
     * Returns whether the given block marks the end of its stream. When it does, the stream that produced it is
     * removed from the mailbox list and the block is passed to {@link #onMailboxEnd} instead of being returned.
     */
    protected abstract boolean isEos(E e);

    /** Called with the end-of-stream block of a stream right after that stream has been removed. */
    protected abstract void onMailboxEnd(E e);

    /** Builds the block returned when the deadline has passed or the wait for new data timed out. */
    protected abstract E onTimeout();

    /** Builds the block returned when the wait for new data fails with the given exception. */
    protected abstract E onException(Exception exc);

    /** Builds the block returned once no streams remain to be read. */
    protected abstract E onEos();

    /** Closes this consumer by cancelling every stream still in the mailbox list. */
    @Override
    public void close() {
        this.cancelRemainingMailboxes();
    }

    /**
     * Cancels every stream still in the mailbox list.
     *
     * @param th reason for the cancellation; currently not used
     */
    public void cancel(Throwable th) {
        this.cancelRemainingMailboxes();
    }

    /** Asks every stream still in the mailbox list to terminate early. */
    public void earlyTerminate() {
        for (AsyncStream<E> mailbox : _mailboxes) {
            mailbox.earlyTerminate();
        }
    }

    /** Cancels every stream still in the mailbox list, in list order. */
    protected void cancelRemainingMailboxes() {
        for (AsyncStream<E> mailbox : _mailboxes) {
            mailbox.cancel();
        }
    }

    /**
     * Listener invoked by the streams when they have new data: posts a wake-up token for
     * {@link #readBlockBlocking()}.
     *
     * <p>The signal queue holds at most one token, so a notification arriving while one is already pending is
     * dropped; the pending token is enough to wake the reader.
     */
    protected void onData() {
        boolean delivered = _newDataReady.offer(Boolean.TRUE);
        if (!LOGGER.isTraceEnabled()) {
            return;
        }
        String outcome = delivered ? "New data notification delivered on " : "New data notification ignored on ";
        LOGGER.trace(outcome + String.valueOf(_id) + ". " + System.identityHashCode(_newDataReady));
    }

    /**
     * Returns the next block available on any of the streams, blocking until there is one.
     *
     * <p>A first read is attempted without waiting. If no stream has data, the calling thread waits on the
     * new-data signal (see {@link #onData()}) until the deadline and retries after every notification.
     *
     * @return a block read from one of the streams, or the block built by {@link #onEos()} when no streams remain,
     *     by {@link #onTimeout()} when the deadline is reached, or by {@link #onException(Exception)} when the
     *     calling thread is interrupted while waiting. In the last case the interrupt status of the thread is
     *     restored before returning.
     */
    public E readBlockBlocking() {
        if (LOGGER.isTraceEnabled()) {
            String mailboxIds = _mailboxes.stream()
                .map(mailbox -> mailbox.getId())
                .map(mailboxId -> mailboxId.toString())
                .collect(Collectors.joining(","));
            LOGGER.trace("==[RECEIVE]== Enter getNextBlock from: " + String.valueOf(_id) + ". Mailboxes: " + mailboxIds);
        }
        // Optimistic path: try to read without waiting for a notification.
        E block = readDroppingSuccessEos();
        if (block != null) {
            return block;
        }
        try {
            while (block == null) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("==[RECEIVE]== Blocked on : " + String.valueOf(_id) + ". "
                        + System.identityHashCode(_newDataReady));
                }
                // A non-positive remaining time makes poll return immediately.
                long remainingMs = _deadlineMs - System.currentTimeMillis();
                if (_newDataReady.poll(remainingMs, TimeUnit.MILLISECONDS) == null) {
                    // Logged at the same level as the guard; previously this was a warn hidden behind a debug check.
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("==[RECEIVE]== Timeout on: " + String.valueOf(_id));
                    }
                    _errorBlock = onTimeout();
                    return _errorBlock;
                }
                LOGGER.debug("==[RECEIVE]== More data available. Trying to read again");
                block = readDroppingSuccessEos();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            return onException(e);
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("==[RECEIVE]== Ready to emit on: " + String.valueOf(_id));
        }
        return block;
    }

    /**
     * Tries to read one block without blocking, consuming end-of-stream blocks on the way.
     *
     * <p>Every end-of-stream block found removes its stream from the mailbox list and is passed to
     * {@link #onMailboxEnd}; reading then continues with the remaining streams.
     *
     * @return the block built by {@link #onTimeout()} if the deadline has passed; the block built by
     *     {@link #onEos()} if no streams remain; otherwise the block that was read, or {@code null} if no stream
     *     had data available
     */
    @Nullable
    private E readDroppingSuccessEos() {
        if (System.currentTimeMillis() > _deadlineMs) {
            _errorBlock = onTimeout();
            return _errorBlock;
        }

        E block = readBlockOrNull();
        while (block != null && isEos(block)) {
            if (!$assertionsDisabled && _mailboxes.isEmpty()) {
                throw new AssertionError("readBlockOrNull should return null when there are no mailboxes");
            }
            AsyncStream<E> finished = _mailboxes.remove(_lastRead);
            // Step back so the next circular scan starts at the element that took the removed one's index.
            _lastRead--;
            if (LOGGER.isDebugEnabled()) {
                String aliveIds = _mailboxes.stream()
                    .map(mailbox -> mailbox.getId())
                    .map(mailboxId -> mailboxId.toString())
                    .collect(Collectors.joining(","));
                LOGGER.debug("==[RECEIVE]== EOS received : " + String.valueOf(_id) + " in mailbox: "
                    + String.valueOf(finished.getId()) + " (mailboxes alive: " + aliveIds + ")");
            }
            onMailboxEnd(block);
            block = readBlockOrNull();
        }

        if (_mailboxes.isEmpty()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("==[RECEIVE]== Finished : " + String.valueOf(_id));
            }
            return onEos();
        }
        if (block == null) {
            return null;
        }

        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("==[RECEIVE]== Returned block from : " + String.valueOf(_id) + " in mailbox: "
                + String.valueOf(_mailboxes.get(_lastRead).getId()));
        }
        if (isError(block)) {
            LOGGER.info("==[RECEIVE]== Error block found from : " + String.valueOf(_id) + " in mailbox "
                + String.valueOf(_mailboxes.get(_lastRead).getId()));
            _errorBlock = block;
        }
        return block;
    }

    /**
     * Polls the streams once in circular order, starting right after the last one that produced a block.
     *
     * @return the first block found, with {@code _lastRead} updated to the index of the stream that produced it,
     *     or {@code null} if no stream had data
     */
    @Nullable
    private E readBlockOrNull() {
        // First the streams after the last one read, then wrap around up to and including it.
        E block = pollMailboxRange(_lastRead + 1, _mailboxes.size());
        if (block == null) {
            block = pollMailboxRange(0, _lastRead + 1);
        }
        return block;
    }

    /** Polls the streams with index in {@code [fromInclusive, toExclusive)} in order, stopping at the first block. */
    @Nullable
    private E pollMailboxRange(int fromInclusive, int toExclusive) {
        for (int idx = fromInclusive; idx < toExclusive; idx++) {
            E block = _mailboxes.get(idx).poll();
            if (block != null) {
                _lastRead = idx;
                return block;
            }
        }
        return null;
    }

    // Static initializer assigning the two blank static final fields declared at the top of the class.
    static {
        // Decompiler rendering of the assertion switch: true when assertions are disabled for this class.
        $assertionsDisabled = !BlockingMultiStreamConsumer.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(BlockingMultiStreamConsumer.class);
    }
}
