package org.apache.pinot.query.runtime.operator;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.routing.MailboxInfo;
import org.apache.pinot.query.routing.SharedMailboxInfos;
import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockTestUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.class */
public class MailboxReceiveOperatorTest {
    private static final DataSchema DATA_SCHEMA = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
    private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
    private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 1, 1, 0, 0);
    private StageMetadata _stageMetadataBoth;
    private StageMetadata _stageMetadata1;
    private AutoCloseable _mocks;

    @Mock
    private MailboxService _mailboxService;

    @Mock
    private ReceivingMailbox _mailbox1;

    @Mock
    private ReceivingMailbox _mailbox2;

    @BeforeClass
    public void setUp() {
        SharedMailboxInfos sharedMailboxInfos = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, List.of(0, 1)));
        this._stageMetadataBoth = new StageMetadata(0, (List) Stream.of((Object[]) new Integer[]{0, 1}).map(num -> {
            return new WorkerMetadata(num.intValue(), Map.of(1, sharedMailboxInfos), Map.of());
        }).collect(Collectors.toList()), Map.of());
        this._stageMetadata1 = new StageMetadata(0, List.of(new WorkerMetadata(0, Map.of(1, new SharedMailboxInfos(new MailboxInfo("localhost", 1234, List.of(0)))), Map.of())), Map.of());
    }

    @BeforeMethod
    public void setUpMethod() {
        this._mocks = MockitoAnnotations.openMocks(this);
        Mockito.when(this._mailboxService.getHostname()).thenReturn("localhost");
        Mockito.when(Integer.valueOf(this._mailboxService.getPort())).thenReturn(1234);
        Mockito.when(this._mailbox1.getStatMap()).thenReturn(new StatMap(ReceivingMailbox.StatKey.class));
        Mockito.when(this._mailbox2.getStatMap()).thenReturn(new StatMap(ReceivingMailbox.StatKey.class));
    }

    @AfterMethod
    public void tearDownMethod() throws Exception {
        this._mocks.close();
    }

    @Test(expectedExceptions = {IllegalStateException.class}, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
    public void shouldThrowRangeDistributionNotSupported() {
        getOperator(this._stageMetadata1, RelDistribution.Type.RANGE_DISTRIBUTED);
    }

    @Test
    public void shouldTimeout() {
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_1))).thenReturn(this._mailbox1);
        MailboxReceiveOperator operator = getOperator(this._stageMetadata1, RelDistribution.Type.SINGLETON, System.currentTimeMillis() + 100);
        try {
            TransferableBlock nextBlock = operator.nextBlock();
            Assert.assertTrue(nextBlock.isErrorBlock());
            Assert.assertTrue(nextBlock.getExceptions().containsKey(250));
            if (operator != null) {
                operator.close();
            }
        } catch (Throwable th) {
            if (operator != null) {
                try {
                    operator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldReceiveEosDirectlyFromSender() {
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_1))).thenReturn(this._mailbox1);
        Mockito.when(this._mailbox1.poll()).thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
        MailboxReceiveOperator operator = getOperator(this._stageMetadata1, RelDistribution.Type.SINGLETON);
        try {
            Assert.assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock());
            if (operator != null) {
                operator.close();
            }
        } catch (Throwable th) {
            if (operator != null) {
                try {
                    operator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r2v3, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldReceiveSingletonMailbox() {
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_1))).thenReturn(this._mailbox1);
        Object[] objArr = {1, 1};
        Mockito.when(this._mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{objArr}), new TransferableBlock[]{TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)});
        MailboxReceiveOperator operator = getOperator(this._stageMetadata1, RelDistribution.Type.SINGLETON);
        try {
            List container = operator.nextBlock().getContainer();
            Assert.assertEquals(container.size(), 1);
            Assert.assertEquals((Object[]) container.get(0), objArr);
            Assert.assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock());
            if (operator != null) {
                operator.close();
            }
        } catch (Throwable th) {
            if (operator != null) {
                try {
                    operator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldReceiveSingletonErrorMailbox() {
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_1))).thenReturn(this._mailbox1);
        Mockito.when(this._mailbox1.poll()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("TEST ERROR")));
        MailboxReceiveOperator operator = getOperator(this._stageMetadata1, RelDistribution.Type.SINGLETON);
        try {
            TransferableBlock nextBlock = operator.nextBlock();
            Assert.assertTrue(nextBlock.isErrorBlock());
            Assert.assertTrue(((String) nextBlock.getExceptions().get(1000)).contains("TEST ERROR"));
            if (operator != null) {
                operator.close();
            }
        } catch (Throwable th) {
            if (operator != null) {
                try {
                    operator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r2v5, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldReceiveMailboxFromTwoServersOneNull() {
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_1))).thenReturn(this._mailbox1);
        Mockito.when(this._mailbox1.poll()).thenReturn((Object) null, new TransferableBlock[]{TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)});
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_2))).thenReturn(this._mailbox2);
        Object[] objArr = {1, 1};
        Mockito.when(this._mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{objArr}), new TransferableBlock[]{TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)});
        MailboxReceiveOperator operator = getOperator(this._stageMetadataBoth, RelDistribution.Type.HASH_DISTRIBUTED);
        try {
            List container = operator.nextBlock().getContainer();
            Assert.assertEquals(container.size(), 1);
            Assert.assertEquals((Object[]) container.get(0), objArr);
            Assert.assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock());
            if (operator != null) {
                operator.close();
            }
        } catch (Throwable th) {
            if (operator != null) {
                try {
                    operator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r2v11, types: [java.lang.Object[], java.lang.Object[][]] */
    /* JADX WARN: Type inference failed for: r2v7, types: [java.lang.Object[], java.lang.Object[][]] */
    /* JADX WARN: Type inference failed for: r6v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldReceiveMailboxFromTwoServers() {
        Object[] objArr = {1, 1};
        Object[] objArr2 = {2, 2};
        Object[] objArr3 = {3, 3};
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_1))).thenReturn(this._mailbox1);
        Mockito.when(this._mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{objArr}), new TransferableBlock[]{OperatorTestUtil.block(DATA_SCHEMA, new Object[]{objArr3}), TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)});
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_2))).thenReturn(this._mailbox2);
        Mockito.when(this._mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{objArr2}), new TransferableBlock[]{TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)});
        MailboxReceiveOperator operator = getOperator(this._stageMetadataBoth, RelDistribution.Type.HASH_DISTRIBUTED);
        try {
            Assert.assertEquals((Object[]) operator.nextBlock().getContainer().get(0), objArr);
            Assert.assertEquals((Object[]) operator.nextBlock().getContainer().get(0), objArr2);
            Assert.assertEquals((Object[]) operator.nextBlock().getContainer().get(0), objArr3);
            Assert.assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock());
            if (operator != null) {
                operator.close();
            }
        } catch (Throwable th) {
            if (operator != null) {
                try {
                    operator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r2v4, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldGetReceptionReceiveErrorMailbox() {
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_1))).thenReturn(this._mailbox1);
        Mockito.when(this._mailbox1.poll()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("TEST ERROR")));
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_2))).thenReturn(this._mailbox2);
        Mockito.when(this._mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{3, 3}}), new TransferableBlock[]{TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)});
        MailboxReceiveOperator operator = getOperator(this._stageMetadataBoth, RelDistribution.Type.HASH_DISTRIBUTED);
        try {
            TransferableBlock nextBlock = operator.nextBlock();
            Assert.assertTrue(nextBlock.isErrorBlock());
            Assert.assertTrue(((String) nextBlock.getExceptions().get(1000)).contains("TEST ERROR"));
            if (operator != null) {
                operator.close();
            }
        } catch (Throwable th) {
            if (operator != null) {
                try {
                    operator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r2v11, types: [java.lang.Object[], java.lang.Object[][]] */
    /* JADX WARN: Type inference failed for: r2v7, types: [java.lang.Object[], java.lang.Object[][]] */
    /* JADX WARN: Type inference failed for: r6v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldEarlyTerminateMailboxesWhenIndicated() {
        Object[] objArr = {1, 1};
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_1))).thenReturn(this._mailbox1);
        Mockito.when(this._mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{objArr}), new TransferableBlock[]{OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{3, 3}}), TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)});
        Mockito.when(this._mailboxService.getReceivingMailbox((String) Mockito.eq(MAILBOX_ID_2))).thenReturn(this._mailbox2);
        Mockito.when(this._mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{2, 2}}), new TransferableBlock[]{TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)});
        MailboxReceiveOperator operator = getOperator(this._stageMetadataBoth, RelDistribution.Type.HASH_DISTRIBUTED);
        try {
            Assert.assertEquals((Object[]) operator.nextBlock().getContainer().get(0), objArr);
            operator.earlyTerminate();
            Assert.assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock());
            ((ReceivingMailbox) Mockito.verify(this._mailbox1)).earlyTerminate();
            ((ReceivingMailbox) Mockito.verify(this._mailbox2)).earlyTerminate();
            if (operator != null) {
                operator.close();
            }
        } catch (Throwable th) {
            if (operator != null) {
                try {
                    operator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private MailboxReceiveOperator getOperator(StageMetadata stageMetadata, RelDistribution.Type type, long j) {
        OpChainExecutionContext opChainContext = OperatorTestUtil.getOpChainContext(this._mailboxService, j, stageMetadata);
        MailboxReceiveNode mailboxReceiveNode = (MailboxReceiveNode) Mockito.mock(MailboxReceiveNode.class);
        Mockito.when(mailboxReceiveNode.getDistributionType()).thenReturn(type);
        Mockito.when(Integer.valueOf(mailboxReceiveNode.getSenderStageId())).thenReturn(1);
        return new MailboxReceiveOperator(opChainContext, mailboxReceiveNode);
    }

    private MailboxReceiveOperator getOperator(StageMetadata stageMetadata, RelDistribution.Type type) {
        return getOperator(stageMetadata, type, Long.MAX_VALUE);
    }
}
