package org.apache.pinot.query.runtime.operator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.shaded.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.class */
public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
    private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
    private final MailboxService<TransferableBlock> _mailboxService;
    private final RelDistribution.Type _exchangeType;
    private final List<MailboxIdentifier> _sendingMailbox;
    private final long _deadlineTimestampNano;
    private int _serverIdx;
    private TransferableBlock _upstreamErrorBlock;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) MailboxReceiveOperator.class);
    private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES = ImmutableSet.of(RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED, RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED);

    private static MailboxIdentifier toMailboxId(ServerInstance serverInstance, long j, long j2, String str, int i) {
        return new StringMailboxIdentifier(String.format("%s_%s", Long.valueOf(j), Long.valueOf(j2)), serverInstance.getHostname(), serverInstance.getQueryMailboxPort(), str, i);
    }

    public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService, List<ServerInstance> list, RelDistribution.Type type, String str, int i, long j, int i2, Long l) {
        this._mailboxService = mailboxService;
        Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(type), "Exchange/Distribution type: " + type + " is not supported!");
        this._deadlineTimestampNano = ((l != null ? l.longValue() : 10000L) * 1000000) + System.nanoTime();
        this._exchangeType = type;
        if (this._exchangeType == RelDistribution.Type.SINGLETON) {
            ServerInstance serverInstance = null;
            for (ServerInstance serverInstance2 : list) {
                if (serverInstance2.getHostname().equals(this._mailboxService.getHostname()) && serverInstance2.getQueryMailboxPort() == this._mailboxService.getMailboxPort()) {
                    Preconditions.checkState(serverInstance == null, "multiple instance found for singleton exchange type!");
                    serverInstance = serverInstance2;
                }
            }
            if (serverInstance == null) {
                this._sendingMailbox = Collections.emptyList();
            } else {
                this._sendingMailbox = Collections.singletonList(toMailboxId(serverInstance, j, i2, str, i));
            }
        } else {
            this._sendingMailbox = new ArrayList(list.size());
            Iterator<ServerInstance> it2 = list.iterator();
            while (it2.hasNext()) {
                this._sendingMailbox.add(toMailboxId(it2.next(), j, i2, str, i));
            }
        }
        this._upstreamErrorBlock = null;
        this._serverIdx = 0;
    }

    public List<MailboxIdentifier> getSendingMailbox() {
        return this._sendingMailbox;
    }

    @Override // org.apache.pinot.core.common.Operator
    public List<Operator> getChildOperators() {
        return null;
    }

    @Override // org.apache.pinot.core.common.Operator
    @Nullable
    public String toExplainString() {
        return EXPLAIN_NAME;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.pinot.core.operator.BaseOperator
    /* renamed from: getNextBlock */
    public TransferableBlock getNextBlock2() {
        if (this._upstreamErrorBlock != null) {
            return this._upstreamErrorBlock;
        }
        if (System.nanoTime() >= this._deadlineTimestampNano) {
            LOGGER.error("Timed out after polling mailboxes: {}", this._sendingMailbox);
            return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
        }
        int i = this._serverIdx;
        int i2 = 0;
        int i3 = 0;
        for (int i4 = 0; i4 < this._sendingMailbox.size(); i4++) {
            this._serverIdx = (i + i4) % this._sendingMailbox.size();
            MailboxIdentifier mailboxIdentifier = this._sendingMailbox.get(this._serverIdx);
            try {
                ReceivingMailbox<TransferableBlock> receivingMailbox = this._mailboxService.getReceivingMailbox(mailboxIdentifier);
                if (!receivingMailbox.isClosed()) {
                    i2++;
                    TransferableBlock receive = receivingMailbox.receive();
                    if (receive != null) {
                        if (receive.isErrorBlock()) {
                            this._upstreamErrorBlock = TransferableBlockUtils.getErrorTransferableBlock(receive.getDataBlock().getExceptions());
                            return this._upstreamErrorBlock;
                        }
                        if (!receive.isEndOfStreamBlock()) {
                            return receive;
                        }
                        i3++;
                    }
                }
            } catch (Exception e) {
                LOGGER.error(String.format("Error receiving data from mailbox %s", mailboxIdentifier), (Throwable) e);
            }
        }
        return (i2 <= 0 || i2 <= i3) ? TransferableBlockUtils.getEndOfStreamTransferableBlock() : TransferableBlockUtils.getNoOpTransferableBlock();
    }
}
