package org.apache.pinot.query.service.dispatch;

import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.service.server.QueryServer;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.trace.DefaultRequestContext;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/query/service/dispatch/QueryDispatcherTest.class */
/**
 * Tests for {@link QueryDispatcher} running against in-process {@link QueryServer} instances.
 *
 * <p>{@link #setUp()} starts {@code QUERY_SERVER_COUNT} query servers, each wrapped in a Mockito spy and backed by
 * a mocked {@link QueryRunner}. Individual tests stub {@code submit} on one of the spies to simulate a failing or
 * slow server. Because the spies are shared by every test in the class, each test removes its stubbing in a
 * {@code finally} block so that a failed assertion cannot leak the stub into later tests.
 */
public class QueryDispatcherTest extends QueryTestSet {
    private static final AtomicLong REQUEST_ID_GEN = new AtomicLong();
    private static final int QUERY_SERVER_COUNT = 2;
    /** Query used by every failure-path test. */
    private static final String TEST_QUERY = "SELECT * FROM a WHERE col1 = 'foo'";
    /** Timeout passed to the dispatcher by tests that are not exercising time-outs. */
    private static final long DEFAULT_TIMEOUT_MS = 10_000L;
    /** Upper bound on how long a test waits for the cancel call to reach a server. */
    private static final long CANCEL_VERIFY_TIMEOUT_MS = 1_000L;

    /** Spied query servers keyed by the port each one listens on. */
    private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
    private QueryEnvironment _queryEnvironment;
    private QueryDispatcher _queryDispatcher;

    /**
     * Starts the spied query servers and builds the query environment and dispatcher under test.
     *
     * @throws Exception if a server cannot be started or the environment cannot be created
     */
    @BeforeClass
    public void setUp() throws Exception {
        for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
            int port = QueryTestUtils.getAvailablePort();
            QueryServer queryServer = Mockito.spy(new QueryServer(port, Mockito.mock(QueryRunner.class)));
            queryServer.start();
            _queryServerMap.put(port, queryServer);
        }
        // Two servers that were handed the same port would collapse into one map entry.
        Assert.assertEquals(_queryServerMap.size(), QUERY_SERVER_COUNT, "Query servers must listen on distinct ports");

