package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.response.CursorResponse;
import org.apache.pinot.common.response.broker.CursorResponseNative;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/CursorIntegrationTest.class */
public class CursorIntegrationTest extends BaseClusterIntegrationTestSet {
    /** Class logger; {@code testResponseStoreCleaner} uses it to report errors raised while polling the broker. */
    private static final Logger LOGGER = LoggerFactory.getLogger(CursorIntegrationTest.class);
    /** Count handed to {@code getOfflineAvroFiles} in {@code setUp}. */
    private static final int NUM_OFFLINE_SEGMENTS = 8;
    /** Same numeric value that {@code getCountStarResult()} returns. */
    private static final int COUNT_STAR_RESULT = 79003;
    /** Single-value aggregation query; used by {@code testGetAndDelete}, {@code testResponseStoreCleaner} and the invalid-offset provider. */
    private static final String TEST_QUERY_ONE = "SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = 'DL'";
    /** Ordered selection query; used by {@code testGetAndDelete} and {@code testResponseStoreCleaner}. */
    private static final String TEST_QUERY_TWO = "SELECT CAST(CAST(ArrTime AS varchar) AS LONG) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = 'DL' ORDER BY ArrTime DESC";
    /** Selection query with {@code LIMIT 100000}; paged through by {@code testCursorWorkflow}. */
    private static final String TEST_QUERY_THREE = "SELECT ArrDelay, CarrierDelay, (ArrDelay - CarrierDelay) AS diff FROM mytable WHERE ArrDelay > CarrierDelay ORDER BY diff, ArrDelay, CarrierDelay LIMIT 100000";
    /** Query whose filter contains {@code 1 != 1}; {@code testQueryWithEmptyResult} expects zero rows from it. */
    private static final String EMPTY_RESULT_QUERY = "SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE DaysSinceEpoch <> 16312 AND 1 != 1";
    /**
     * Page size sent as {@code numRows} on cursor requests. It is static, mutable and reassigned by several
     * test methods before they issue queries, so its value depends on which test ran last.
     */
    private static int _resultSize;

    /**
     * Adds the response-store cleaner frequency setting to the controller configuration.
     *
     * @param map controller configuration map to mutate
     */
    protected void overrideControllerConf(Map<String, Object> map) {
        final String cleanerFrequencyKey = "controller.cluster.response.store.cleaner.frequencyPeriod";
        final String cleanerFrequency = "5m";
        map.put(cleanerFrequencyKey, cleanerFrequency);
    }

    /**
     * Selects the {@code memory} cursor response store type in the broker configuration.
     *
     * @param pinotConfiguration broker configuration to mutate
     */
    public void overrideBrokerConf(PinotConfiguration pinotConfiguration) {
        final String responseStoreTypeKey = "pinot.broker.cursor.response.store.type";
        final String responseStoreType = "memory";
        pinotConfiguration.setProperty(responseStoreTypeKey, responseStoreType);
    }

    /**
     * Returns the expected row count as a {@code long}.
     *
     * @return the value of {@code COUNT_STAR_RESULT}, widened to {@code long}
     */
    protected long getCountStarResult() {
        // Use the named constant instead of repeating the literal so the two cannot drift apart.
        return COUNT_STAR_RESULT;
    }

    /**
     * Starts the cluster components, registers the schema and offline table, builds and uploads
     * segments from the offline Avro files, then waits for all documents to load.
     *
     * @throws Exception if any start-up, upload or wait step fails
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{_tempDir, _segmentDir, _tarDir});

        // Start cluster components in the same order as before: ZK, controller, broker, server.
        startZk();
        startController();
        startBroker();
        startServer();

        // Typed lists replace the raw List locals to avoid unchecked conversions.
        List<File> allAvroFiles = getAllAvroFiles();
        List<File> offlineAvroFiles = getOfflineAvroFiles(allAvroFiles, NUM_OFFLINE_SEGMENTS);

        Schema schema = createSchema();
        getControllerRequestClient().addSchema(schema);
        TableConfig offlineTableConfig = createOfflineTableConfig();
        addTableConfig(offlineTableConfig);

        ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, _tarDir);
        uploadSegments(getTableName(), _tarDir);

        // The query generator is fed every Avro file, not only the offline subset.
        setUpQueryGenerator(allAvroFiles);
        waitForAllDocsLoaded(100000L);
    }

    /**
     * Builds the URL that lists all response stores.
     *
     * @param str broker base URL
     * @return {@code str} followed by {@code /responseStore}
     */
    protected String getBrokerGetAllResponseStoresApiUrl(String str) {
        return String.format("%s/responseStore", str);
    }

