package org.apache.pinot.query.runtime.operator.exchange;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/exchange/BlockExchange.class */
public abstract class BlockExchange {
    private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4194304;
    private final List<SendingMailbox> _sendingMailboxes;
    private final BlockSplitter _splitter;
    static final /* synthetic */ boolean $assertionsDisabled;

    public static BlockExchange getExchange(List<SendingMailbox> list, RelDistribution.Type type, @Nullable List<Integer> list2, BlockSplitter blockSplitter) {
        switch (type) {
            case SINGLETON:
                return new SingletonExchange(list, blockSplitter);
            case HASH_DISTRIBUTED:
                Preconditions.checkArgument(list2 != null, "Distribution keys must be provided for hash distribution");
                return new HashExchange(list, KeySelectorFactory.getKeySelector(list2), blockSplitter);
            case RANDOM_DISTRIBUTED:
                return new RandomExchange(list, blockSplitter);
            case BROADCAST_DISTRIBUTED:
                return new BroadcastExchange(list, blockSplitter);
            case ROUND_ROBIN_DISTRIBUTED:
            case RANGE_DISTRIBUTED:
            case ANY:
            default:
                throw new UnsupportedOperationException("Unsupported distribution type: " + type);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BlockExchange(List<SendingMailbox> list, BlockSplitter blockSplitter) {
        this._sendingMailboxes = list;
        this._splitter = blockSplitter;
    }

    public boolean send(TransferableBlock transferableBlock) throws Exception {
        if (transferableBlock.isErrorBlock()) {
            Iterator<SendingMailbox> it2 = this._sendingMailboxes.iterator();
            while (it2.hasNext()) {
                sendBlock(it2.next(), transferableBlock);
            }
            return false;
        }
        if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
            int size = this._sendingMailboxes.size();
            int nextInt = ThreadLocalRandom.current().nextInt(size);
            int i = 0;
            while (i < size) {
                sendBlock(this._sendingMailboxes.get(i), i == nextInt ? transferableBlock : TransferableBlockUtils.getEndOfStreamTransferableBlock());
                i++;
            }
            return false;
        }
        if (!$assertionsDisabled && !transferableBlock.isDataBlock()) {
            throw new AssertionError();
        }
        boolean z = true;
        Iterator<SendingMailbox> it3 = this._sendingMailboxes.iterator();
        while (true) {
            if (!it3.hasNext()) {
                break;
            }
            if (!it3.next().isEarlyTerminated()) {
                z = false;
                break;
            }
        }
        if (!z) {
            route(this._sendingMailboxes, transferableBlock);
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendBlock(SendingMailbox sendingMailbox, TransferableBlock transferableBlock) throws Exception {
        if (transferableBlock.isEndOfStreamBlock()) {
            sendingMailbox.send(transferableBlock);
            sendingMailbox.complete();
        } else {
            Iterator<TransferableBlock> split = this._splitter.split(transferableBlock, transferableBlock.getType(), 4194304);
            while (split.hasNext()) {
                sendingMailbox.send(split.next());
            }
        }
    }

    protected abstract void route(List<SendingMailbox> list, TransferableBlock transferableBlock) throws Exception;

    public void close() {
    }

    public void cancel(Throwable th) {
        Iterator<SendingMailbox> it2 = this._sendingMailboxes.iterator();
        while (it2.hasNext()) {
            it2.next().cancel(th);
        }
    }

    static {
        $assertionsDisabled = !BlockExchange.class.desiredAssertionStatus();
    }
}
