package org.apache.pinot.query.runtime.operator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.shaded.com.google.common.collect.ImmutableSet;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/MailboxSendOperator.class */
/**
 * Operator that pulls blocks from a single source operator and pushes each of them into a
 * {@link BlockExchange}.
 *
 * <p>The block obtained from the source is also returned to the caller of {@code nextBlock()},
 * so the operator acts as a pass-through with a sending side effect. Failures while sending are
 * converted into error blocks instead of being propagated as exceptions (see
 * {@link #getNextBlock()}).
 */
public class MailboxSendOperator extends MultiStageOperator {
    /** Distribution types for which this operator is able to build a {@link BlockExchange}. */
    public static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES = ImmutableSet.of(
            RelDistribution.Type.SINGLETON,
            RelDistribution.Type.RANDOM_DISTRIBUTED,
            RelDistribution.Type.BROADCAST_DISTRIBUTED,
            RelDistribution.Type.HASH_DISTRIBUTED);
    private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
    private static final String EXPLAIN_NAME = "MAILBOX_SEND";

    /** Upstream operator whose blocks are forwarded. */
    private final MultiStageOperator _sourceOperator;
    /** Exchange that receives every block this operator sends. */
    private final BlockExchange _exchange;
    // The three fields below are only stored by the constructor; nothing in this class reads them.
    private final List<RexExpression> _collationKeys;
    private final List<RelFieldCollation.Direction> _collationDirections;
    private final boolean _isSortOnSender;

    /**
     * Creates a send operator, building the exchange from the execution context.
     *
     * @param opChainExecutionContext execution context supplying the mailbox service, request id,
     *     deadline, server and stage metadata
     * @param multiStageOperator source operator to pull blocks from
     * @param type distribution type; must be one of {@link #SUPPORTED_EXCHANGE_TYPES}
     * @param keySelector key selector handed to the exchange
     * @param list collation keys, may be {@code null}
     * @param list2 collation directions, may be {@code null}
     * @param z whether sorting happens on the sender side
     * @param i key used to look up this worker's mailbox metadata (presumably the receiving
     *     stage id; confirm against the caller)
     * @throws IllegalStateException if {@code type} is not supported
     */
    public MailboxSendOperator(OpChainExecutionContext opChainExecutionContext, MultiStageOperator multiStageOperator, RelDistribution.Type type, KeySelector<Object[], Object[]> keySelector, @Nullable List<RexExpression> list, @Nullable List<RelFieldCollation.Direction> list2, boolean z, int i) {
        this(opChainExecutionContext, multiStageOperator, getBlockExchange(opChainExecutionContext, type, keySelector, i), list, list2, z);
    }

    /**
     * Creates a send operator around an already constructed exchange.
     *
     * @param opChainExecutionContext execution context passed to the parent operator
     * @param multiStageOperator source operator to pull blocks from
     * @param blockExchange exchange that receives the blocks
     * @param list collation keys, may be {@code null}
     * @param list2 collation directions, may be {@code null}
     * @param z whether sorting happens on the sender side
     */
    @VisibleForTesting
    MailboxSendOperator(OpChainExecutionContext opChainExecutionContext, MultiStageOperator multiStageOperator, BlockExchange blockExchange, @Nullable List<RexExpression> list, @Nullable List<RelFieldCollation.Direction> list2, boolean z) {
        super(opChainExecutionContext);
        _sourceOperator = multiStageOperator;
        _exchange = blockExchange;
        _collationKeys = list;
        _collationDirections = list2;
        _isSortOnSender = z;
    }