    /**
     * Builds the URL for reading the results of one stored response.
     *
     * @param str  broker base URL
     * @param str2 request id of the stored response
     * @return the response-store URL followed by {@code /<str2>/results}
     */
    protected String getBrokerResponseApiUrl(String str, String str2) {
        final String responseStoreUrl = getBrokerGetAllResponseStoresApiUrl(str);
        return String.join("/", responseStoreUrl, str2, "results");
    }

    /**
     * Builds the URL used to delete one stored response.
     *
     * @param str  broker base URL
     * @param str2 request id of the stored response
     * @return the response-store URL followed by {@code /<str2>}
     */
    protected String getBrokerDeleteResponseStoresApiUrl(String str, String str2) {
        final String responseStoreUrl = getBrokerGetAllResponseStoresApiUrl(str);
        return String.join("/", responseStoreUrl, str2);
    }

    /**
     * Builds the query-string suffix that requests a cursor response.
     *
     * @param i value for the {@code numRows} parameter
     * @return {@code ?getCursor=true&numRows=<i>}
     */
    protected String getCursorQueryProperties(int i) {
        final String template = "?getCursor=true&numRows=%d";
        return String.format(template, i);
    }

    /**
     * Builds the query-string suffix that selects a starting offset.
     *
     * @param i value for the {@code offset} parameter
     * @return {@code ?offset=<i>}
     */
    protected String getCursorOffset(int i) {
        final String template = "?offset=%d";
        return String.format(template, i);
    }

    /**
     * Builds the query-string suffix that selects a starting offset and a page size.
     *
     * @param i  value for the {@code offset} parameter
     * @param i2 value for the {@code numRows} parameter
     * @return {@code ?offset=<i>&numRows=<i2>}
     */
    protected String getCursorOffset(int i, int i2) {
        final String template = "?offset=%d&numRows=%d";
        return String.format(template, i, i2);
    }

    /**
     * Supplies the HTTP headers sent with every broker request made by this test.
     *
     * @return an empty, immutable map
     */
    protected Map<String, String> getHeaders() {
        return Collections.<String, String>emptyMap();
    }

    /**
     * Runs {@code str} once through the regular query endpoint and once through the cursor endpoint,
     * walks every cursor page, and checks that the summed page row counts equal the
     * {@code numRowsResultSet} reported by the regular run.
     *
     * @param str  query to execute against the broker
     * @param str2 not used by this implementation
     * @throws Exception if a request fails, either response carries exceptions, or the row counts differ
     */
    protected void testQuery(String str, String str2) throws Exception {
        final String baseUrl = getBrokerBaseApiUrl();
        final Map<String, String> requestHeaders = getHeaders();

        // Reference run: regular (non-cursor) query.
        String plainQueryUrl = ClusterIntegrationTestUtils.getBrokerQueryApiUrl(baseUrl, useMultiStageQueryEngine());
        JsonNode plainResponse = ClusterTest.postQuery(str, plainQueryUrl, requestHeaders, getExtraQueryProperties());
        boolean plainRunClean = plainResponse.get("exceptions").isEmpty();
        if (!plainRunClean) {
            throw new RuntimeException("Got Exceptions from Query Response: " + plainResponse);
        }
        final int expectedRows = plainResponse.get("numRowsResultSet").asInt();

        // Cursor run: first page comes back with the POST itself.
        String cursorQueryUrl = ClusterIntegrationTestUtils.getBrokerQueryApiUrl(baseUrl, useMultiStageQueryEngine())
            + getCursorQueryProperties(_resultSize);
        JsonNode cursorJson = ClusterTest.postQuery(str, cursorQueryUrl, requestHeaders, getExtraQueryProperties());
        CursorResponse firstPage = JsonUtils.jsonNodeToObject(cursorJson, CursorResponseNative.class);
        if (!firstPage.getExceptions().isEmpty()) {
            throw new RuntimeException("Got Exceptions from Query Response: " + firstPage.getExceptions().get(0));
        }

        // Add up the rows delivered across all pages.
        int pagedRows = 0;
        for (CursorResponse page : getAllResultPages(baseUrl, requestHeaders, firstPage, _resultSize)) {
            pagedRows += page.getNumRows();
        }

        if (expectedRows != pagedRows) {
            throw new RuntimeException("Pinot # of rows from paging API " + pagedRows + " doesn't match # of rows from default API " + expectedRows);
        }
    }

