package org.apache.pinot.query.runtime.operator.exchange;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link BlockExchange}.
 *
 * <p>The exchange is exercised through {@link TestBlockExchange}, a test-only subclass whose routing
 * decision (and, optionally, block splitting) is supplied as a lambda. The mailbox service and both
 * sending mailboxes are Mockito mocks, so every assertion is made on the calls the exchange issues
 * against those mocks.
 */
public class BlockExchangeTest {
    private static final MailboxIdentifier MAILBOX_1 = new StringMailboxIdentifier("1:host:1:host:1");
    private static final MailboxIdentifier MAILBOX_2 = new StringMailboxIdentifier("1:host:1:host:2");

    /** Handle returned by {@link MockitoAnnotations#openMocks(Object)}; closed after every test. */
    private AutoCloseable _mocks;

    @Mock
    private MailboxService<TransferableBlock> _mailboxService;

    @Mock
    private SendingMailbox<TransferableBlock> _mailbox1;

    @Mock
    private SendingMailbox<TransferableBlock> _mailbox2;

    /**
     * {@link BlockExchange} whose {@code route} implementation delegates to a caller-supplied
     * function, letting each test decide which destination(s) a block is routed to.
     */
    private static class TestBlockExchange extends BlockExchange {
        private final BiFunction<List<MailboxIdentifier>, TransferableBlock, Iterator<BlockExchange.RoutedBlock>> _router;

        /**
         * Creates an exchange that does not split blocks: the splitter hands back the input block
         * as a single-element iterator.
         *
         * @param mailboxService service used to look up sending mailboxes
         * @param destinations mailboxes this exchange may send to
         * @param router function invoked by {@code route} to produce the routed blocks
         */
        protected TestBlockExchange(MailboxService<TransferableBlock> mailboxService,
                List<MailboxIdentifier> destinations,
                BiFunction<List<MailboxIdentifier>, TransferableBlock, Iterator<BlockExchange.RoutedBlock>> router) {
            this(mailboxService, destinations, router, (block, type, maxSize) -> Iterators.singletonIterator(block));
        }

        /**
         * Creates an exchange with an explicit splitter.
         *
         * @param mailboxService service used to look up sending mailboxes
         * @param destinations mailboxes this exchange may send to
         * @param router function invoked by {@code route} to produce the routed blocks
         * @param splitter splitter forwarded unchanged to the {@link BlockExchange} constructor
         */
        protected TestBlockExchange(MailboxService<TransferableBlock> mailboxService,
                List<MailboxIdentifier> destinations,
                BiFunction<List<MailboxIdentifier>, TransferableBlock, Iterator<BlockExchange.RoutedBlock>> router,
                BlockSplitter splitter) {
            super(mailboxService, destinations, splitter);
            _router = router;
        }

        /** Delegates the routing decision to the function supplied at construction time. */
        @Override
        protected Iterator<BlockExchange.RoutedBlock> route(List<MailboxIdentifier> destinations,
                TransferableBlock block) {
            return _router.apply(destinations, block);
        }
    }

    /** Initializes the {@code @Mock} fields and wires each mailbox id to its mock sending mailbox. */
    @BeforeMethod
    public void setUp() {
        _mocks = MockitoAnnotations.openMocks(this);
        Mockito.when(_mailboxService.getSendingMailbox(MAILBOX_1)).thenReturn(_mailbox1);
        Mockito.when(_mailboxService.getSendingMailbox(MAILBOX_2)).thenReturn(_mailbox2);
    }

    /** Releases the resources opened by {@link MockitoAnnotations#openMocks(Object)}. */
    @AfterMethod
    public void tearDown() throws Exception {
        _mocks.close();
    }