    /**
     * Builds the exchange: resolves this worker's mailbox metadata under {@code mailboxInfoKey},
     * opens one sending mailbox per mailbox id and wraps them in a {@link BlockExchange}.
     *
     * @throws IllegalStateException if {@code exchangeType} is not in {@link #SUPPORTED_EXCHANGE_TYPES}
     */
    private static BlockExchange getBlockExchange(OpChainExecutionContext context, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, int mailboxInfoKey) {
        Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType), "Unsupported exchange type: %s", exchangeType);
        MailboxService mailboxService = context.getMailboxService();
        long requestId = context.getRequestId();
        long deadlineMs = context.getDeadlineMs();
        MailboxMetadata mailboxMetadata = context.getStageMetadata()
                .getWorkerMetadataList()
                .get(context.getServer().workerId())
                .getMailBoxInfosMap()
                .get(Integer.valueOf(mailboxInfoKey));
        List<String> mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
        // NOTE(review): raw list kept because the sending-mailbox element type is not imported in
        // this file; give it a type argument once that type is in scope.
        ArrayList sendingMailboxes = new ArrayList(mailboxIds.size());
        // Mailbox ids and virtual addresses are looked up with the same index.
        for (int idx = 0; idx < mailboxIds.size(); idx++) {
            sendingMailboxes.add(mailboxService.getSendingMailbox(
                    mailboxMetadata.getVirtualAddress(idx).hostname(),
                    mailboxMetadata.getVirtualAddress(idx).port(),
                    mailboxIds.get(idx),
                    deadlineMs));
        }
        return BlockExchange.getExchange(sendingMailboxes, exchangeType, keySelector, TransferableBlockUtils::splitBlock);
    }

    /** Returns the single source operator as the only child. */
    @Override
    public List<MultiStageOperator> getChildOperators() {
        return Collections.singletonList(_sourceOperator);
    }

    /** Returns the fixed explain name {@code MAILBOX_SEND}. */
    @Override
    @Nullable
    public String toExplainString() {
        return EXPLAIN_NAME;
    }

    /**
     * Pulls the next block from the source operator, sends it through the exchange and returns it.
     *
     * <ul>
     *   <li>A successful end-of-stream block is not sent as-is: a new end-of-stream block built
     *       from the op-chain operator stats is sent instead, while the block from the source is
     *       what gets returned.</li>
     *   <li>{@link TimeoutException}: logged at WARN; an error block is returned but not sent.</li>
     *   <li>{@link EarlyTerminationException}: logged at DEBUG; a plain end-of-stream block is
     *       returned but not sent.</li>
     *   <li>Any other exception: logged at ERROR; an error block is sent on a best-effort basis
     *       and returned.</li>
     * </ul>
     *
     * @return the block obtained from the source, or an error / end-of-stream block as described
     */
    @Override
    protected TransferableBlock getNextBlock() {
        try {
            TransferableBlock block = _sourceOperator.nextBlock();
            if (block.isSuccessfulEndOfStreamBlock()) {
                // Attach the collected operator stats to the end-of-stream block that goes out.
                sendTransferableBlock(TransferableBlockUtils.getEndOfStreamTransferableBlock(
                        OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())));
            } else {
                sendTransferableBlock(block);
            }
            return block;
        } catch (TimeoutException timeout) {
            LOGGER.warn("Timed out transferring data on opChain: {}", _context.getId(), timeout);
            return TransferableBlockUtils.getErrorTransferableBlock(timeout);
        } catch (EarlyTerminationException earlyTermination) {
            LOGGER.debug("Early terminating opChain: {}", _context.getId());
            return TransferableBlockUtils.getEndOfStreamTransferableBlock();
        } catch (Exception failure) {
            TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(failure);
            try {
                LOGGER.error("Exception while transferring data on opChain: {}", _context.getId(), failure);
                sendTransferableBlock(errorBlock);
            } catch (Exception sendFailure) {
                // Best effort only: the error block is still returned to the caller below.
                LOGGER.error("Exception while sending error block.", sendFailure);
            }
            return errorBlock;
        }
    }

    /**
     * Sends one block through the exchange and traces it at DEBUG level.
     *
     * @throws Exception whatever the exchange throws while sending
     */
    private void sendTransferableBlock(TransferableBlock transferableBlock) throws Exception {
        _exchange.send(transferableBlock);
        LOGGER.debug("==[SEND]== Block {} sent from: {}", transferableBlock, _context.getId());
    }

    /** Always {@code true} for this operator. */
    @Override
    protected boolean shouldCollectStats() {
        return true;
    }

    /**
     * Closes the parent operator and then the exchange. The exchange is closed even when the
     * parent's {@code close()} throws, so it is not left open.
     */
    @Override
    public void close() {
        try {
            super.close();
        } finally {
            _exchange.close();
        }
    }

    /**
     * Cancels the parent operator and then the exchange. The exchange is cancelled even when the
     * parent's {@code cancel()} throws.
     *
     * @param th the cause of the cancellation, forwarded to both the parent and the exchange
     */
    @Override
    public void cancel(Throwable th) {
        try {
            super.cancel(th);
        } finally {
            _exchange.cancel(th);
        }
    }
}
