package org.apache.pinot.query.runtime.operator.exchange;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import java.util.Iterator;
import java.util.List;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockTestUtils;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.class */
public class BlockExchangeTest {
    private AutoCloseable _mocks;

    @Mock
    private SendingMailbox _mailbox1;

    @Mock
    private SendingMailbox _mailbox2;

    /* loaded from: input_file:org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest$TestBlockExchange.class */
    private static class TestBlockExchange extends BlockExchange {
        protected TestBlockExchange(List<SendingMailbox> list) {
            this(list, (transferableBlock, type, i) -> {
                return Iterators.singletonIterator(transferableBlock);
            });
        }

        protected TestBlockExchange(List<SendingMailbox> list, BlockSplitter blockSplitter) {
            super(list, blockSplitter);
        }

        protected void route(List<SendingMailbox> list, TransferableBlock transferableBlock) throws Exception {
            Iterator<SendingMailbox> it = list.iterator();
            while (it.hasNext()) {
                sendBlock(it.next(), transferableBlock);
            }
        }
    }

    @BeforeMethod
    public void setUp() {
        this._mocks = MockitoAnnotations.openMocks(this);
    }

    @AfterMethod
    public void tearDown() throws Exception {
        this._mocks.close();
    }

    @Test
    public void shouldSendEosBlockToAllDestinations() throws Exception {
        new TestBlockExchange(ImmutableList.of(this._mailbox1, this._mailbox2)).send(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(TransferableBlock.class);
        ((SendingMailbox) Mockito.verify(this._mailbox1)).complete();
        ((SendingMailbox) Mockito.verify(this._mailbox1, Mockito.times(1))).send((TransferableBlock) forClass.capture());
        Assert.assertTrue(((TransferableBlock) forClass.getValue()).isEndOfStreamBlock());
        ((SendingMailbox) Mockito.verify(this._mailbox2)).complete();
        ((SendingMailbox) Mockito.verify(this._mailbox2, Mockito.times(1))).send((TransferableBlock) forClass.capture());
        Assert.assertTrue(((TransferableBlock) forClass.getValue()).isEndOfStreamBlock());
    }

    @Test
    public void shouldSendDataBlocksOnlyToTargetDestination() throws Exception {
        TestBlockExchange testBlockExchange = new TestBlockExchange(ImmutableList.of(this._mailbox1));
        TransferableBlock transferableBlock = new TransferableBlock(ImmutableList.of(new Object[]{"val"}), new DataSchema(new String[]{"foo"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}), DataBlock.Type.ROW);
        testBlockExchange.send(transferableBlock);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(TransferableBlock.class);
        ((SendingMailbox) Mockito.verify(this._mailbox1, Mockito.times(1))).send((TransferableBlock) forClass.capture());
        Assert.assertEquals(((TransferableBlock) forClass.getValue()).getContainer(), transferableBlock.getContainer());
        ((SendingMailbox) Mockito.verify(this._mailbox2, Mockito.never())).send((TransferableBlock) Mockito.any());
    }

    @Test
    public void shouldSignalEarlyTerminationProperly() throws Exception {
        TestBlockExchange testBlockExchange = new TestBlockExchange(ImmutableList.of(this._mailbox1, this._mailbox2));
        TransferableBlock transferableBlock = new TransferableBlock(ImmutableList.of(new Object[]{"val"}), new DataSchema(new String[]{"foo"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}), DataBlock.Type.ROW);
        Mockito.when(Boolean.valueOf(this._mailbox1.isEarlyTerminated())).thenReturn(true);
        Assert.assertFalse(testBlockExchange.send(transferableBlock));
        Mockito.when(Boolean.valueOf(this._mailbox2.isTerminated())).thenReturn(true);
        Assert.assertFalse(testBlockExchange.send(transferableBlock));
        Mockito.when(Boolean.valueOf(this._mailbox2.isEarlyTerminated())).thenReturn(true);
        Assert.assertTrue(testBlockExchange.send(transferableBlock));
    }

    @Test
    public void shouldSplitBlocks() throws Exception {
        ImmutableList of = ImmutableList.of(this._mailbox1);
        DataSchema dataSchema = new DataSchema(new String[]{"foo"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
        TransferableBlock transferableBlock = new TransferableBlock(ImmutableList.of(new Object[]{"one"}, new Object[]{"two"}), dataSchema, DataBlock.Type.ROW);
        TransferableBlock transferableBlock2 = new TransferableBlock(ImmutableList.of(new Object[]{"one"}), dataSchema, DataBlock.Type.ROW);
        TransferableBlock transferableBlock3 = new TransferableBlock(ImmutableList.of(new Object[]{"two"}), dataSchema, DataBlock.Type.ROW);
        new TestBlockExchange(of, (transferableBlock4, type, i) -> {
            return ImmutableList.of(transferableBlock2, transferableBlock3).iterator();
        }).send(transferableBlock);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(TransferableBlock.class);
        ((SendingMailbox) Mockito.verify(this._mailbox1, Mockito.times(2))).send((TransferableBlock) forClass.capture());
        List allValues = forClass.getAllValues();
        Assert.assertEquals(allValues.size(), 2, "expected to send two blocks");
        Assert.assertEquals(((TransferableBlock) allValues.get(0)).getContainer(), transferableBlock2.getContainer());
        Assert.assertEquals(((TransferableBlock) allValues.get(1)).getContainer(), transferableBlock3.getContainer());
    }
}