    /**
     * Sends an end-of-stream block through an exchange whose router only targets {@code MAILBOX_1}
     * and expects both mailboxes to receive exactly one end-of-stream block and be completed.
     */
    @Test
    public void shouldSendEosBlockToAllDestinations() {
        // Given: two destinations, with a router that only ever names MAILBOX_1.
        List<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
        BlockExchange exchange = new TestBlockExchange(_mailboxService, destinations,
                (dest, block) -> Iterators.singletonIterator(new BlockExchange.RoutedBlock(MAILBOX_1, block)));

        // When:
        exchange.send(TransferableBlockUtils.getEndOfStreamTransferableBlock());

        // Then: one captor per mailbox, so each assertion inspects only that mailbox's block.
        ArgumentCaptor<TransferableBlock> sentToFirst = ArgumentCaptor.forClass(TransferableBlock.class);
        Mockito.verify(_mailbox1).complete();
        Mockito.verify(_mailbox1, Mockito.times(1)).send(sentToFirst.capture());
        Assert.assertTrue(sentToFirst.getValue().isEndOfStreamBlock());

        ArgumentCaptor<TransferableBlock> sentToSecond = ArgumentCaptor.forClass(TransferableBlock.class);
        Mockito.verify(_mailbox2).complete();
        Mockito.verify(_mailbox2, Mockito.times(1)).send(sentToSecond.capture());
        Assert.assertTrue(sentToSecond.getValue().isEndOfStreamBlock());
    }

    /**
     * Sends a single-row data block through an exchange that routes to {@code MAILBOX_1} and
     * expects that mailbox to receive the block's rows while {@code MAILBOX_2} receives nothing.
     */
    @Test
    public void shouldSendDataBlocksOnlyToTargetDestination() {
        // Given:
        List<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
        BlockExchange exchange = new TestBlockExchange(_mailboxService, destinations,
                (dest, block) -> Iterators.singletonIterator(new BlockExchange.RoutedBlock(MAILBOX_1, block)));
        DataSchema schema = new DataSchema(new String[]{"foo"},
                new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
        TransferableBlock dataBlock =
                new TransferableBlock(ImmutableList.of(new Object[]{"val"}), schema, DataBlock.Type.ROW);

        // When:
        exchange.send(dataBlock);

        // Then:
        ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
        Mockito.verify(_mailbox1, Mockito.times(1)).send(captor.capture());
        Assert.assertEquals(captor.getValue().getContainer(), dataBlock.getContainer());
        Mockito.verify(_mailbox2, Mockito.never()).send(Mockito.any());
    }

    /**
     * Uses a splitter that always yields two fixed single-row blocks and expects the routed
     * mailbox to receive both of them, in splitter order.
     */
    @Test
    public void shouldSplitBlocks() {
        // Given:
        List<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
        DataSchema schema = new DataSchema(new String[]{"foo"},
                new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
        TransferableBlock inBlock = new TransferableBlock(
                ImmutableList.of(new Object[]{"one"}, new Object[]{"two"}), schema, DataBlock.Type.ROW);
        TransferableBlock outBlockOne =
                new TransferableBlock(ImmutableList.of(new Object[]{"one"}), schema, DataBlock.Type.ROW);
        TransferableBlock outBlockTwo =
                new TransferableBlock(ImmutableList.of(new Object[]{"two"}), schema, DataBlock.Type.ROW);
        BlockExchange exchange = new TestBlockExchange(_mailboxService, destinations,
                (dest, block) -> Iterators.singletonIterator(new BlockExchange.RoutedBlock(MAILBOX_1, block)),
                // The splitter ignores its arguments and always returns the two prepared blocks.
                (block, type, maxSize) -> ImmutableList.of(outBlockOne, outBlockTwo).iterator());

        // When:
        exchange.send(inBlock);

        // Then:
        ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
        Mockito.verify(_mailbox1, Mockito.times(2)).send(captor.capture());
        List<TransferableBlock> sentBlocks = captor.getAllValues();
        Assert.assertEquals(sentBlocks.size(), 2, "expected to send two blocks");
        Assert.assertEquals(sentBlocks.get(0).getContainer(), outBlockOne.getContainer());
        Assert.assertEquals(sentBlocks.get(1).getContainer(), outBlockTwo.getContainer());
    }
}
