package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.util.TestUtils;
import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/SpoolIntegrationTest.class */
public class SpoolIntegrationTest extends BaseClusterIntegrationTest implements ExplainIntegrationTestTrait {
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startController();
        startBroker();
        startServers(2);
        Schema createSchema = createSchema();
        addSchema(createSchema);
        TableConfig createOfflineTableConfig = createOfflineTableConfig();
        addTableConfig(createOfflineTableConfig);
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(unpackAvroData(this._tempDir), createOfflineTableConfig, createSchema, 0, this._segmentDir, this._tarDir);
        uploadSegments(getTableName(), this._tarDir);
        waitForAllDocsLoaded(600000L);
    }

    protected void overrideBrokerConf(PinotConfiguration pinotConfiguration) {
        pinotConfiguration.setProperty("pinot.query.multistage.explain.include.segment.plan", "true");
    }

    @BeforeMethod
    public void resetMultiStage() {
        setUseMultiStageQueryEngine(true);
    }

    @Test
    public void intermediateSpool() throws Exception {
        JsonNode postQuery = postQuery("SET useSpools = true;\nWITH group_and_sum AS (\n  SELECT ArrTimeBlk,\n    Dest,\n    SUM(ArrTime) AS ArrTime\n  FROM mytable\n  GROUP BY ArrTimeBlk,\n    Dest\n  limit 1000\n),\naggregated_data AS (\n  SELECT\n    Dest,\n    SUM(ArrTime) AS ArrTime\n  FROM group_and_sum\n  GROUP BY\n    Dest\n),\njoined AS (\n  SELECT\n    s.Dest,\n    s.ArrTime,\n    (o.ArrTime) AS ArrTime2\n  FROM group_and_sum s\n  JOIN aggregated_data o\n  ON s.Dest = o.Dest\n)\nSELECT *\nFROM joined\nLIMIT 1");
        JsonNode jsonNode = postQuery.get("stageStats");
        assertNoError(postQuery);
        DocumentContext parse = JsonPath.parse(jsonNode.toString());
        checkSpoolTimes(parse, 4, 3, 1);
        checkSpoolTimes(parse, 4, 7, 1);
        checkSpoolSame(parse, 4, 3, 7);
    }

    @Test
    public void testNestedSpools() throws Exception {
        JsonNode postQuery = postQuery("SET useSpools = true;\n\nWITH\n    q1 AS (\n        SELECT ArrTimeBlk as userUUID,\n               Dest as deviceOS,\n               SUM(ArrTime) AS totalTrips\n        FROM mytable\n        GROUP BY ArrTimeBlk, Dest\n    ),\n     q2 AS (\n         SELECT userUUID,\n                deviceOS,\n                SUM(totalTrips) AS totalTrips,\n                COUNT(DISTINCT userUUID) AS reach\n         FROM q1\n         GROUP BY userUUID,\n                  deviceOS\n     ),\n     q3 AS (\n         SELECT userUUID,\n                (totalTrips / reach) AS frequency\n         FROM q2\n     ),\n     q4 AS (\n         SELECT rd.userUUID,\n                rd.deviceOS,\n                rd.totalTrips as totalTrips,\n                rd.reach AS reach\n         FROM q2 rd\n     ),\n     q5 AS (\n         SELECT userUUID,\n                SUM(totalTrips) AS totalTrips\n         FROM q4\n         GROUP BY userUUID\n     ),\n     q6 AS (\n         SELECT s.userUUID,\n                s.totalTrips,\n                (s.totalTrips / o.frequency) AS reach,\n                'some fake device' AS deviceOS\n         FROM q5 s\n                  JOIN q3 o ON s.userUUID = o.userUUID\n     ),\n     q7 AS (\n         SELECT rd.userUUID,\n                rd.totalTrips,\n                rd.reach,\n                rd.deviceOS\n         FROM q4 rd\n         UNION ALL\n         SELECT f.userUUID,\n                f.totalTrips,\n                f.reach,\n                f.deviceOS\n         FROM q6 f\n     ),\n     q8 AS (\n         SELECT sd.*\n         FROM q7 sd\n                  JOIN (\n             SELECT deviceOS,\n                    PERCENTILETDigest(totalTrips, 20) AS p20\n             FROM q7\n             GROUP BY deviceOS\n         ) q ON sd.deviceOS = q.deviceOS\n     )\nSELECT *\nFROM q8");
        JsonNode jsonNode = postQuery.get("stageStats");
        assertNoError(postQuery);
        DocumentContext parse = JsonPath.parse(jsonNode.toString());
        checkSpoolTimes(parse, 6, 5, 1);
        checkSpoolTimes(parse, 6, 14, 1);
        checkSpoolSame(parse, 6, 5, 14);
        checkSpoolTimes(parse, 7, 6, 2);
        checkSpoolTimes(parse, 4, 3, 1);
        checkSpoolTimes(parse, 4, 7, 2);
        checkSpoolTimes(parse, 4, 9, 1);
        checkSpoolTimes(parse, 4, 12, 1);
        checkSpoolTimes(parse, 4, 18, 1);
        checkSpoolSame(parse, 4, 3, 7, 9, 12, 18);
    }

    private List<Map<String, Object>> findDescendantById(DocumentContext documentContext, int i, int i2) {
        return (List) documentContext.read("$..[?(@.stage == " + i + ")]..[?(@.stage == " + i2 + ")]", new Predicate[0]);
    }

    private void checkSpoolTimes(DocumentContext documentContext, int i, int i2, int i3) {
        List<Map<String, Object>> findDescendantById = findDescendantById(documentContext, i2, i);
        Assert.assertEquals(findDescendantById.size(), i3, "Stage " + i + " should be descended from stage " + i2 + " exactly " + i3 + " times");
        Map<String, Object> map = findDescendantById.get(0);
        for (int i4 = 1; i4 < findDescendantById.size(); i4++) {
            Assert.assertEquals(findDescendantById.get(i4), map, "Stage " + i + " should be the same in all " + i3 + " descendants");
        }
    }

    private void checkSpoolSame(DocumentContext documentContext, int i, int... iArr) {
        List list = (List) Arrays.stream(iArr).mapToObj(i2 -> {
            return Pair.of(Integer.valueOf(i2), findDescendantById(documentContext, i2, i));
        }).collect(Collectors.toList());
        Pair pair = (Pair) list.stream().filter(pair2 -> {
            return !((List) pair2.getValue()).isEmpty();
        }).findFirst().orElse(null);
        if (pair == null) {
            Assert.fail("None of the parent nodes " + Arrays.toString(iArr) + " have a descendant with id " + i);
        }
        if (((List) list.stream().filter(pair3 -> {
            return !((Map) ((List) pair3.getValue()).get(0)).equals(((List) pair.getValue()).get(0));
        }).collect(Collectors.toList())).isEmpty()) {
            return;
        }
        Assert.fail("The descendant with id " + i + " is not the same in all parent nodes " + String.valueOf(list));
    }

    @AfterClass
    public void tearDown() throws Exception {
        dropOfflineTable("mytable");
        stopServer();
        stopBroker();
        stopController();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }
}
