package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/CancelQueryIntegrationTests.class */
public class CancelQueryIntegrationTests extends BaseClusterIntegrationTestSet {
    private static final int NUM_BROKERS = 1;
    private static final int NUM_SERVERS = 4;
    private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks = new ArrayList(getNumBrokers() + getNumServers());

    protected int getNumBrokers() {
        return 1;
    }

    protected int getNumServers() {
        return 4;
    }

    protected void overrideBrokerConf(PinotConfiguration pinotConfiguration) {
        super.overrideBrokerConf(pinotConfiguration);
        pinotConfiguration.setProperty("pinot.broker.enable.query.cancellation", "true");
    }

    protected void overrideServerConf(PinotConfiguration pinotConfiguration) {
        super.overrideServerConf(pinotConfiguration);
        pinotConfiguration.setProperty("pinot.server.enable.query.cancellation", "true");
    }

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startController();
        this._helixManager.getConfigAccessor().set(new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName()).build(), "default.hyperloglog.log2m", Integer.toString(12));
        startBrokers(getNumBrokers());
        startServers(getNumServers());
        Schema createSchema = createSchema();
        addSchema(createSchema);
        TableConfig createOfflineTableConfig = createOfflineTableConfig();
        addTableConfig(createOfflineTableConfig);
        List unpackAvroData = unpackAvroData(this._tempDir);
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(unpackAvroData, createOfflineTableConfig, createSchema, 0, this._segmentDir, this._tarDir);
        File file = new File(this._tempDir, "tarDir2");
        FileUtils.copyDirectory(this._tarDir, file);
        ArrayList arrayList = new ArrayList();
        arrayList.add(this._tarDir);
        arrayList.add(file);
        try {
            uploadSegments(getTableName(), TableType.OFFLINE, arrayList);
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage().contains("Failed to create ZK metadata for segment") || e.getMessage().contains("java.nio.file.FileAlreadyExistsException"), e.getMessage());
            uploadSegments(getTableName(), this._tarDir);
        }
        setUpH2Connection(unpackAvroData);
        setUpQueryGenerator(unpackAvroData);
        registerCallbackHandlers();
        waitForAllDocsLoaded(600000L);
    }

    private void registerCallbackHandlers() {
        List<String> instancesInCluster = this._helixAdmin.getInstancesInCluster(getHelixClusterName());
        instancesInCluster.removeIf(str -> {
            return (InstanceTypeUtils.isBroker(str) || InstanceTypeUtils.isServer(str)) ? false : true;
        });
        List<String> resourcesInCluster = this._helixAdmin.getResourcesInCluster(getHelixClusterName());
        resourcesInCluster.removeIf(str2 -> {
            return (TableNameBuilder.isTableResource(str2) || "brokerResource".equals(str2)) ? false : true;
        });
        for (String str3 : instancesInCluster) {
            ArrayList arrayList = new ArrayList();
            for (String str4 : resourcesInCluster) {
                IdealState resourceIdealState = this._helixAdmin.getResourceIdealState(getHelixClusterName(), str4);
                Iterator it = resourceIdealState.getPartitionSet().iterator();
                while (true) {
                    if (it.hasNext()) {
                        if (resourceIdealState.getInstanceSet((String) it.next()).contains(str3)) {
                            arrayList.add(str4);
                            break;
                        }
                    } else {
                        break;
                    }
                }
            }
            this._serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(this._helixManager, getHelixClusterName(), str3, arrayList, 100.0d), new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(this._helixManager, getHelixClusterName(), str3, arrayList, 100.0d))));
        }
    }

    @Test
    public void testInstancesStarted() {
        Assert.assertEquals(this._serviceStatusCallbacks.size(), getNumBrokers() + getNumServers());
        Iterator<ServiceStatus.ServiceStatusCallback> it = this._serviceStatusCallbacks.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(it.next().getServiceStatus(), ServiceStatus.Status.GOOD);
        }
    }

    @Test(dataProvider = "useBothQueryEngines")
    public void testCancelByClientQueryId(boolean z) throws Exception {
        setUseMultiStageQueryEngine(z);
        final String uuid = UUID.randomUUID().toString();
        String str = "SET clientQueryId='" + uuid + "'; SELECT sleep(ActualElapsedTime+60000) FROM mytable WHERE ActualElapsedTime > 0 limit 1";
        new Timer().schedule(new TimerTask() { // from class: org.apache.pinot.integration.tests.CancelQueryIntegrationTests.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    CancelQueryIntegrationTests.this.sendCancel(uuid);
                } catch (Exception e) {
                    Assert.fail("No exception should be thrown", e);
                }
            }
        }, z ? 5000L : 500L);
        assertQueryCancellation(postQuery(str), z ? "InterruptedException" : "QueryCancellationError");
    }

    private void sendCancel(String str) throws Exception {
        cancelQuery(str);
    }

    private void assertQueryCancellation(JsonNode jsonNode, String str) {
        Assert.assertNotNull(jsonNode);
        JsonNode jsonNode2 = jsonNode.get("exceptions");
        Assert.assertNotNull(jsonNode2);
        Assert.assertTrue(jsonNode2.isArray());
        Assert.assertFalse(jsonNode2.isEmpty());
        Iterator it = jsonNode2.iterator();
        while (it.hasNext()) {
            JsonNode jsonNode3 = ((JsonNode) it.next()).get("message");
            if (jsonNode3 != null && jsonNode3.asText().contains(str)) {
                return;
            }
        }
        Assert.fail("At least one QueryCancellationError expected.");
    }
}
