package org.apache.pinot.query.runtime.operator;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.class */
public class MailboxSendOperatorTest {
    private static final int SENDER_STAGE_ID = 1;
    private AutoCloseable _mocks;

    @Mock
    private MailboxService _mailboxService;

    @Mock
    private MultiStageOperator _input;

    @Mock
    private BlockExchange _exchange;

    @BeforeMethod
    public void setUpMethod() {
        this._mocks = MockitoAnnotations.openMocks(this);
        Mockito.when(this._mailboxService.getHostname()).thenReturn("localhost");
        Mockito.when(Integer.valueOf(this._mailboxService.getPort())).thenReturn(1234);
    }

    @AfterMethod
    public void tearDownMethod() throws Exception {
        this._mocks.close();
    }

    @Test
    public void shouldSendErrorBlock() throws Exception {
        TransferableBlock errorTransferableBlock = TransferableBlockUtils.getErrorTransferableBlock(new Exception("TEST ERROR"));
        Mockito.when(this._input.nextBlock()).thenReturn(errorTransferableBlock);
        Assert.assertSame(getOperator().nextBlock(), errorTransferableBlock, "expected error block to propagate");
        ((BlockExchange) Mockito.verify(this._exchange)).send((TransferableBlock) ArgumentMatchers.eq(errorTransferableBlock));
    }

    @Test
    public void shouldSendErrorBlockWhenInputThrows() throws Exception {
        Mockito.when(this._input.nextBlock()).thenThrow(new Throwable[]{new RuntimeException("TEST ERROR")});
        Assert.assertTrue(getOperator().nextBlock().isErrorBlock(), "expected error block to propagate");
        ArgumentCaptor forClass = ArgumentCaptor.forClass(TransferableBlock.class);
        ((BlockExchange) Mockito.verify(this._exchange)).send((TransferableBlock) forClass.capture());
        Assert.assertTrue(((TransferableBlock) forClass.getValue()).isErrorBlock(), "expected to send error block to exchange");
    }

    @Test
    public void shouldNotSendErrorBlockWhenTimedOut() throws Exception {
        TransferableBlock dummyDataBlock = getDummyDataBlock();
        Mockito.when(this._input.nextBlock()).thenReturn(dummyDataBlock);
        ((BlockExchange) Mockito.doThrow(new Throwable[]{new TimeoutException()}).when(this._exchange)).send((TransferableBlock) ArgumentMatchers.any());
        Assert.assertTrue(getOperator().nextBlock().isErrorBlock(), "expected error block to propagate");
        ArgumentCaptor forClass = ArgumentCaptor.forClass(TransferableBlock.class);
        ((BlockExchange) Mockito.verify(this._exchange)).send((TransferableBlock) forClass.capture());
        Assert.assertSame(forClass.getValue(), dummyDataBlock, "expected to send data block to exchange");
    }

    @Test
    public void shouldSendEosBlock() throws Exception {
        TransferableBlock endOfStreamTransferableBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock(MultiStageQueryStats.emptyStats(SENDER_STAGE_ID));
        Mockito.when(this._input.nextBlock()).thenReturn(endOfStreamTransferableBlock);
        Assert.assertSame(getOperator().nextBlock(), endOfStreamTransferableBlock, "expected EOS block to propagate");
        ArgumentCaptor forClass = ArgumentCaptor.forClass(TransferableBlock.class);
        ((BlockExchange) Mockito.verify(this._exchange)).send((TransferableBlock) forClass.capture());
        Assert.assertTrue(((TransferableBlock) forClass.getValue()).isSuccessfulEndOfStreamBlock(), "expected to send EOS block to exchange");
    }

    @Test
    public void shouldSendDataBlock() throws Exception {
        TransferableBlock dummyDataBlock = getDummyDataBlock();
        TransferableBlock dummyDataBlock2 = getDummyDataBlock();
        TransferableBlock endOfStreamTransferableBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock(MultiStageQueryStats.emptyStats(SENDER_STAGE_ID));
        Mockito.when(this._input.nextBlock()).thenReturn(dummyDataBlock, new TransferableBlock[]{dummyDataBlock2, endOfStreamTransferableBlock});
        MailboxSendOperator operator = getOperator();
        Assert.assertSame(operator.nextBlock(), dummyDataBlock, "expected first data block to propagate");
        Assert.assertSame(operator.nextBlock(), dummyDataBlock2, "expected second data block to propagate");
        Assert.assertSame(operator.nextBlock(), endOfStreamTransferableBlock, "expected EOS block to propagate");
        ArgumentCaptor forClass = ArgumentCaptor.forClass(TransferableBlock.class);
        ((BlockExchange) Mockito.verify(this._exchange, Mockito.times(3))).send((TransferableBlock) forClass.capture());
        List allValues = forClass.getAllValues();
        Assert.assertSame(allValues.get(0), dummyDataBlock, "expected to send first data block to exchange on first call");
        Assert.assertSame(allValues.get(SENDER_STAGE_ID), dummyDataBlock2, "expected to send second data block to exchange on second call");
        Assert.assertTrue(((TransferableBlock) allValues.get(2)).isSuccessfulEndOfStreamBlock(), "expected to send EOS block to exchange on third call");
        MultiStageQueryStats.StageStats.Open currentStats = ((TransferableBlock) allValues.get(2)).getQueryStats().getCurrentStats();
        Assert.assertNotNull(currentStats, "expected to have stats for sender stage");
        Assert.assertNotNull(currentStats.getOperatorStats(0));
    }

    @Test
    public void shouldEarlyTerminateWhenUpstreamWhenIndicated() throws Exception {
        Mockito.when(this._input.nextBlock()).thenReturn(getDummyDataBlock());
        ((BlockExchange) Mockito.doReturn(true).when(this._exchange)).send((TransferableBlock) ArgumentMatchers.any());
        getOperator().nextBlock();
        ((MultiStageOperator) Mockito.verify(this._input)).earlyTerminate();
    }

    private MailboxSendOperator getOperator() {
        WorkerMetadata workerMetadata = new WorkerMetadata(0, Map.of(), Map.of());
        return new MailboxSendOperator(new OpChainExecutionContext(this._mailboxService, 123L, Long.MAX_VALUE, Map.of(), new StageMetadata(SENDER_STAGE_ID, List.of(workerMetadata), Map.of()), workerMetadata, (PipelineBreakerResult) null, (ThreadExecutionContext) null, true), this._input, statMap -> {
            return this._exchange;
        });
    }

    /* JADX WARN: Type inference failed for: r1v2, types: [java.lang.Object[], java.lang.Object[][]] */
    private static TransferableBlock getDummyDataBlock() {
        return OperatorTestUtil.block(new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}), new Object[]{new Object[]{Integer.valueOf(SENDER_STAGE_ID)}});
    }
}
