package org.apache.pinot.query.runtime.operator;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.DataBlockBuilder;
import org.apache.pinot.core.common.datablock.MetadataBlock;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/MailboxSendOperator.class */
/**
 * Operator that pulls blocks from an upstream operator and forwards each block to the
 * receiving stage instances through the {@link MailboxService}.
 *
 * <p>How a block is routed depends on the exchange type:
 * <ul>
 *   <li>{@code SINGLETON}: sent to the single receiving instance whose hostname and mailbox
 *       port match the local mailbox service.</li>
 *   <li>{@code RANDOM_DISTRIBUTED}: sent to one randomly chosen receiver; an end-of-stream
 *       block is sent to every receiver.</li>
 *   <li>{@code BROADCAST_DISTRIBUTED}: sent to every receiver.</li>
 *   <li>{@code HASH_DISTRIBUTED}: rows are split by key hash and each receiver gets its own
 *       partition; an end-of-stream block is sent to every receiver.</li>
 * </ul>
 */
public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
    private static final String EXPLAIN_NAME = "MAILBOX_SEND";
    private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE = ImmutableSet.of(
            RelDistribution.Type.SINGLETON,
            RelDistribution.Type.RANDOM_DISTRIBUTED,
            RelDistribution.Type.BROADCAST_DISTRIBUTED,
            RelDistribution.Type.HASH_DISTRIBUTED);
    private static final Random RANDOM = new Random();

    private final List<ServerInstance> _receivingStageInstances;
    private final RelDistribution.Type _exchangeType;
    private final KeySelector<Object[], Object[]> _keySelector;
    private final String _serverHostName;
    private final int _serverPort;
    private final long _jobId;
    private final int _stageId;
    private final MailboxService<Mailbox.MailboxContent> _mailboxService;
    private final DataSchema _dataSchema;
    private final BaseOperator<TransferableBlock> _dataTableBlockBaseOperator;

    /**
     * Creates a send operator.
     *
     * @param mailboxService mailbox service used to obtain sending mailboxes
     * @param dataSchema schema of the data being sent
     * @param baseOperator upstream operator whose blocks are forwarded
     * @param receivingStageInstances candidate receiving instances
     * @param exchangeType how blocks are distributed across the receivers
     * @param keySelector key selector used to partition rows for {@code HASH_DISTRIBUTED}
     * @param serverHostName host name of the sending side, used in the mailbox id
     * @param serverPort port of the sending side, used in the mailbox id
     * @param jobId job id, used in the mailbox id
     * @param stageId stage id, used in the mailbox id
     * @throws IllegalStateException if the exchange type is unsupported, or if a singleton
     *     exchange matches zero or more than one receiving instance
     */
    public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataSchema dataSchema,
            BaseOperator<TransferableBlock> baseOperator, List<ServerInstance> receivingStageInstances,
            RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String serverHostName,
            int serverPort, long jobId, int stageId) {
        // Validate first so an unsupported type fails before any other state is derived from it.
        Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
                "Exchange type '%s' is not supported yet", exchangeType);
        _dataSchema = dataSchema;
        _mailboxService = mailboxService;
        _dataTableBlockBaseOperator = baseOperator;
        _exchangeType = exchangeType;
        if (exchangeType == RelDistribution.Type.SINGLETON) {
            _receivingStageInstances =
                    Collections.singletonList(findSingletonInstance(mailboxService, receivingStageInstances));
        } else {
            _receivingStageInstances = receivingStageInstances;
        }
        _keySelector = keySelector;
        _serverHostName = serverHostName;
        _serverPort = serverPort;
        _jobId = jobId;
        _stageId = stageId;
    }

    /**
     * Finds the one receiving instance whose hostname and mailbox port match the local
     * mailbox service.
     *
     * @throws IllegalStateException if no instance or more than one instance matches
     */
    private static ServerInstance findSingletonInstance(MailboxService<Mailbox.MailboxContent> mailboxService,
            List<ServerInstance> candidates) {
        ServerInstance match = null;
        for (ServerInstance candidate : candidates) {
            if (candidate.getHostname().equals(mailboxService.getHostname())
                    && candidate.getQueryMailboxPort() == mailboxService.getMailboxPort()) {
                Preconditions.checkState(match == null, "multiple instance found for singleton exchange type!");
                match = candidate;
            }
        }
        // Fail at construction instead of storing a null receiver that would only surface
        // later as a NullPointerException while building the mailbox id.
        Preconditions.checkState(match != null,
                "no receiving instance found for singleton exchange type on %s:%s",
                mailboxService.getHostname(), mailboxService.getMailboxPort());
        return match;
    }

    /**
     * Returns {@code null}; this operator does not expose its upstream operator as a child.
     */
    @Override // org.apache.pinot.core.common.Operator
    public List<Operator> getChildOperators() {
        return null;
    }

    /** Returns the name used for this operator in explain output. */
    @Override // org.apache.pinot.core.common.Operator
    @Nullable
    public String toExplainString() {
        return EXPLAIN_NAME;
    }

    /**
     * Pulls the next block from the upstream operator, sends it to the receivers according to
     * the exchange type, and returns the same block.
     *
     * <p>Any exception raised while sending is logged and not rethrown, so the upstream block
     * is returned even when delivery failed.
     *
     * <p>NOTE(review): the "2" suffix and the public modifier come from the decompiler
     * (JADX reported "renamed from: getNextBlock" and "Access modifiers changed from:
     * protected"); confirm against the upstream source before renaming.
     *
     * @return the block obtained from the upstream operator
     */
    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.pinot.core.operator.BaseOperator
    /* renamed from: getNextBlock */
    public TransferableBlock getNextBlock2() {
        TransferableBlock transferableBlock = _dataTableBlockBaseOperator.nextBlock();
        BaseDataBlock dataBlock = transferableBlock.getDataBlock();
        boolean isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
        try {
            switch (_exchangeType) {
                case SINGLETON:
                    sendDataTableBlock(_receivingStageInstances.get(0), dataBlock);
                    break;
                case RANDOM_DISTRIBUTED:
                    if (isEndOfStream) {
                        // Every receiver must see the end-of-stream block.
                        sendToAllReceivers(dataBlock);
                    } else {
                        int receiverIndex = RANDOM.nextInt(_receivingStageInstances.size());
                        sendDataTableBlock(_receivingStageInstances.get(receiverIndex), dataBlock);
                    }
                    break;
                case BROADCAST_DISTRIBUTED:
                    sendToAllReceivers(dataBlock);
                    break;
                case HASH_DISTRIBUTED:
                    List<BaseDataBlock> partitions = constructPartitionedDataBlock(dataBlock, _keySelector,
                            _receivingStageInstances.size(), isEndOfStream);
                    for (int i = 0; i < _receivingStageInstances.size(); i++) {
                        sendDataTableBlock(_receivingStageInstances.get(i), partitions.get(i));
                    }
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported mailbox exchange type: " + _exchangeType);
            }
        } catch (Exception e) {
            // Best effort: delivery failures are logged and the upstream block is still returned.
            LOGGER.error("Exception occur while sending data via mailbox", e);
        }
        return transferableBlock;
    }

    /** Sends the given block to every receiving instance, in list order. */
    private void sendToAllReceivers(BaseDataBlock dataBlock) throws IOException {
        for (ServerInstance receiver : _receivingStageInstances) {
            sendDataTableBlock(receiver, dataBlock);
        }
    }

    /**
     * Splits a data block into {@code partitionCount} blocks, one per receiver.
     *
     * @param dataBlock block to partition
     * @param keySelector computes the hash that decides each row's partition
     * @param partitionCount number of partitions to produce
     * @param isEndOfStream when true the block is not split; the same block is returned for
     *     every partition
     * @return a list of exactly {@code partitionCount} blocks
     */
    private static List<BaseDataBlock> constructPartitionedDataBlock(BaseDataBlock dataBlock,
            KeySelector<Object[], Object[]> keySelector, int partitionCount, boolean isEndOfStream)
            throws Exception {
        if (isEndOfStream) {
            List<BaseDataBlock> replicated = new ArrayList<>(partitionCount);
            for (int i = 0; i < partitionCount; i++) {
                replicated.add(dataBlock);
            }
            return replicated;
        }
        List<List<Object[]>> rowsByPartition = new ArrayList<>(partitionCount);
        for (int i = 0; i < partitionCount; i++) {
            rowsByPartition.add(new ArrayList<>());
        }
        for (int rowId = 0; rowId < dataBlock.getNumberOfRows(); rowId++) {
            Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
            // floorMod keeps the index in [0, partitionCount) even when the hash is negative;
            // a plain '%' would yield a negative index and fail the list lookup.
            int partitionId = Math.floorMod(keySelector.computeHash(row), partitionCount);
            rowsByPartition.get(partitionId).add(row);
        }
        List<BaseDataBlock> partitions = new ArrayList<>(partitionCount);
        for (List<Object[]> rows : rowsByPartition) {
            partitions.add(DataBlockBuilder.buildFromRows(rows, null, dataBlock.getDataSchema()));
        }
        return partitions;
    }

    /**
     * Sends one block to one receiver, and completes the sending mailbox when the content is
     * flagged as end-of-stream.
     */
    private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock dataBlock) throws IOException {
        String mailboxId = toMailboxId(serverInstance);
        SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
        Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId, dataBlock);
        sendingMailbox.send(mailboxContent);
        if (mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)) {
            sendingMailbox.complete();
        }
    }

    /**
     * Wraps a data block into mailbox content addressed to {@code mailboxId}. A
     * {@link MetadataBlock} is additionally tagged with the end-of-stream metadata key.
     */
    private Mailbox.MailboxContent toMailboxContent(String mailboxId, BaseDataBlock dataBlock) throws IOException {
        Mailbox.MailboxContent.Builder builder = Mailbox.MailboxContent.newBuilder()
                .setMailboxId(mailboxId)
                .setPayload(ByteString.copyFrom(new TransferableBlock(dataBlock).toBytes()));
        if (dataBlock instanceof MetadataBlock) {
            builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true");
        }
        return builder.build();
    }

    /**
     * Builds the mailbox id from "jobId_stageId", this sender's host and port, and the
     * receiver's hostname and mailbox port.
     */
    private String toMailboxId(ServerInstance serverInstance) {
        return new StringMailboxIdentifier(String.format("%s_%s", _jobId, _stageId), _serverHostName, _serverPort,
                serverInstance.getHostname(), serverInstance.getQueryMailboxPort()).toString();
    }
}
