package org.apache.pinot.query.runtime.operator;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link MailboxSendOperator}.
 *
 * <p>Every test feeds a mocked upstream {@link Operator} into the operator under test and then checks two things:
 * the kind of block returned by {@code nextBlock()}, and what (if anything) was passed to the mocked
 * {@link BlockExchange#send}.
 */
public class MailboxSendOperatorTest {
    /** Mailbox id returned by the id-generator lambda given to every operator under test. */
    private static final String MAILBOX_ID = "123:from:1:to:2";

    /** Handle returned by {@link MockitoAnnotations#openMocks}; closed after each test method. */
    private AutoCloseable _mocks;

    @Mock
    private Operator<TransferableBlock> _input;

    @Mock
    private MailboxService<TransferableBlock> _mailboxService;

    @Mock
    private ServerInstance _server;

    @Mock
    private KeySelector<Object[], Object[]> _selector;

    @Mock
    private MailboxSendOperator.BlockExchangeFactory _exchangeFactory;

    @Mock
    private BlockExchange _exchange;

    /** Initializes the {@code @Mock} fields and makes the exchange factory return the mocked exchange. */
    @BeforeMethod
    public void setUp() {
        _mocks = MockitoAnnotations.openMocks(this);
        // The factory returns the mocked exchange regardless of the arguments it is built with.
        Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
            .thenReturn(_exchange);
    }

    /** Releases the mocks opened in {@link #setUp()}. */
    @AfterMethod
    public void tearDown() throws Exception {
        _mocks.close();
    }

    /** A no-op block from upstream is returned as-is and nothing is sent to the exchange. */
    @Test
    public void shouldSwallowNoOpBlockFromUpstream() {
        MailboxSendOperator operator = newHashDistributedOperator();
        Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());

        TransferableBlock result = operator.nextBlock();

        Assert.assertTrue(result.isNoOpBlock(), "expected noop block to propagate");
        Mockito.verify(_exchange, Mockito.never()).send(Mockito.<TransferableBlock>any());
    }

    /** An error block from upstream is returned and the very same block is sent to the exchange. */
    @Test
    public void shouldSendErrorBlock() {
        MailboxSendOperator operator = newHashDistributedOperator();
        TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
        Mockito.when(_input.nextBlock()).thenReturn(errorBlock);

        TransferableBlock result = operator.nextBlock();

        Assert.assertTrue(result.isErrorBlock(), "expected error block to propagate");
        Mockito.verify(_exchange).send(errorBlock);
    }

    /** When upstream throws, an error block is both returned and sent to the exchange. */
    @Test
    public void shouldSendErrorBlockWhenInputThrows() {
        MailboxSendOperator operator = newHashDistributedOperator();
        Mockito.when(_input.nextBlock()).thenThrow(new RuntimeException("foo!"));
        ArgumentCaptor<TransferableBlock> sentBlock = ArgumentCaptor.forClass(TransferableBlock.class);

        TransferableBlock result = operator.nextBlock();

        Assert.assertTrue(result.isErrorBlock(), "expected error block when input throws error");
        Mockito.verify(_exchange).send(sentBlock.capture());
        Assert.assertTrue(sentBlock.getValue().isErrorBlock(), "expected to send error block to exchange");
    }

    /** An end-of-stream block from upstream is returned and the very same block is sent to the exchange. */
    @Test
    public void shouldSendEosBlock() {
        MailboxSendOperator operator = newHashDistributedOperator();
        TransferableBlock eosBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock();
        Mockito.when(_input.nextBlock()).thenReturn(eosBlock);

        TransferableBlock result = operator.nextBlock();

        Assert.assertTrue(result.isEndOfStreamBlock(), "expected EOS block to propagate");
        Mockito.verify(_exchange).send(eosBlock);
    }

    /** A row data block from upstream is sent to the exchange exactly once. */
    @Test
    public void shouldSendDataBlock() {
        MailboxSendOperator operator = newHashDistributedOperator();
        DataSchema emptySchema = new DataSchema(new String[0], new DataSchema.ColumnDataType[0]);
        // A lone Object[] passed to the varargs helper is wrapped as ONE zero-length row, not as "no rows".
        Object[] emptyRow = new Object[0];
        Mockito.when(_input.nextBlock())
            .thenReturn(block(emptySchema, emptyRow))
            .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());

        operator.nextBlock();

        ArgumentCaptor<TransferableBlock> sentBlock = ArgumentCaptor.forClass(TransferableBlock.class);
        Mockito.verify(_exchange).send(sentBlock.capture());
        Assert.assertSame(sentBlock.getValue().getType(), DataBlock.Type.ROW, "expected data block to propagate");
    }

    /**
     * Builds the operator under test from the mocked collaborators: a single mocked destination server,
     * {@code HASH_DISTRIBUTED} exchange type, and an id generator that always yields {@link #MAILBOX_ID}.
     * Must only be called after {@link #setUp()} has initialized the mocks.
     */
    private MailboxSendOperator newHashDistributedOperator() {
        return new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
            RelDistribution.Type.HASH_DISTRIBUTED, _selector, server -> new StringMailboxIdentifier(MAILBOX_ID),
            _exchangeFactory);
    }

    /**
     * Creates a {@link DataBlock.Type#ROW} transferable block.
     *
     * @param dataSchema schema attached to the block
     * @param objArr rows of the block, one {@code Object[]} per row
     * @return a block wrapping the given rows as a fixed-size list view
     */
    private static TransferableBlock block(DataSchema dataSchema, Object[]... objArr) {
        return new TransferableBlock(Arrays.asList(objArr), dataSchema, DataBlock.Type.ROW);
    }
}