    /**
     * Collects every page of a cursor response, starting with the page already in hand.
     *
     * @param str            broker base URL
     * @param map            HTTP headers to send with each page request
     * @param cursorResponse first page, as returned by the initial cursor query
     * @param i              requested page size; {@code 0} is replaced by {@code 10000}
     * @return all pages in fetch order, with {@code cursorResponse} first
     * @throws IllegalStateException if a follow-up page reports no rows before the full result set
     *                               has been read
     * @throws Exception             if a page request or its deserialization fails
     */
    private List<CursorResponse> getAllResultPages(String str, Map<String, String> map, CursorResponse cursorResponse, int i) throws Exception {
        final int pageSize = (i == 0) ? 10000 : i;
        final int totalRows = cursorResponse.getNumRowsResultSet();
        final String resultsUrl = getBrokerResponseApiUrl(str, cursorResponse.getRequestId());

        List<CursorResponse> pages = new ArrayList<>();
        pages.add(cursorResponse);

        int offset = cursorResponse.getNumRows();
        while (offset < totalRows) {
            String pageJson = ClusterTest.sendGetRequest(resultsUrl + getCursorOffset(offset, pageSize), map);
            CursorResponse page = JsonUtils.stringToObject(pageJson, CursorResponseNative.class);

            // A page with no rows would leave the offset unchanged and repeat the same request forever.
            int fetchedRows = page.getNumRows();
            if (fetchedRows <= 0) {
                throw new IllegalStateException("Cursor page at offset " + offset + " returned " + fetchedRows
                    + " rows; expected " + totalRows + " rows in total for request " + cursorResponse.getRequestId());
            }

            pages.add(page);
            offset += fetchedRows;
        }
        return pages;
    }

    /**
     * Supplies the parameter rows for {@code testHardcodedQueries(boolean, int)}.
     *
     * @return rows of {@code {useMultiStageQueryEngine, pageSize}}: page sizes 2, 3, 10 and 0,
     *         each with the flag {@code false} and then {@code true}
     */
    protected Object[][] getPageSizesAndQueryEngine() {
        // Must be a two-dimensional array; a plain Object[] does not match the declared return type.
        return new Object[][]{
            {false, 2}, {false, 3}, {false, 10}, {false, 0},
            {true, 2}, {true, 3}, {true, 10}, {true, 0}
        };
    }

    /**
     * TestNG data provider that exposes {@link #getPageSizesAndQueryEngine()} under the name
     * {@code pageSizeAndQueryEngineProvider}.
     *
     * @return the rows produced by {@code getPageSizesAndQueryEngine()}
     */
    @DataProvider(name = "pageSizeAndQueryEngineProvider")
    public Object[][] pageSizeAndQueryEngineProvider() {
        final Object[][] parameterRows = getPageSizesAndQueryEngine();
        return parameterRows;
    }

    /**
     * Runs the inherited hard-coded query suite for one query-engine and page-size combination.
     *
     * @param z value passed to {@code setUseMultiStageQueryEngine}
     * @param i page size stored in {@code _resultSize} before the suite runs
     * @throws Exception if the inherited suite fails
     */
    @Test(dataProvider = "pageSizeAndQueryEngineProvider")
    public void testHardcodedQueries(boolean z, int i) throws Exception {
        // Set the page size first: the cursor requests issued by the suite read it from this static field.
        CursorIntegrationTest._resultSize = i;
        setUseMultiStageQueryEngine(z);

        super.testHardcodedQueries();
    }

