package org.apache.pinot.query.runtime.operator;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.routing.MailboxInfos;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.utils.AsyncStream;
import org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;

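/**
 * Base class for multi-stage operators that receive {@link TransferableBlock}s through mailboxes.
 *
 * <p>On construction it resolves the ids of the mailboxes fed by the sender stage, wraps each receiving mailbox
 * in an {@link AsyncStream} and hands all of them to a {@link BlockingMultiStreamConsumer}. It also keeps the
 * stat map of every receiving mailbox so their counters can be merged into this operator's own stats.
 */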
public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
    /** Mailbox service taken from the execution context; used to obtain and release receiving mailboxes. */
    protected final MailboxService _mailboxService;
    /** Distribution type of the receive node; the constructor rejects types not supported by MailboxSendOperator. */
    protected final RelDistribution.Type _distributionType;
    /** Ids of the mailboxes read by this operator; empty when no mailbox info exists for the sender stage. */
    protected final List<String> _mailboxIds;
    /** Consumer built over one async stream per entry of {@link #_mailboxIds}. */
    protected final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer;
    /** Stat maps of the receiving mailboxes, added in the same order as {@link #_mailboxIds}. */
    protected final List<StatMap<ReceivingMailbox.StatKey>> _receivingStats;
    /** Stats of this operator, keyed by {@link StatKey}. */
    protected final StatMap<StatKey> _statMap;

    /* loaded from: input_file:org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator$ReadMailboxAsyncStream.class */
    /**
     * Exposes a single {@link ReceivingMailbox} as an {@link AsyncStream} of {@link TransferableBlock}s.
     *
     * <p>All operations are forwarded to the wrapped mailbox. The owning operator is kept only to reach its
     * mailbox service when the mailbox has to be released.
     */
    private static class ReadMailboxAsyncStream implements AsyncStream<TransferableBlock> {
        final ReceivingMailbox _mailbox;
        final BaseMailboxReceiveOperator _operator;

        /**
         * @param mailbox  mailbox every call is delegated to
         * @param operator operator whose mailbox service releases the mailbox
         */
        ReadMailboxAsyncStream(ReceivingMailbox mailbox, BaseMailboxReceiveOperator operator) {
            _mailbox = mailbox;
            _operator = operator;
        }

        /** Returns the id reported by the wrapped mailbox. */
        @Override
        public Object getId() {
            return _mailbox.getId();
        }

        /**
         * Polls the wrapped mailbox once. When the polled block is a successful end-of-stream block, the mailbox
         * is released through the operator's mailbox service before the block is returned.
         *
         * @return the block returned by the mailbox, which may be {@code null}
         */
        @Override
        @Nullable
        public TransferableBlock poll() {
            TransferableBlock block = _mailbox.poll();
            boolean successfulEos = block != null && block.isSuccessfulEndOfStreamBlock();
            if (successfulEos) {
                _operator._mailboxService.releaseReceivingMailbox(_mailbox);
            }
            return block;
        }

        /**
         * Registers the listener's {@code newDataAvailable} callback as the reader of the wrapped mailbox.
         *
         * @throws NullPointerException if {@code onNewData} is {@code null}
         */
        @Override
        public void addOnNewDataListener(AsyncStream.OnNewData onNewData) {
            // Creating the bound method reference already fails with NullPointerException for a null listener,
            // so no separate null check is needed before the registration call.
            _mailbox.registeredReader(onNewData::newDataAvailable);
        }

        /** Forwards the early-termination request to the wrapped mailbox. */
        @Override
        public void earlyTerminate() {
            _mailbox.earlyTerminate();
        }

        /** Forwards the cancellation to the wrapped mailbox. */
        @Override
        public void cancel() {
            _mailbox.cancel();
        }
    }

    /* loaded from: input_file:org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator$StatKey.class */
    public enum StatKey implements StatMap.Key {
        EXECUTION_TIME_MS(StatMap.Type.LONG) { // from class: org.apache.pinot.query.runtime.operator.BaseMailboxReceiveOperator.StatKey.1
            public boolean includeDefaultInJson() {
                return true;
            }
        },
        EMITTED_ROWS(StatMap.Type.LONG) { // from class: org.apache.pinot.query.runtime.operator.BaseMailboxReceiveOperator.StatKey.2
            public boolean includeDefaultInJson() {
                return true;
            }
        },
        FAN_IN(StatMap.Type.INT) { // from class: org.apache.pinot.query.runtime.operator.BaseMailboxReceiveOperator.StatKey.3
            public int merge(int i, int i2) {
                return Math.max(i, i2);
            }
        },
        IN_MEMORY_MESSAGES(StatMap.Type.INT),
        RAW_MESSAGES(StatMap.Type.INT),
        DESERIALIZED_BYTES(StatMap.Type.LONG),
        DESERIALIZATION_TIME_MS(StatMap.Type.LONG),
        DOWNSTREAM_WAIT_MS(StatMap.Type.LONG),
        UPSTREAM_WAIT_MS(StatMap.Type.LONG);

        private final StatMap.Type _type;

        StatKey(StatMap.Type type) {
            this._type = type;
        }

        public StatMap.Type getType() {
            return this._type;
        }
    }

    /**
     * Creates the operator and connects it to the receiving mailboxes of the node's sender stage.
     *
     * <p>When the worker metadata has mailbox info for the sender stage, one {@link ReadMailboxAsyncStream} is
     * created per mailbox id and the mailbox's stat map is remembered. Otherwise the operator is set up with no
     * mailboxes at all. In both cases {@link StatKey#FAN_IN} is merged with the number of mailbox ids.
     *
     * @param opChainExecutionContext execution context providing the mailbox service, request id, stage id,
     *                                worker id and worker metadata
     * @param mailboxReceiveNode      plan node providing the distribution type and the sender stage id
     * @throws IllegalStateException if the node's distribution type is not contained in
     *                               {@code MailboxSendOperator.SUPPORTED_EXCHANGE_TYPES}
     */
    public BaseMailboxReceiveOperator(OpChainExecutionContext opChainExecutionContext, MailboxReceiveNode mailboxReceiveNode) {
        super(opChainExecutionContext);
        _statMap = new StatMap<>(StatKey.class);
        _mailboxService = opChainExecutionContext.getMailboxService();

        RelDistribution.Type distributionType = mailboxReceiveNode.getDistributionType();
        Preconditions.checkState(MailboxSendOperator.SUPPORTED_EXCHANGE_TYPES.contains(distributionType), "Unsupported exchange type: %s", distributionType);
        _distributionType = distributionType;

        long requestId = opChainExecutionContext.getRequestId();
        int senderStageId = mailboxReceiveNode.getSenderStageId();
        MailboxInfos mailboxInfos = (MailboxInfos) opChainExecutionContext.getWorkerMetadata().getMailboxInfosMap().get(Integer.valueOf(senderStageId));
        if (mailboxInfos != null) {
            _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, senderStageId, mailboxInfos.getMailboxInfos(), opChainExecutionContext.getStageId(), opChainExecutionContext.getWorkerId());
            int numMailboxes = _mailboxIds.size();
            // Parameterized lists instead of raw ArrayList, so the element types are checked by the compiler.
            List<AsyncStream<TransferableBlock>> asyncStreams = new ArrayList<>(numMailboxes);
            _receivingStats = new ArrayList<>(numMailboxes);
            for (String mailboxId : _mailboxIds) {
                ReadMailboxAsyncStream asyncStream = new ReadMailboxAsyncStream(_mailboxService.getReceivingMailbox(mailboxId), this);
                asyncStreams.add(asyncStream);
                _receivingStats.add(asyncStream._mailbox.getStatMap());
            }
            _multiConsumer = new BlockingMultiStreamConsumer.OfTransferableBlock(opChainExecutionContext, asyncStreams);
        } else {
            // No mailbox info for the sender stage: nothing to read from.
            _mailboxIds = List.of();
            _receivingStats = List.of();
            _multiConsumer = new BlockingMultiStreamConsumer.OfTransferableBlock(opChainExecutionContext, List.of());
        }
        _statMap.merge(StatKey.FAN_IN, _mailboxIds.size());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Marks this operator as early-terminated, then passes the request on to the multi-stream consumer. */
    @Override
    public void earlyTerminate() {
        _isEarlyTerminated = true;
        _multiConsumer.earlyTerminate();
    }

    /** Always reports {@code MAILBOX_RECEIVE} as the operator type. */
    @Override
    public MultiStageOperator.Type getOperatorType() {
        return MultiStageOperator.Type.MAILBOX_RECEIVE;
    }

    /** Returns an empty immutable list; this operator reports no child operators. */
    @Override
    public List<MultiStageOperator> getChildOperators() {
        return List.of();
    }

    /** Runs the superclass close logic first, then closes the multi-stream consumer. */
    @Override
    public void close() {
        super.close();
        _multiConsumer.close();
    }

    /**
     * Cancels the operator: the superclass is cancelled first, then the multi-stream consumer.
     *
     * @param th the throwable forwarded to both the superclass and the consumer
     */
    @Override
    public void cancel(Throwable th) {
        super.cancel(th);
        _multiConsumer.cancel(th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Merges the stats of every receiving mailbox into this operator's stat map and then delegates to the
     * superclass implementation.
     *
     * @param transferableBlock block passed through unchanged to the superclass
     * @param statMap           stat map passed through unchanged to the superclass
     * @return whatever the superclass implementation returns
     */
    @Override
    public TransferableBlock updateEosBlock(TransferableBlock transferableBlock, StatMap<?> statMap) {
        for (StatMap<ReceivingMailbox.StatKey> mailboxStats : _receivingStats) {
            addReceivingStats(mailboxStats);
        }
        return super.updateEosBlock(transferableBlock, statMap);
    }

    /**
     * Records one execution in this operator's stats.
     *
     * @param j value merged into {@link StatKey#EXECUTION_TIME_MS} (presumably milliseconds, going by the key name)
     * @param i value merged into {@link StatKey#EMITTED_ROWS}
     */
    @Override
    public void registerExecution(long j, int i) {
        _statMap.merge(StatKey.EXECUTION_TIME_MS, j);
        _statMap.merge(StatKey.EMITTED_ROWS, i);
    }

    /**
     * Merges the counters of one receiving mailbox into this operator's stat map.
     *
     * <p>Three keys are renamed on the way: {@code DESERIALIZED_MESSAGES} feeds {@link StatKey#RAW_MESSAGES},
     * {@code OFFER_CPU_TIME_MS} feeds {@link StatKey#DOWNSTREAM_WAIT_MS} and {@code WAIT_CPU_TIME_MS} feeds
     * {@link StatKey#UPSTREAM_WAIT_MS}. The remaining keys keep their names.
     *
     * @param mailboxStats stat map of a single receiving mailbox
     */
    private void addReceivingStats(StatMap<ReceivingMailbox.StatKey> mailboxStats) {
        _statMap.merge(StatKey.RAW_MESSAGES, mailboxStats.getInt(ReceivingMailbox.StatKey.DESERIALIZED_MESSAGES));
        _statMap.merge(StatKey.DESERIALIZED_BYTES, mailboxStats.getLong(ReceivingMailbox.StatKey.DESERIALIZED_BYTES));
        _statMap.merge(StatKey.DESERIALIZATION_TIME_MS, mailboxStats.getLong(ReceivingMailbox.StatKey.DESERIALIZATION_TIME_MS));
        _statMap.merge(StatKey.IN_MEMORY_MESSAGES, mailboxStats.getInt(ReceivingMailbox.StatKey.IN_MEMORY_MESSAGES));
        _statMap.merge(StatKey.DOWNSTREAM_WAIT_MS, mailboxStats.getLong(ReceivingMailbox.StatKey.OFFER_CPU_TIME_MS));
        _statMap.merge(StatKey.UPSTREAM_WAIT_MS, mailboxStats.getLong(ReceivingMailbox.StatKey.WAIT_CPU_TIME_MS));
    }
}
