package org.apache.pinot.core.transport;

import com.google.common.util.concurrent.Futures;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.pinot.util.TestUtils;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

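/**
 * Tests {@link QueryRouter} against an in-process {@link QueryServer} whose query scheduler is mocked, checking both
 * the responses handed back to the caller and the statistics recorded by {@link ServerRoutingStatsManager}.
 */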
public class QueryRoutingTest {
    // Port used by the two-argument getQueryServer(...) overload and by SERVER_INSTANCE.
    private static final int TEST_PORT = 12345;
    // The single server most tests route to: localhost on TEST_PORT.
    private static final ServerInstance SERVER_INSTANCE = new ServerInstance("localhost", TEST_PORT);
    // Keys under which the OFFLINE / REALTIME responses of SERVER_INSTANCE are looked up in the response maps.
    private static final ServerRoutingInstance OFFLINE_SERVER_ROUTING_INSTANCE = SERVER_INSTANCE.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY);
    private static final ServerRoutingInstance REALTIME_SERVER_ROUTING_INSTANCE = SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME, ServerInstance.RoutingType.NETTY);
    // Broker request compiled once from a fixed query and reused by the tests.
    private static final BrokerRequest BROKER_REQUEST = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable");
    // Routing table with SERVER_INSTANCE as its only entry; its ServerRouteInfo is built from two empty lists.
    private static final Map<ServerInstance, ServerRouteInfo> ROUTING_TABLE = Collections.singletonMap(SERVER_INSTANCE, new ServerRouteInfo(Collections.emptyList(), Collections.emptyList()));
    // Router under test; created once in setUp().
    private QueryRouter _queryRouter;
    // Stats manager handed to the router; the tests read in-flight counts and EMA latency from it.
    private ServerRoutingStatsManager _serverRoutingStatsManager;
    // Running total of stats tasks the tests expect to have completed; it accumulates across test methods
    // because the stats manager is created once per class. Compared in waitForStatsUpdate(...).
    int _requestCount;
    // Server started by the current test, if any; shut down and cleared in shutdownServer().
    private QueryServer _queryServer;
    // Query thread context opened before each test and closed after it.
    private QueryThreadContext.CloseableContext _closeableContext;

    /**
     * Creates the stats manager and the query router shared by every test in this class.
     *
     * <p>Stats collection is enabled through the broker configuration so that the tests can read the
     * completed-task count, in-flight request count and EMA latency from the stats manager.
     */
    @BeforeClass
    public void setUp() {
        // Parameterized map instead of a raw HashMap, so the put below is type-checked.
        Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put("pinot.broker.adaptive.server.selector.enable.stats.collection", true);
        PinotConfiguration brokerConfig = new PinotConfiguration(brokerProperties);

        _serverRoutingStatsManager = new ServerRoutingStatsManager(brokerConfig, Mockito.mock(BrokerMetrics.class));
        _serverRoutingStatsManager.init();
        _queryRouter = new QueryRouter("testBroker", Mockito.mock(BrokerMetrics.class), _serverRoutingStatsManager);
        _requestCount = 0;
    }

    /** Opens a new {@link QueryThreadContext} before each test and keeps the handle so it can be closed afterwards. */
    @BeforeMethod
    public void setupQueryThreadContext() {
        _closeableContext = QueryThreadContext.open();
    }

    /** Closes the query thread context opened for the test, if there is one, and forgets the handle. */
    @AfterMethod
    void closeQueryThreadContext() {
        if (_closeableContext == null) {
            return;
        }
        _closeableContext.close();
        _closeableContext = null;
    }

    /**
     * Shuts down the query server left running by the test, if any, and clears the field.
     * Tests that shut the server down themselves clear the field first so this becomes a no-op.
     */
    @AfterMethod
    void shutdownServer() {
        if (_queryServer == null) {
            return;
        }
        _queryServer.shutDown();
        _queryServer = null;
    }

    /** Undoes the {@code ServerMetrics.register(...)} call made by {@code getQueryServer(...)} after each test. */
    @AfterMethod
    void deregisterServerMetrics() {
        ServerMetrics.deregister();
    }

    /**
     * Builds (but does not start) a query server on {@link #TEST_PORT}.
     *
     * @param responseDelayMs how long the mocked scheduler sleeps before answering, in milliseconds
     * @param responseBytes the bytes the mocked scheduler returns for every submitted request
     * @return the configured, not yet started server
     */
    private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes) {
        return getQueryServer(responseDelayMs, responseBytes, TEST_PORT);
    }

    /**
     * Builds (but does not start) a query server on the given port, backed by a mocked scheduler.
     *
     * <p>The mocked {@link ServerMetrics} is also registered globally; {@code deregisterServerMetrics()}
     * removes it again after each test.
     *
     * @param responseDelayMs how long the mocked scheduler sleeps before answering, in milliseconds
     * @param responseBytes the bytes the mocked scheduler returns for every submitted request
     * @param port the port handed to the {@link QueryServer}
     * @return the configured, not yet started server
     */
    private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes, int port) {
        ServerMetrics mockedMetrics = Mockito.mock(ServerMetrics.class);
        PinotConfiguration serverConfig = new PinotConfiguration();
        QueryScheduler scheduler = mockQueryScheduler(responseDelayMs, responseBytes);
        AccessControl accessControl = Mockito.mock(AccessControl.class);
        InstanceRequestHandler requestHandler =
            new InstanceRequestHandler("server01", serverConfig, scheduler, mockedMetrics, accessControl);
        ServerMetrics.register(mockedMetrics);
        return new QueryServer(port, (NettyConfig) null, requestHandler);
    }

    /**
     * Creates a {@link QueryScheduler} mock whose {@code submit} sleeps for the given delay and then
     * returns an already-completed future holding the given bytes.
     *
     * @param responseDelayMs sleep time before the response is produced, in milliseconds
     * @param responseBytes payload of the returned future
     * @return the stubbed scheduler
     */
    private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] responseBytes) {
        QueryScheduler scheduler = Mockito.mock(QueryScheduler.class);
        Mockito.when(scheduler.submit((ServerQueryRequest) ArgumentMatchers.any())).thenAnswer(invocation -> {
            // Simulate server-side processing time on the calling thread.
            Thread.sleep(responseDelayMs);
            return Futures.immediateFuture(responseBytes);
        });
        return scheduler;
    }

    /**
     * Verifies that a well-formed server response is delivered for an offline-only, a realtime-only and
     * a hybrid query, and that no request is left in flight once the stats have been recorded.
     */
    @Test
    public void testValidResponse() throws Exception {
        long requestId = 123L;
        DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
        dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
        byte[] responseBytes = dataTable.toBytes();
        String serverId = SERVER_INSTANCE.getInstanceId();
        // Typed nulls for the table type that is not queried (replaces raw "(Map) null" casts).
        BrokerRequest noRequest = null;
        Map<ServerInstance, ServerRouteInfo> noRoutingTable = null;

        _queryServer = getQueryServer(0, responseBytes);
        _queryServer.start();

        // OFFLINE only
        Map<ServerRoutingInstance, ServerResponse> offlineResponses = _queryRouter
            .submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, noRequest, noRoutingTable, 600000L)
            .getFinalResponses();
        Assert.assertEquals(offlineResponses.size(), 1);
        Assert.assertTrue(offlineResponses.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse offlineResponse = offlineResponses.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        Assert.assertNotNull(offlineResponse.getDataTable());
        Assert.assertEquals(offlineResponse.getResponseSize(), responseBytes.length);
        // The tests budget two completed stats tasks per server request.
        _requestCount += 2;
        waitForStatsUpdate(_requestCount);
        Assert.assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0);

        // REALTIME only
        Map<ServerRoutingInstance, ServerResponse> realtimeResponses = _queryRouter
            .submitQuery(requestId, "testTable", noRequest, noRoutingTable, BROKER_REQUEST, ROUTING_TABLE, 1000L)
            .getFinalResponses();
        Assert.assertEquals(realtimeResponses.size(), 1);
        Assert.assertTrue(realtimeResponses.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
        ServerResponse realtimeResponse = realtimeResponses.get(REALTIME_SERVER_ROUTING_INSTANCE);
        Assert.assertNotNull(realtimeResponse.getDataTable());
        Assert.assertEquals(realtimeResponse.getResponseSize(), responseBytes.length);
        _requestCount += 2;
        waitForStatsUpdate(_requestCount);
        Assert.assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0);

        // Hybrid: the same server is queried once per table type, hence two responses and four stats tasks.
        Map<ServerRoutingInstance, ServerResponse> hybridResponses = _queryRouter
            .submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, BROKER_REQUEST, ROUTING_TABLE, 1000L)
            .getFinalResponses();
        Assert.assertEquals(hybridResponses.size(), 2);
        Assert.assertTrue(hybridResponses.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse hybridOfflineResponse = hybridResponses.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        Assert.assertNotNull(hybridOfflineResponse.getDataTable());
        Assert.assertEquals(hybridOfflineResponse.getResponseSize(), responseBytes.length);
        Assert.assertTrue(hybridResponses.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
        ServerResponse hybridRealtimeResponse = hybridResponses.get(REALTIME_SERVER_ROUTING_INSTANCE);
        Assert.assertNotNull(hybridRealtimeResponse.getDataTable());
        Assert.assertEquals(hybridRealtimeResponse.getResponseSize(), responseBytes.length);
        _requestCount += 4;
        waitForStatsUpdate(_requestCount);
        Assert.assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0);
    }

    /**
     * Verifies that when the server answers with an empty payload the caller gets a response entry
     * without a data table, only after the full query timeout has elapsed.
     */
    @Test
    public void testInvalidResponse() throws Exception {
        long requestId = 123L;
        long timeoutMs = 1000L;
        String serverId = SERVER_INSTANCE.getInstanceId();
        // Typed nulls for the unused realtime side (replaces raw "(Map) null" casts).
        BrokerRequest noRequest = null;
        Map<ServerInstance, ServerRouteInfo> noRoutingTable = null;

        // The mocked scheduler returns zero bytes for every request.
        _queryServer = getQueryServer(0, new byte[0]);
        _queryServer.start();

        long startTimeMs = System.currentTimeMillis();
        Map<ServerRoutingInstance, ServerResponse> responses = _queryRouter
            .submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, noRequest, noRoutingTable, timeoutMs)
            .getFinalResponses();
        Assert.assertEquals(responses.size(), 1);
        Assert.assertTrue(responses.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse = responses.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        Assert.assertNull(serverResponse.getDataTable());
        Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
        Assert.assertEquals(serverResponse.getResponseSize(), 0);
        Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
        // No usable response arrives, so the call is expected to last at least the timeout.
        Assert.assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
        _requestCount += 2;
        waitForStatsUpdate(_requestCount);
        Assert.assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0);
    }

    /**
     * Verifies how the EMA latency of the server changes when its response carries a
     * {@code SERVER_TABLE_MISSING} exception: the latency is expected to go up.
     */
    @Test
    public void testLatencyForQueryServerException() throws Exception {
        long requestId = 123L;
        DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
        dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
        dataTable.addException(QueryErrorCode.SERVER_TABLE_MISSING, "Test error message");
        byte[] responseBytes = dataTable.toBytes();
        String serverId = SERVER_INSTANCE.getInstanceId();
        // Typed nulls for the unused realtime side (replaces raw "(Map) null" casts).
        BrokerRequest noRequest = null;
        Map<ServerInstance, ServerRouteInfo> noRoutingTable = null;

        _queryServer = getQueryServer(0, responseBytes);
        _queryServer.start();

        // Null when no latency has been recorded for this server yet (depends on test order).
        Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
        Map<ServerRoutingInstance, ServerResponse> responses = _queryRouter
            .submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, noRequest, noRoutingTable, 1000L)
            .getFinalResponses();
        Assert.assertEquals(responses.size(), 1);
        Assert.assertTrue(responses.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        _requestCount += 2;
        waitForStatsUpdate(_requestCount);

        Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
        if (latencyBefore == null) {
            // NOTE(review): 666.334 is presumably the EMA obtained when the 1000 ms timeout is recorded as the
            // latency of a failed request; the 13.32 delta is about 2% of that value. Confirm against the
            // stats manager's EMA parameters.
            Assert.assertEquals(latencyAfter.doubleValue(), 666.334d, 13.32d);
        } else {
            Assert.assertTrue(latencyAfter.doubleValue() > latencyBefore.doubleValue(),
                latencyAfter + " should be greater than " + latencyBefore);
        }
    }

    /**
     * Verifies how the EMA latency of the server changes when its response carries only a
     * {@code QUERY_CANCELLATION} exception: the latency is expected not to be inflated.
     */
    @Test
    public void testLatencyForClientException() throws Exception {
        long requestId = 123L;
        DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
        dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
        dataTable.addException(QueryErrorCode.QUERY_CANCELLATION, "Test error message");
        byte[] responseBytes = dataTable.toBytes();
        String serverId = SERVER_INSTANCE.getInstanceId();
        // Typed nulls for the unused realtime side (replaces raw "(Map) null" casts).
        BrokerRequest noRequest = null;
        Map<ServerInstance, ServerRouteInfo> noRoutingTable = null;

        _queryServer = getQueryServer(0, responseBytes);
        _queryServer.start();

        // Null when no latency has been recorded for this server yet (depends on test order).
        Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
        Map<ServerRoutingInstance, ServerResponse> responses = _queryRouter
            .submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, noRequest, noRoutingTable, 1000L)
            .getFinalResponses();
        Assert.assertEquals(responses.size(), 1);
        Assert.assertTrue(responses.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse = responses.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        _requestCount += 2;
        waitForStatsUpdate(_requestCount);

        Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
        if (latencyBefore == null) {
            // First sample for this server: the EMA must not exceed the observed response delay.
            Assert.assertTrue(latencyAfter.doubleValue() <= ((double) serverResponse.getResponseDelayMs()));
        } else {
            Assert.assertTrue(latencyAfter.doubleValue() < latencyBefore.doubleValue(),
                latencyAfter + " should be lesser than " + latencyBefore);
        }
    }

    /**
     * Verifies how the EMA latency of the server changes when its response carries both a
     * {@code QUERY_CANCELLATION} and a {@code SERVER_TABLE_MISSING} exception: the latency is
     * expected to go up, as in the server-exception-only case.
     */
    @Test
    public void testLatencyForMultipleExceptions() throws Exception {
        long requestId = 123L;
        DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
        dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
        dataTable.addException(QueryErrorCode.QUERY_CANCELLATION, "Test cancellation error message");
        dataTable.addException(QueryErrorCode.SERVER_TABLE_MISSING, "Test table missing error message");
        byte[] responseBytes = dataTable.toBytes();
        String serverId = SERVER_INSTANCE.getInstanceId();
        // Typed nulls for the unused realtime side (replaces raw "(Map) null" casts).
        BrokerRequest noRequest = null;
        Map<ServerInstance, ServerRouteInfo> noRoutingTable = null;

        _queryServer = getQueryServer(0, responseBytes);
        _queryServer.start();

        // Null when no latency has been recorded for this server yet (depends on test order).
        Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
        Map<ServerRoutingInstance, ServerResponse> responses = _queryRouter
            .submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, noRequest, noRoutingTable, 1000L)
            .getFinalResponses();
        Assert.assertEquals(responses.size(), 1);
        Assert.assertTrue(responses.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        _requestCount += 2;
        waitForStatsUpdate(_requestCount);

        Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
        if (latencyBefore == null) {
            // NOTE(review): 666.334 is presumably the EMA obtained when the 1000 ms timeout is recorded as the
            // latency of a failed request; the 13.32 delta is about 2% of that value. Confirm against the
            // stats manager's EMA parameters.
            Assert.assertEquals(latencyAfter.doubleValue(), 666.334d, 13.32d);
        } else {
            Assert.assertTrue(latencyAfter.doubleValue() > latencyBefore.doubleValue(),
                latencyAfter + " should be greater than " + latencyBefore);
        }
    }

    /**
     * Verifies how the EMA latency of the server changes after a response without any exception:
     * the latency is expected not to be inflated.
     */
    @Test
    public void testLatencyForNoException() throws Exception {
        long requestId = 123L;
        DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
        dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
        byte[] responseBytes = dataTable.toBytes();
        String serverId = SERVER_INSTANCE.getInstanceId();
        // Typed nulls for the unused realtime side (replaces raw "(Map) null" casts).
        BrokerRequest noRequest = null;
        Map<ServerInstance, ServerRouteInfo> noRoutingTable = null;

        _queryServer = getQueryServer(0, responseBytes);
        _queryServer.start();

        // Null when no latency has been recorded for this server yet (depends on test order).
        Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
        Map<ServerRoutingInstance, ServerResponse> responses = _queryRouter
            .submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, noRequest, noRoutingTable, 1000L)
            .getFinalResponses();
        Assert.assertEquals(responses.size(), 1);
        Assert.assertTrue(responses.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse = responses.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        _requestCount += 2;
        waitForStatsUpdate(_requestCount);

        Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
        if (latencyBefore == null) {
            // First sample for this server: the EMA must not exceed the observed response delay.
            Assert.assertTrue(latencyAfter.doubleValue() <= ((double) serverResponse.getResponseDelayMs()));
        } else {
            Assert.assertTrue(latencyAfter.doubleValue() < latencyBefore.doubleValue(),
                latencyAfter + " should be lesser than " + latencyBefore);
        }
    }

    /**
     * Verifies that a server response tagged with a different request id than the submitted query is
     * not delivered: the caller gets an entry without a data table after the full timeout.
     */
    @Test
    public void testNonMatchingRequestId() throws Exception {
        long requestId = 123L;
        DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
        // The canned response is tagged with requestId, but the query below is submitted as requestId + 1.
        dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
        byte[] responseBytes = dataTable.toBytes();
        String serverId = SERVER_INSTANCE.getInstanceId();
        // Typed nulls for the unused realtime side (replaces raw "(Map) null" casts).
        BrokerRequest noRequest = null;
        Map<ServerInstance, ServerRouteInfo> noRoutingTable = null;

        _queryServer = getQueryServer(0, responseBytes);
        _queryServer.start();

        long startTimeMs = System.currentTimeMillis();
        Map<ServerRoutingInstance, ServerResponse> responses = _queryRouter
            .submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, noRequest, noRoutingTable, 1000L)
            .getFinalResponses();
        Assert.assertEquals(responses.size(), 1);
        Assert.assertTrue(responses.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse = responses.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        Assert.assertNull(serverResponse.getDataTable());
        Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
        Assert.assertEquals(serverResponse.getResponseSize(), 0);
        Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
        // No matching response arrives, so the call is expected to last at least the timeout.
        Assert.assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
        _requestCount += 2;
        waitForStatsUpdate(_requestCount);
        Assert.assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0);
    }

    /**
     * Verifies that queries fail fast (before the timeout) when the server goes down, both for a query
     * already submitted when the server is shut down and for one submitted afterwards.
     */
    @Test
    public void testServerDown() throws Exception {
        long requestId = 123L;
        long timeoutMs = 2000L;
        DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
        dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
        byte[] responseBytes = dataTable.toBytes();
        String serverId = SERVER_INSTANCE.getInstanceId();
        // Typed nulls for the unused realtime side (replaces raw "(Map) null" casts).
        BrokerRequest noRequest = null;
        Map<ServerInstance, ServerRouteInfo> noRoutingTable = null;

        // The 500 ms response delay leaves time to shut the server down before it answers.
        _queryServer = getQueryServer(500, responseBytes);
        _queryServer.start();

        long startTimeMs = System.currentTimeMillis();
        AsyncQueryResponse pendingResponse = _queryRouter
            .submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, noRequest, noRoutingTable, timeoutMs);

        // Shut the server down before collecting the response.
        _queryServer.shutDown();
        try {
            Assert.assertFalse(_queryServer.getChannel().isOpen());
            Assert.assertFalse(_queryServer.getChannel().isActive());

            Map<ServerRoutingInstance, ServerResponse> responses = pendingResponse.getFinalResponses();
            Assert.assertEquals(responses.size(), 1);
            Assert.assertTrue(responses.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
            ServerResponse serverResponse = responses.get(OFFLINE_SERVER_ROUTING_INSTANCE);
            Assert.assertNull(serverResponse.getDataTable());
            Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
            Assert.assertEquals(serverResponse.getResponseSize(), 0);
            Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
            // The failure must be noticed before the query timeout.
            Assert.assertTrue(System.currentTimeMillis() - startTimeMs < 2000);
            _requestCount += 2;
            waitForStatsUpdate(_requestCount);
            Assert.assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0);

            // Submit again while the server is down: the request cannot even be submitted.
            long retryStartTimeMs = System.currentTimeMillis();
            Map<ServerRoutingInstance, ServerResponse> retryResponses = _queryRouter
                .submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, noRequest, noRoutingTable, timeoutMs)
                .getFinalResponses();
            Assert.assertEquals(retryResponses.size(), 1);
            Assert.assertTrue(retryResponses.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
            ServerResponse retryResponse = retryResponses.get(OFFLINE_SERVER_ROUTING_INSTANCE);
            Assert.assertNull(retryResponse.getDataTable());
            Assert.assertEquals(retryResponse.getSubmitDelayMs(), -1);
            Assert.assertEquals(retryResponse.getResponseDelayMs(), -1);
            Assert.assertEquals(retryResponse.getResponseSize(), 0);
            Assert.assertEquals(retryResponse.getDeserializationTimeMs(), 0);
            Assert.assertTrue(System.currentTimeMillis() - retryStartTimeMs < 2000);
            _requestCount += 2;
            waitForStatsUpdate(_requestCount);
            Assert.assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0);
        } finally {
            // The server is already shut down; clear the field so shutdownServer() does not shut it down twice.
            _queryServer = null;
        }
    }

    /**
     * Verifies the {@code skipUnavailableServers} query option with two routed servers of which only
     * the first is running: with the option set the running server's data is returned, without it
     * neither server yields a data table and the query returns quickly.
     */
    @Test
    public void testSkipUnavailableServer() throws IOException, InterruptedException {
        long requestId = 123L;
        int firstPort = 12346;
        ServerInstance runningServer = new ServerInstance("localhost", firstPort);
        // Nothing is started on this port.
        ServerInstance missingServer = new ServerInstance("localhost", firstPort + 1);
        ServerRoutingInstance runningRoutingInstance =
            runningServer.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY);
        ServerRoutingInstance missingRoutingInstance =
            missingServer.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY);
        Map<ServerInstance, ServerRouteInfo> routingTable = Map.of(
            runningServer, new ServerRouteInfo(Collections.emptyList(), Collections.emptyList()),
            missingServer, new ServerRouteInfo(Collections.emptyList(), Collections.emptyList()));
        // Typed nulls for the unused realtime side (replaces raw "(Map) null" casts).
        BrokerRequest noRequest = null;
        Map<ServerInstance, ServerRouteInfo> noRoutingTable = null;

        // Canned single-row response served by the running server.
        DataSchema schema =
            new DataSchema(new String[]{"column1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
        DataTableBuilder tableBuilder = DataTableBuilderFactory.getDataTableBuilder(schema);
        tableBuilder.startRow();
        tableBuilder.setColumn(0, "value1");
        tableBuilder.finishRow();
        DataTable dataTable = tableBuilder.build();
        dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));

        // Only the first server is started; it answers after a 500 ms delay.
        _queryServer = getQueryServer(500, dataTable.toBytes(), firstPort);
        _queryServer.start();

        // With skipUnavailableServers=true the running server still returns its data.
        BrokerRequest skippingRequest =
            CalciteSqlCompiler.compileToBrokerRequest("SET skipUnavailableServers=true; SELECT * FROM testTable");
        long startTimeMs = System.currentTimeMillis();
        Map<ServerRoutingInstance, ServerResponse> skippingResponses = _queryRouter
            .submitQuery(requestId, "testTable", skippingRequest, routingTable, noRequest, noRoutingTable, 10000L)
            .getFinalResponses();
        Assert.assertEquals(skippingResponses.size(), 2);
        Assert.assertTrue(skippingResponses.containsKey(runningRoutingInstance));
        Assert.assertTrue(skippingResponses.containsKey(missingRoutingInstance));
        ServerResponse runningResponse = skippingResponses.get(runningRoutingInstance);
        ServerResponse missingResponse = skippingResponses.get(missingRoutingInstance);
        Assert.assertNotNull(runningResponse.getDataTable());
        Assert.assertNull(missingResponse.getDataTable());
        // The running server's delay exceeds the scheduler sleep; the missing server fails without waiting.
        Assert.assertTrue(runningResponse.getResponseDelayMs() > 500);
        Assert.assertTrue(missingResponse.getResponseDelayMs() < 100);
        Assert.assertTrue(System.currentTimeMillis() - startTimeMs > 500);
        // Two routed servers, two stats tasks each.
        _requestCount += 4;
        waitForStatsUpdate(_requestCount);
        Assert.assertEquals(
            _serverRoutingStatsManager.fetchNumInFlightRequestsForServer(runningServer.getInstanceId()).intValue(), 0);
        Assert.assertEquals(
            _serverRoutingStatsManager.fetchNumInFlightRequestsForServer(missingServer.getInstanceId()).intValue(), 0);

        // Without the option the query fails fast and neither server yields a data table.
        BrokerRequest plainRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable");
        long plainStartTimeMs = System.currentTimeMillis();
        Map<ServerRoutingInstance, ServerResponse> plainResponses = _queryRouter
            .submitQuery(requestId, "testTable", plainRequest, routingTable, noRequest, noRoutingTable, 10000L)
            .getFinalResponses();
        Assert.assertEquals(plainResponses.size(), 2);
        Assert.assertTrue(plainResponses.containsKey(runningRoutingInstance));
        Assert.assertTrue(plainResponses.containsKey(missingRoutingInstance));
        ServerResponse plainRunningResponse = plainResponses.get(runningRoutingInstance);
        ServerResponse plainMissingResponse = plainResponses.get(missingRoutingInstance);
        Assert.assertNull(plainRunningResponse.getDataTable());
        Assert.assertNull(plainMissingResponse.getDataTable());
        Assert.assertTrue(plainRunningResponse.getResponseDelayMs() < 100);
        Assert.assertTrue(plainMissingResponse.getResponseDelayMs() < 100);
        Assert.assertTrue(System.currentTimeMillis() - plainStartTimeMs < 100);
        _requestCount += 4;
        waitForStatsUpdate(_requestCount);
        Assert.assertEquals(
            _serverRoutingStatsManager.fetchNumInFlightRequestsForServer(runningServer.getInstanceId()).intValue(), 0);
        Assert.assertEquals(
            _serverRoutingStatsManager.fetchNumInFlightRequestsForServer(missingServer.getInstanceId()).intValue(), 0);
    }

    /**
     * Blocks until the stats manager reports exactly the given number of completed tasks, so that
     * assertions on in-flight counts and latencies see fully recorded stats.
     *
     * @param expectedTaskCount the cumulative completed-task count to wait for
     */
    private void waitForStatsUpdate(long expectedTaskCount) {
        // NOTE(review): 5L / 5000L are presumably the poll interval and the timeout in milliseconds --
        // confirm against TestUtils.waitForCondition.
        // The failure message now names this test class; it previously named AdaptiveServerSelectorTest.
        TestUtils.waitForCondition(
            ignored -> _serverRoutingStatsManager.getCompletedTaskCount() == expectedTaskCount,
            5L, 5000L, "Failed to record stats for QueryRoutingTest");
    }
}