    /**
     * Submits {@code TEST_QUERY_THREE} as a cursor query, checks the broker metadata on the first page
     * and on every follow-up page, then deletes the stored response.
     *
     * @param z value passed to {@code setUseMultiStageQueryEngine}
     * @throws Exception if a request fails or the initial response carries exceptions
     */
    @Test(dataProvider = "useBothQueryEngines")
    public void testCursorWorkflow(boolean z) throws Exception {
        _resultSize = 10000;
        setUseMultiStageQueryEngine(z);

        String cursorQueryUrl = ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine())
            + getCursorQueryProperties(_resultSize);
        JsonNode firstPageJson = ClusterTest.postQuery(TEST_QUERY_THREE, cursorQueryUrl, getHeaders(), getExtraQueryProperties());
        CursorResponse firstPage = JsonUtils.jsonNodeToObject(firstPageJson, CursorResponseNative.class);
        if (!firstPage.getExceptions().isEmpty()) {
            throw new RuntimeException("Got Exceptions from Query Response: " + firstPage.getExceptions().get(0));
        }

        final String requestId = firstPage.getRequestId();
        Assert.assertFalse(firstPage.getBrokerHost().isEmpty());
        Assert.assertTrue(firstPage.getBrokerPort() > 0);
        Assert.assertTrue(firstPage.getCursorFetchTimeMs() >= 0);
        Assert.assertTrue(firstPage.getCursorResultWriteTimeMs() >= 0);

        // Step through the remaining pages; the offset advances by the requested page size each time.
        final int totalRows = firstPage.getNumRowsResultSet();
        for (int offset = firstPage.getNumRows(); offset < totalRows; offset += _resultSize) {
            String pageUrl = getBrokerResponseApiUrl(getBrokerBaseApiUrl(), requestId) + getCursorOffset(offset, _resultSize);
            CursorResponse page = JsonUtils.stringToObject(ClusterTest.sendGetRequest(pageUrl, getHeaders()), CursorResponseNative.class);
            Assert.assertFalse(page.getBrokerHost().isEmpty());
            Assert.assertTrue(page.getBrokerPort() > 0);
            Assert.assertTrue(page.getCursorFetchTimeMs() >= 0);
        }

