package org.apache.pinot.query.runtime.operator;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.shaded.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/MailboxSendOperator.class */
/**
 * Operator that pulls {@link TransferableBlock}s from an upstream operator and hands each one to a
 * {@link BlockExchange} built over a list of receiving mailboxes.
 *
 * <p>For {@link RelDistribution.Type#SINGLETON} the only receiver is the server instance whose hostname and
 * query mailbox port match those of the supplied {@link MailboxService}; for every other supported type one
 * mailbox id is generated per receiving server instance.
 */
public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
    private static final String EXPLAIN_NAME = "MAILBOX_SEND";

    /** Exchange types this operator accepts; anything else is rejected by the constructor. */
    private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE = ImmutableSet.of(
            RelDistribution.Type.SINGLETON,
            RelDistribution.Type.RANDOM_DISTRIBUTED,
            RelDistribution.Type.BROADCAST_DISTRIBUTED,
            RelDistribution.Type.HASH_DISTRIBUTED);

    /** Upstream operator whose blocks are forwarded to the exchange. */
    private final Operator<TransferableBlock> _dataTableBlockBaseOperator;
    /** Destination every forwarded block is sent to. */
    private final BlockExchange _exchange;

    /** Creates the {@link BlockExchange} used by this operator; replaceable in tests. */
    @VisibleForTesting
    @FunctionalInterface
    interface BlockExchangeFactory {
        BlockExchange build(MailboxService<TransferableBlock> mailboxService, List<MailboxIdentifier> list, RelDistribution.Type type, KeySelector<Object[], Object[]> keySelector, BlockSplitter blockSplitter);
    }

    /** Maps a receiving server instance to the mailbox id blocks are sent to; replaceable in tests. */
    @VisibleForTesting
    @FunctionalInterface
    interface MailboxIdGenerator {
        MailboxIdentifier generate(ServerInstance serverInstance);
    }

    /**
     * Creates an operator that uses {@link #toMailboxId} to derive mailbox ids and
     * {@code BlockExchange::getExchange} to build the exchange.
     *
     * @param mailboxService service handed to the exchange; its hostname/port select the SINGLETON receiver
     * @param operator upstream operator to pull blocks from
     * @param list candidate receiving server instances
     * @param type exchange distribution type; must be one of the supported types
     * @param keySelector key selector handed to the exchange factory
     * @param str forwarded as the second {@link StringMailboxIdentifier} argument
     *            (NOTE(review): presumably the sending host name - confirm)
     * @param i forwarded as the third {@link StringMailboxIdentifier} argument
     *          (NOTE(review): presumably the sending port - confirm)
     * @param j first component of the {@code "<j>_<i2>"} id string
     *          (NOTE(review): presumably a request/job id - confirm)
     * @param i2 second component of the {@code "<j>_<i2>"} id string
     *           (NOTE(review): presumably a stage id - confirm)
     * @throws IllegalStateException if {@code type} is unsupported, or more than one instance matches for SINGLETON
     * @throws NullPointerException if no instance matches the mailbox service for SINGLETON
     */
    public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, Operator<TransferableBlock> operator, List<ServerInstance> list, RelDistribution.Type type, KeySelector<Object[], Object[]> keySelector, String str, int i, long j, int i2) {
        this(mailboxService, operator, list, type, keySelector,
                serverInstance -> toMailboxId(serverInstance, j, i2, str, i),
                BlockExchange::getExchange);
    }

    /**
     * Creates an operator with injectable mailbox-id generation and exchange construction.
     *
     * @param mailboxService service handed to the exchange; its hostname/port select the SINGLETON receiver
     * @param operator upstream operator to pull blocks from
     * @param list candidate receiving server instances
     * @param type exchange distribution type; must be one of the supported types
     * @param keySelector key selector handed to the exchange factory
     * @param mailboxIdGenerator produces the mailbox id for each chosen receiver
     * @param blockExchangeFactory builds the exchange over the generated mailbox ids
     * @throws IllegalStateException if {@code type} is unsupported, or more than one instance matches for SINGLETON
     * @throws NullPointerException if no instance matches the mailbox service for SINGLETON
     */
    @VisibleForTesting
    MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, Operator<TransferableBlock> operator, List<ServerInstance> list, RelDistribution.Type type, KeySelector<Object[], Object[]> keySelector, MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory) {
        // Reject unsupported exchange types before generating mailbox ids or building an exchange,
        // so an invalid request does no work. The template form formats the message only on failure.
        Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(type), "Exchange type '%s' is not supported yet", type);

        this._dataTableBlockBaseOperator = operator;

        final List<MailboxIdentifier> receivingMailboxes;
        if (type == RelDistribution.Type.SINGLETON) {
            // SINGLETON sends to exactly one receiver: the instance with the mailbox service's host and port.
            ServerInstance matched = null;
            for (ServerInstance candidate : list) {
                if (candidate.getHostname().equals(mailboxService.getHostname()) && candidate.getQueryMailboxPort() == mailboxService.getMailboxPort()) {
                    Preconditions.checkState(matched == null, "multiple instance found for singleton exchange type!");
                    matched = candidate;
                }
            }
            Preconditions.checkNotNull(matched, "Unable to find receiving instance for singleton exchange");
            receivingMailboxes = Collections.singletonList(mailboxIdGenerator.generate(matched));
        } else {
            // A bound method reference null-checks its receiver when evaluated, so no explicit check is needed.
            receivingMailboxes = list.stream().map(mailboxIdGenerator::generate).collect(Collectors.toList());
        }

        this._exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes, type, keySelector, TransferableBlockUtils::splitBlock);
    }

    /**
     * Returns the child operators of this operator.
     *
     * @return always {@code null}; callers must not assume a non-null list
     */
    @Override // org.apache.pinot.core.common.Operator
    public List<Operator> getChildOperators() {
        return null;
    }

    /**
     * Returns the name of this operator for explain output.
     *
     * @return the constant {@code "MAILBOX_SEND"}
     */
    @Override // org.apache.pinot.core.common.Operator
    @Nullable
    public String toExplainString() {
        return EXPLAIN_NAME;
    }

    /**
     * Forwards upstream blocks to the exchange until a no-op or end-of-stream block is reached.
     *
     * <p>Each non-no-op block is sent to the exchange; an end-of-stream block is returned right after it has
     * been sent, and a no-op block is returned without being sent. If pulling or sending throws, an error block
     * is built from the exception, a best-effort attempt is made to send it, and it is returned. A failure
     * while sending the error block is only logged.
     *
     * <p>NOTE(review): the decompiler renamed this method from {@code getNextBlock} (name collision) and
     * widened its access from {@code protected}; the name and modifier are kept as found.
     *
     * @return the last block obtained: end-of-stream, no-op, or an error block
     */
    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.pinot.core.operator.BaseOperator
    /* renamed from: getNextBlock */
    public TransferableBlock getNextBlock2() {
        TransferableBlock block;
        try {
            block = this._dataTableBlockBaseOperator.nextBlock();
            while (!block.isNoOpBlock()) {
                this._exchange.send(block);
                if (block.isEndOfStreamBlock()) {
                    return block;
                }
                block = this._dataTableBlockBaseOperator.nextBlock();
            }
        } catch (Exception e) {
            // Do not propagate: convert the failure into an error block and try to forward it downstream.
            block = TransferableBlockUtils.getErrorTransferableBlock(e);
            try {
                this._exchange.send(block);
            } catch (Exception sendFailure) {
                LOGGER.error("Exception while sending block to mailbox.", sendFailure);
            }
        }
        return block;
    }

    /**
     * Builds the mailbox identifier for a receiving server instance.
     *
     * <p>The identifier is constructed from the string {@code "<j>_<i>"}, followed by {@code str}, {@code i2},
     * and the hostname and query mailbox port of {@code serverInstance}.
     *
     * @param serverInstance receiving instance supplying the last two identifier arguments
     * @param j first component of the id string (NOTE(review): presumably a request/job id - confirm)
     * @param i second component of the id string (NOTE(review): presumably a stage id - confirm)
     * @param str second identifier argument (NOTE(review): presumably the sending host name - confirm)
     * @param i2 third identifier argument (NOTE(review): presumably the sending port - confirm)
     * @return the new mailbox identifier
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static StringMailboxIdentifier toMailboxId(ServerInstance serverInstance, long j, int i, String str, int i2) {
        String jobId = String.format("%s_%s", j, i);
        return new StringMailboxIdentifier(jobId, str, i2, serverInstance.getHostname(), serverInstance.getQueryMailboxPort());
    }
}
