package org.apache.pinot.query.runtime.plan.pipeline;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.class */
public class PipelineBreakerExecutorTest {
    private static final VirtualServerAddress RECEIVER_ADDRESS = new VirtualServerAddress("localhost", 123, 0);
    private static final DataSchema DATA_SCHEMA = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
    private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
    private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 2, 0, 0, 0);
    private AutoCloseable _mocks;

    @Mock
    private MailboxService _mailboxService;

    @Mock
    private ReceivingMailbox _mailbox1;

    @Mock
    private ReceivingMailbox _mailbox2;
    private VirtualServerAddress _server = new VirtualServerAddress("localhost", 123, 0);
    private ExecutorService _executor = Executors.newCachedThreadPool(new NamedThreadFactory("worker_on_asd_" + getClass().getSimpleName()));
    private OpChainSchedulerService _scheduler = new OpChainSchedulerService(this._executor);
    private StageMetadata _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList((List) Stream.of(this._server).map(virtualServerAddress -> {
        return new WorkerMetadata.Builder().setVirtualServerAddress(virtualServerAddress).addMailBoxInfoMap(0, new MailboxMetadata(ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)), ImmutableList.of(this._server), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(this._server), ImmutableMap.of())).addMailBoxInfoMap(2, new MailboxMetadata(ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)), ImmutableList.of(this._server), ImmutableMap.of())).build();
    }).collect(Collectors.toList())).build();

    @AfterClass
    public void tearDownClass() {
        ExecutorServiceUtils.close(this._executor);
    }

    @BeforeMethod
    public void setUp() {
        this._mocks = MockitoAnnotations.openMocks(this);
        Mockito.when(this._mailboxService.getHostname()).thenReturn("localhost");
        Mockito.when(Integer.valueOf(this._mailboxService.getPort())).thenReturn(123);
        Mockito.when(this._mailbox1.getId()).thenReturn("mailbox1");
        Mockito.when(this._mailbox2.getId()).thenReturn("mailbox2");
    }

    @AfterMethod
    public void tearDown() throws Exception {
        this._mocks.close();
    }

    /* JADX WARN: Type inference failed for: r2v7, types: [java.lang.Object[], java.lang.Object[][]] */
    /* JADX WARN: Type inference failed for: r6v2, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldReturnBlocksUponNormalOperation() {
        DistributedStagePlan distributedStagePlan = new DistributedStagePlan(0, RECEIVER_ADDRESS, new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, (KeySelector) null, (List) null, false, false, (PlanNode) null), this._stageMetadata1);
        Mockito.when(this._mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(this._mailbox1);
        Mockito.when(this._mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{1, 1}}), new TransferableBlock[]{OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{2, 3}}), TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0L, 1, this._server))});
        PipelineBreakerResult executePipelineBreakers = PipelineBreakerExecutor.executePipelineBreakers(this._scheduler, this._mailboxService, distributedStagePlan, Collections.emptyMap(), 0L, Long.MAX_VALUE);
        Assert.assertNotNull(executePipelineBreakers);
        Assert.assertNull(executePipelineBreakers.getErrorBlock());
        Assert.assertEquals(executePipelineBreakers.getResultMap().size(), 1);
        Assert.assertEquals(((List) executePipelineBreakers.getResultMap().values().iterator().next()).size(), 2);
        Assert.assertNotNull(executePipelineBreakers.getOpChainStats());
        Assert.assertEquals(executePipelineBreakers.getOpChainStats().getOperatorStatsMap().size(), 1);
    }

    /* JADX WARN: Type inference failed for: r2v13, types: [java.lang.Object[], java.lang.Object[][]] */
    /* JADX WARN: Type inference failed for: r2v9, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldWorkWithMultiplePBNodeUponNormalOperation() {
        MailboxReceiveNode mailboxReceiveNode = new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, (KeySelector) null, (List) null, false, false, (PlanNode) null);
        MailboxReceiveNode mailboxReceiveNode2 = new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, (KeySelector) null, (List) null, false, false, (PlanNode) null);
        JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, (JoinNode.JoinKeys) null, (List) null, Collections.emptyList());
        joinNode.addInput(mailboxReceiveNode);
        joinNode.addInput(mailboxReceiveNode2);
        DistributedStagePlan distributedStagePlan = new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, this._stageMetadata1);
        Mockito.when(this._mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(this._mailbox1);
        Mockito.when(this._mailboxService.getReceivingMailbox(MAILBOX_ID_2)).thenReturn(this._mailbox2);
        Mockito.when(this._mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{1, 1}}), new TransferableBlock[]{TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0L, 1, this._server))});
        Mockito.when(this._mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{2, 3}}), new TransferableBlock[]{TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0L, 2, this._server))});
        PipelineBreakerResult executePipelineBreakers = PipelineBreakerExecutor.executePipelineBreakers(this._scheduler, this._mailboxService, distributedStagePlan, Collections.emptyMap(), 0L, Long.MAX_VALUE);
        Assert.assertNotNull(executePipelineBreakers);
        Assert.assertNull(executePipelineBreakers.getErrorBlock());
        Assert.assertEquals(executePipelineBreakers.getResultMap().size(), 2);
        Iterator it = executePipelineBreakers.getResultMap().values().iterator();
        Assert.assertEquals(((List) it.next()).size(), 1);
        Assert.assertEquals(((List) it.next()).size(), 1);
        Assert.assertFalse(it.hasNext());
        Assert.assertNotNull(executePipelineBreakers.getOpChainStats());
        Assert.assertEquals(executePipelineBreakers.getOpChainStats().getOperatorStatsMap().size(), 2);
    }

    @Test
    public void shouldReturnEmptyBlockWhenPBExecuteWithIncorrectMailboxNode() {
        PipelineBreakerResult executePipelineBreakers = PipelineBreakerExecutor.executePipelineBreakers(this._scheduler, this._mailboxService, new DistributedStagePlan(0, RECEIVER_ADDRESS, new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, (KeySelector) null, (List) null, false, false, (PlanNode) null), this._stageMetadata1), Collections.emptyMap(), 0L, Long.MAX_VALUE);
        Assert.assertNotNull(executePipelineBreakers);
        Assert.assertNull(executePipelineBreakers.getErrorBlock());
        Assert.assertEquals(executePipelineBreakers.getResultMap().size(), 1);
        Assert.assertEquals(((List) executePipelineBreakers.getResultMap().values().iterator().next()).size(), 0);
        Assert.assertNotNull(executePipelineBreakers.getOpChainStats());
    }

    @Test
    public void shouldReturnErrorBlocksFailureWhenPBTimeout() {
        DistributedStagePlan distributedStagePlan = new DistributedStagePlan(0, RECEIVER_ADDRESS, new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, (KeySelector) null, (List) null, false, false, (PlanNode) null), this._stageMetadata1);
        Mockito.when(this._mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(this._mailbox1);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Mockito.when(this._mailbox1.poll()).thenAnswer(invocationOnMock -> {
            countDownLatch.await();
            return TransferableBlockUtils.getEndOfStreamTransferableBlock();
        });
        PipelineBreakerResult executePipelineBreakers = PipelineBreakerExecutor.executePipelineBreakers(this._scheduler, this._mailboxService, distributedStagePlan, Collections.emptyMap(), 0L, System.currentTimeMillis() + 100);
        Assert.assertNotNull(executePipelineBreakers);
        TransferableBlock errorBlock = executePipelineBreakers.getErrorBlock();
        Assert.assertNotNull(errorBlock);
        Assert.assertTrue(errorBlock.isErrorBlock());
        countDownLatch.countDown();
    }

    /* JADX WARN: Type inference failed for: r2v13, types: [java.lang.Object[], java.lang.Object[][]] */
    /* JADX WARN: Type inference failed for: r2v9, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldReturnWhenAnyPBReturnsEmpty() {
        MailboxReceiveNode mailboxReceiveNode = new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, (KeySelector) null, (List) null, false, false, (PlanNode) null);
        MailboxReceiveNode mailboxReceiveNode2 = new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, (KeySelector) null, (List) null, false, false, (PlanNode) null);
        JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, (JoinNode.JoinKeys) null, (List) null, Collections.emptyList());
        joinNode.addInput(mailboxReceiveNode);
        joinNode.addInput(mailboxReceiveNode2);
        DistributedStagePlan distributedStagePlan = new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, this._stageMetadata1);
        Mockito.when(this._mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(this._mailbox1);
        Mockito.when(this._mailboxService.getReceivingMailbox(MAILBOX_ID_2)).thenReturn(this._mailbox2);
        Mockito.when(this._mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{1, 1}}), new TransferableBlock[]{TransferableBlockUtils.getEndOfStreamTransferableBlock()});
        Mockito.when(this._mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{2, 3}}), new TransferableBlock[]{TransferableBlockUtils.getEndOfStreamTransferableBlock()});
        PipelineBreakerResult executePipelineBreakers = PipelineBreakerExecutor.executePipelineBreakers(this._scheduler, this._mailboxService, distributedStagePlan, Collections.emptyMap(), 0L, Long.MAX_VALUE);
        Assert.assertNotNull(executePipelineBreakers);
        Assert.assertEquals(executePipelineBreakers.getResultMap().size(), 2);
        Assert.assertEquals(((List) executePipelineBreakers.getResultMap().get(0)).size(), 1);
        Assert.assertEquals(((List) executePipelineBreakers.getResultMap().get(1)).size(), 0);
        Assert.assertNotNull(executePipelineBreakers.getOpChainStats());
    }

    /* JADX WARN: Type inference failed for: r2v13, types: [java.lang.Object[], java.lang.Object[][]] */
    /* JADX WARN: Type inference failed for: r2v9, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldReturnErrorBlocksWhenReceivedErrorFromSender() {
        MailboxReceiveNode mailboxReceiveNode = new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, (KeySelector) null, (List) null, false, false, (PlanNode) null);
        MailboxReceiveNode mailboxReceiveNode2 = new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, (KeySelector) null, (List) null, false, false, (PlanNode) null);
        JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, (JoinNode.JoinKeys) null, (List) null, Collections.emptyList());
        joinNode.addInput(mailboxReceiveNode);
        joinNode.addInput(mailboxReceiveNode2);
        DistributedStagePlan distributedStagePlan = new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, this._stageMetadata1);
        Mockito.when(this._mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(this._mailbox1);
        Mockito.when(this._mailboxService.getReceivingMailbox(MAILBOX_ID_2)).thenReturn(this._mailbox2);
        Mockito.when(this._mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{1, 1}}), new TransferableBlock[]{TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("ERROR ON 1"))});
        Mockito.when(this._mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{new Object[]{2, 3}}), new TransferableBlock[]{TransferableBlockUtils.getEndOfStreamTransferableBlock()});
        PipelineBreakerResult executePipelineBreakers = PipelineBreakerExecutor.executePipelineBreakers(this._scheduler, this._mailboxService, distributedStagePlan, Collections.emptyMap(), 0L, Long.MAX_VALUE);
        Assert.assertNotNull(executePipelineBreakers);
        TransferableBlock errorBlock = executePipelineBreakers.getErrorBlock();
        Assert.assertNotNull(errorBlock);
        Assert.assertTrue(errorBlock.isErrorBlock());
    }
}