        // Ports are taken in the map's iteration order, which is also the order firstQueryServer() relies on.
        Iterator<Integer> ports = _queryServerMap.keySet().iterator();
        int server1Port = ports.next();
        int server2Port = ports.next();
        // The (Map) cast on null is retained so that overload resolution is unchanged.
        _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, server1Port, server2Port,
                QueryEnvironmentTestBase.TABLE_SCHEMAS, QueryEnvironmentTestBase.SERVER1_SEGMENTS,
                QueryEnvironmentTestBase.SERVER2_SEGMENTS, (Map) null);
        _queryDispatcher = new QueryDispatcher(Mockito.mock(MailboxService.class));
    }

    /** Shuts down the dispatcher and then every query server started by {@link #setUp()}. */
    @AfterClass
    public void tearDown() {
        _queryDispatcher.shutdown();
        for (QueryServer queryServer : _queryServerMap.values()) {
            queryServer.shutdown();
        }
    }

    /**
     * Plans and submits each query supplied by the {@code testSql} data provider; the test passes when the
     * dispatcher does not throw.
     *
     * @param str the SQL text to plan and dispatch
     * @throws Exception if planning or dispatching fails
     */
    @Test(dataProvider = "testSql")
    public void testQueryDispatcherCanSendCorrectPayload(String str) throws Exception {
        _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), _queryEnvironment.planQuery(str),
                DEFAULT_TIMEOUT_MS, Collections.emptyMap());
    }

    /** A server whose {@code submit} throws must surface as an "Error dispatching query" failure. */
    @Test
    public void testQueryDispatcherThrowsWhenQueryServerThrows() {
        QueryServer failingQueryServer = firstQueryServer();
        Mockito.doThrow(new RuntimeException("foo")).when(failingQueryServer).submit(Mockito.any(), Mockito.any());
        try {
            assertSubmitFails(DEFAULT_TIMEOUT_MS, "Error dispatching query");
        } finally {
            Mockito.reset(failingQueryServer);
        }
    }

    /**
     * When one server reports an error during {@code submitAndReduce}, the dispatcher must fail the query and send
     * a cancel for that request id to every server.
     *
     * @throws Exception declared for parity with the dispatcher API; not expected to escape
     */
    @Test
    public void testQueryDispatcherCancelWhenQueryServerCallsOnError() throws Exception {
        QueryServer failingQueryServer = firstQueryServer();
        stubSubmitToCallOnError(failingQueryServer);
        long requestId = REQUEST_ID_GEN.getAndIncrement();
        DefaultRequestContext requestContext = new DefaultRequestContext();
        requestContext.setRequestId(requestId);
        try {
            try {
                _queryDispatcher.submitAndReduce(requestContext, _queryEnvironment.planQuery(TEST_QUERY),
                        DEFAULT_TIMEOUT_MS, Collections.emptyMap(), (Map) null);
                Assert.fail("Method call above should have failed");
            } catch (Exception e) {
                assertMessageContainsAny(e, "Error dispatching query");
            }
            assertCancelRequestedOnAllServers(requestId);
        } finally {
            Mockito.reset(failingQueryServer);
        }
    }

    /**
     * When {@code submitAndReduce} fails with a {@link NullPointerException} after dispatching, the dispatcher
     * must still send a cancel for that request id to every server.
     *
     * @throws Exception if {@code submitAndReduce} fails with anything other than the expected NPE
     */
    @Test
    public void testQueryDispatcherCancelWhenQueryReducerThrowsError() throws Exception {
        long requestId = REQUEST_ID_GEN.getAndIncrement();
        DefaultRequestContext requestContext = new DefaultRequestContext();
        requestContext.setRequestId(requestId);
        try {
            _queryDispatcher.submitAndReduce(requestContext, _queryEnvironment.planQuery("SELECT * FROM a"),
                    DEFAULT_TIMEOUT_MS, Collections.emptyMap(), (Map) null);
            Assert.fail("Method call above should have failed");
        } catch (NullPointerException ignored) {
            // Expected failure for this scenario. NOTE(review): presumably the reduce step trips over the mocked
            // MailboxService; that cannot be confirmed from this file.
        }
        assertCancelRequestedOnAllServers(requestId);
    }

    /** A server that answers {@code submit} with {@code onError} must surface as an "Error dispatching query" failure. */
    @Test
    public void testQueryDispatcherThrowsWhenQueryServerCallsOnError() {
        QueryServer failingQueryServer = firstQueryServer();
        stubSubmitToCallOnError(failingQueryServer);
        try {
            assertSubmitFails(DEFAULT_TIMEOUT_MS, "Error dispatching query");
        } finally {
            Mockito.reset(failingQueryServer);
        }
    }

    /** A server that does not answer within the 200 ms deadline must make {@code submit} fail. */
    @Test
    public void testQueryDispatcherThrowsWhenQueryServerTimesOut() {
        QueryServer blockedQueryServer = firstQueryServer();
        CountDownLatch releaseServer = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> {
            // Hold the server's response until the test has observed the failure.
            releaseServer.await();
            StreamObserver<?> responseObserver = invocation.getArgument(1);
            responseObserver.onCompleted();
            return null;
        }).when(blockedQueryServer).submit(Mockito.any(), Mockito.any());
        try {
            assertSubmitFails(200L, "Timed out waiting for response", "Error dispatching query");
        } finally {
            // Always release the blocked handler and drop the stub, even if the assertion above failed.
            releaseServer.countDown();
            Mockito.reset(blockedQueryServer);
        }
    }

    /** A deadline of 0 ms must make {@code submit} fail with a "Timed out waiting" message. */
    @Test
    public void testQueryDispatcherThrowsWhenDeadlinePreExpiredAndAsyncResponseNotPolled() {
        assertSubmitFails(0L, "Timed out waiting");
    }

    /** Returns the first server in the map's iteration order; tests use it as the misbehaving server. */
    private QueryServer firstQueryServer() {
        return _queryServerMap.values().iterator().next();
    }

    /** Stubs {@code submit} on the given spy so that it reports a {@code RuntimeException("foo")} via {@code onError}. */
    private static void stubSubmitToCallOnError(QueryServer queryServer) {
        Mockito.doAnswer(invocation -> {
            StreamObserver<?> responseObserver = invocation.getArgument(1);
            responseObserver.onError(new RuntimeException("foo"));
            return null;
        }).when(queryServer).submit(Mockito.any(), Mockito.any());
    }

    /**
     * Submits {@link #TEST_QUERY} with the given timeout and asserts that the dispatcher throws an exception whose
     * message contains at least one of the expected fragments.
     */
    private void assertSubmitFails(long timeoutMs, String... expectedFragments) {
        try {
            _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), _queryEnvironment.planQuery(TEST_QUERY),
                    timeoutMs, Collections.emptyMap());
            Assert.fail("Method call above should have failed");
        } catch (Exception e) {
            assertMessageContainsAny(e, expectedFragments);
        }
    }

    /**
     * Asserts that the exception message contains at least one of the fragments. A {@code null} message counts as
     * a mismatch rather than causing a {@link NullPointerException}.
     */
    private static void assertMessageContainsAny(Exception e, String... expectedFragments) {
        String message = e.getMessage();
        boolean matched = false;
        if (message != null) {
            for (String fragment : expectedFragments) {
                if (message.contains(fragment)) {
                    matched = true;
                    break;
                }
            }
        }
        Assert.assertTrue(matched, "Unexpected exception: " + e);
    }

    /**
     * Verifies that every server received exactly one cancel call carrying the given request id.
     *
     * <p>The verification polls for up to {@link #CANCEL_VERIFY_TIMEOUT_MS} instead of sleeping for a fixed
     * interval, so it does not depend on how quickly the cancel call reaches the servers.
     */
    private void assertCancelRequestedOnAllServers(long requestId) {
        for (QueryServer queryServer : _queryServerMap.values()) {
            // The explicit type witness gives the lambda parameter its CancelRequest type.
            Mockito.verify(queryServer, Mockito.timeout(CANCEL_VERIFY_TIMEOUT_MS).times(1)).cancel(
                    Mockito.<Worker.CancelRequest>argThat(request -> request.getRequestId() == requestId),
                    Mockito.any());
        }
    }
}