        // Remove the stored response once every page has been read.
        ClusterTest.sendDeleteRequest(getBrokerDeleteResponseStoresApiUrl(getBrokerBaseApiUrl(), requestId), getHeaders());
    }

    /**
     * Runs two cursor queries, checks that two response stores were added, deletes one store and
     * checks that exactly that store is gone.
     *
     * <p>Counts are compared against a baseline taken at the start, so the test does not depend on
     * whether other tests have left stored responses behind.
     *
     * @throws Exception if a request fails or an assertion does not hold
     */
    @Test
    public void testGetAndDelete() throws Exception {
        _resultSize = 100000;
        final String listUrl = getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl());

        // Baseline: stores that already exist before this test adds its own.
        List<CursorResponseNative> storesBefore = JsonUtils.stringToObject(
            ClusterTest.sendGetRequest(listUrl, getHeaders()), new TypeReference<List<CursorResponseNative>>() {
            });

        testQuery(TEST_QUERY_ONE);
        testQuery(TEST_QUERY_TWO);

        List<CursorResponseNative> storesAfterQueries = JsonUtils.stringToObject(
            ClusterTest.sendGetRequest(listUrl, getHeaders()), new TypeReference<List<CursorResponseNative>>() {
            });
        Assert.assertEquals(storesAfterQueries.size() - storesBefore.size(), 2);

        // Delete the first listed store and confirm only that one disappears.
        String deletedRequestId = storesAfterQueries.get(0).getRequestId();
        ClusterTest.sendDeleteRequest(getBrokerDeleteResponseStoresApiUrl(getBrokerBaseApiUrl(), deletedRequestId), getHeaders());

        List<CursorResponseNative> storesAfterDelete = JsonUtils.stringToObject(
            ClusterTest.sendGetRequest(listUrl, getHeaders()), new TypeReference<List<CursorResponseNative>>() {
            });
        Assert.assertEquals(storesAfterDelete.size(), storesAfterQueries.size() - 1);
        for (CursorResponseNative remaining : storesAfterDelete) {
            Assert.assertNotEquals(remaining.getRequestId(), deletedRequestId);
        }
    }

    /**
     * Requests the results of a non-existent request id ({@code dummy}) and expects the broker call
     * to fail with an {@link IOException} whose cause is a 404 {@link HttpErrorStatusException}.
     */
    @Test
    public void testBadGet() {
        try {
            ClusterTest.sendGetRequest(getBrokerResponseApiUrl(getBrokerBaseApiUrl(), "dummy") + getCursorOffset(0), getHeaders());
            // Without this the test would pass silently when the request unexpectedly succeeds.
            Assert.fail("Expected a 404 error when fetching results for an unknown request id");
        } catch (IOException e) {
            // getCause() returns Throwable, so an explicit cast is required.
            HttpErrorStatusException cause = (HttpErrorStatusException) e.getCause();
            Assert.assertEquals(cause.getStatusCode(), 404);
            Assert.assertTrue(cause.getMessage().contains("Query results for dummy not found"));
        }
    }

    /**
     * Deletes a non-existent request id ({@code dummy}) and expects the broker call to fail with an
     * {@link IOException} whose cause is a 404 {@link HttpErrorStatusException}.
     */
    @Test
    public void testBadDelete() {
        try {
            ClusterTest.sendDeleteRequest(getBrokerDeleteResponseStoresApiUrl(getBrokerBaseApiUrl(), "dummy"), getHeaders());
            // Without this the test would pass silently when the request unexpectedly succeeds.
            Assert.fail("Expected a 404 error when deleting an unknown request id");
        } catch (IOException e) {
            // getCause() returns Throwable, so an explicit cast is required.
            HttpErrorStatusException cause = (HttpErrorStatusException) e.getCause();
            Assert.assertEquals(cause.getStatusCode(), 404);
            Assert.assertTrue(cause.getMessage().contains("Query results for dummy not found"));
        }
    }

    /**
     * Submits {@code EMPTY_RESULT_QUERY} as a cursor query with a page size of 1000 and expects a
     * response with no result table, zero rows and no exceptions.
     *
     * @throws Exception if the request fails
     */
    @Test
    public void testQueryWithEmptyResult() throws Exception {
        String cursorQueryUrl = ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine())
            + getCursorQueryProperties(1000);
        JsonNode response = ClusterTest.postQuery(EMPTY_RESULT_QUERY, cursorQueryUrl, getHeaders(), getExtraQueryProperties());

        Assert.assertNull(response.get("resultTable"));
        int totalRows = response.get("numRowsResultSet").asInt();
        Assert.assertEquals(totalRows, 0);
        int pageRows = response.get("numRows").asInt();
        Assert.assertEquals(pageRows, 0);
        Assert.assertTrue(response.get("exceptions").isEmpty());
    }

    /**
     * TestNG data provider for {@code testGetInvalidOffset}.
     *
     * @return one row per query: {@code TEST_QUERY_ONE} and {@code EMPTY_RESULT_QUERY}
     */
    @DataProvider(name = "InvalidOffsetQueryProvider")
    public Object[][] invalidOffsetQueryProvider() {
        // Must be a two-dimensional array; a plain Object[] does not match the declared return type.
        return new Object[][]{
            {TEST_QUERY_ONE},
            {EMPTY_RESULT_QUERY}
        };
    }

    /**
     * Submits a cursor query, then requests a page at an offset one past the reported total row
     * count. The page request is expected to throw an {@link IOException} whose message matches the
     * "Offset ... should be lesser than totalRecords ..." pattern declared on the annotation.
     *
     * @param str query to execute
     * @throws Exception the expected {@link IOException}, or any other failure
     */
    @Test(dataProvider = "InvalidOffsetQueryProvider", expectedExceptions = {IOException.class}, expectedExceptionsMessageRegExp = ".*Offset \\d+ should be lesser than totalRecords \\d+.*")
    public void testGetInvalidOffset(String str) throws Exception {
        String cursorQueryUrl = ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine())
            + getCursorQueryProperties(_resultSize);
        JsonNode firstPageJson = ClusterTest.postQuery(str, cursorQueryUrl, getHeaders(), getExtraQueryProperties());
        CursorResponse firstPage = JsonUtils.jsonNodeToObject(firstPageJson, CursorResponseNative.class);
        Assert.assertTrue(firstPage.getExceptions().isEmpty());

        // Ask for an offset beyond the end of the result set.
        String resultsUrl = getBrokerResponseApiUrl(getBrokerBaseApiUrl(), firstPage.getRequestId());
        int offsetPastEnd = firstPage.getNumRowsResultSet() + 1;
        ClusterTest.sendGetRequest(resultsUrl + getCursorOffset(offsetPastEnd), getHeaders());
    }

    /**
     * Submits a query with no {@code FROM} keyword as a cursor query and checks the error response:
     * a {@code QueryValidationError:} message, error code 700, a broker id starting with
     * {@code Broker_}, and no result table.
     *
     * @throws Exception if the request fails
     */
    @Test
    public void testQueryWithRuntimeError() throws Exception {
        final String invalidQuery = "SELECT * mytable limit 100";
        String cursorQueryUrl = ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine())
            + getCursorQueryProperties(_resultSize);
        JsonNode response = ClusterTest.postQuery(invalidQuery, cursorQueryUrl, getHeaders(), getExtraQueryProperties());

        Assert.assertFalse(response.get("exceptions").isEmpty());
        JsonNode firstException = response.get("exceptions").get(0);
        String errorMessage = firstException.get("message").asText();
        Assert.assertTrue(errorMessage.startsWith("QueryValidationError:"));
        Assert.assertEquals(firstException.get("errorCode").asInt(), 700);

        String brokerId = response.get("brokerId").asText();
        Assert.assertTrue(brokerId.startsWith("Broker_"));
        Assert.assertNull(response.get("resultTable"));
    }

    /**
     * Creates two stored responses, triggers the {@code ResponseStoreCleaner} periodic task with
     * {@code response.store.cleaner.clean.at.ms} set to the smaller of their two expiration times,
     * and waits until the broker lists fewer stores than it did before the task was triggered.
     *
     * @throws Exception if a request fails, an assertion does not hold, or the wait times out
     */
    @Test
    public void testResponseStoreCleaner() throws Exception {
        final String listUrl = getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl());

        // Baseline: stores that already exist before this test adds its own.
        List<CursorResponseNative> storesBefore = JsonUtils.stringToObject(
            ClusterTest.sendGetRequest(listUrl, getHeaders()), new TypeReference<List<CursorResponseNative>>() {
            });
        final int baselineCount = storesBefore.size();

        _resultSize = 100000;
        testQuery(TEST_QUERY_ONE);
        // Short pause between the two queries, kept from the original test.
        Thread.sleep(50L);
        testQuery(TEST_QUERY_TWO);

        List<CursorResponseNative> storesAfterQueries = JsonUtils.stringToObject(
            ClusterTest.sendGetRequest(listUrl, getHeaders()), new TypeReference<List<CursorResponseNative>>() {
            });
        final int countBeforeCleanup = storesAfterQueries.size();
        Assert.assertEquals(countBeforeCleanup - baselineCount, 2);

        // Fetch the first two listed stores to read their expiration times.
        CursorResponseNative firstStore = JsonUtils.stringToObject(
            ClusterTest.sendGetRequest(getBrokerResponseApiUrl(getBrokerBaseApiUrl(), storesAfterQueries.get(0).getRequestId()), getHeaders()),
            new TypeReference<CursorResponseNative>() {
            });
        CursorResponseNative secondStore = JsonUtils.stringToObject(
            ClusterTest.sendGetRequest(getBrokerResponseApiUrl(getBrokerBaseApiUrl(), storesAfterQueries.get(1).getRequestId()), getHeaders()),
            new TypeReference<CursorResponseNative>() {
            });
        long earliestExpirationMs = Math.min(firstStore.getExpirationTimeMs(), secondStore.getExpirationTimeMs());

        Properties cleanerProperties = new Properties();
        cleanerProperties.setProperty("requestId", "CursorIntegrationTest");
        cleanerProperties.setProperty("response.store.cleaner.clean.at.ms", Long.toString(earliestExpirationMs));
        _controllerStarter.getPeriodicTaskScheduler().scheduleNow("ResponseStoreCleaner", cleanerProperties);

        // Poll the broker until at least one store has been removed.
        TestUtils.waitForCondition(unused -> {
            try {
                List<?> remainingStores = JsonUtils.stringToObject(ClusterTest.sendGetRequest(listUrl, getHeaders()), List.class);
                return remainingStores.size() < countBeforeCleanup;
            } catch (Exception e) {
                // Log the exception itself so the stack trace is kept; a failed poll counts as "not yet".
                LOGGER.error("Failed to list response stores while waiting for the cleaner", e);
                return false;
            }
        }, 500L, 100000L, "Failed to load delete query results", true);
    }
}
