package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.lang.ProcessBuilder;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import jnr.constants.platform.Signal;
import jnr.posix.POSIXFactory;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/ChaosMonkeyIntegrationTest.class */
public class ChaosMonkeyIntegrationTest {
    private static final String TOTAL_RECORD_COUNT = "1000";
    private static final String SEGMENT_COUNT = "25";
    private final List<Process> _processes = new ArrayList();
    private static final Logger LOGGER = LoggerFactory.getLogger(ChaosMonkeyIntegrationTest.class);
    private static final String AVRO_DIR = "/tmp/temp-avro-" + ChaosMonkeyIntegrationTest.class.getName();
    private static final String SEGMENT_DIR = "/tmp/temp-segment-" + ChaosMonkeyIntegrationTest.class.getName();

    private Process runAdministratorCommand(String[] strArr) {
        String property = System.getProperty("java.class.path");
        System.getProperties().setProperty("pinot.admin.system.exit", "false");
        ArrayList arrayList = new ArrayList();
        arrayList.add("java");
        arrayList.add("-Xms4G");
        arrayList.add("-Xmx4G");
        arrayList.add("-cp");
        arrayList.add(property);
        arrayList.add(PinotAdministrator.class.getName());
        arrayList.addAll(Arrays.asList(strArr));
        try {
            Process start = new ProcessBuilder(arrayList).redirectError(ProcessBuilder.Redirect.INHERIT).redirectOutput(ProcessBuilder.Redirect.INHERIT).start();
            synchronized (this._processes) {
                this._processes.add(start);
            }
            return start;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void sendSignalToProcess(Process process, Signal signal) {
        int processPid = getProcessPid(process);
        if (processPid != -1) {
            LOGGER.info("Sending signal {} to process {}", Integer.valueOf(signal.intValue()), Integer.valueOf(processPid));
            POSIXFactory.getNativePOSIX().kill(processPid, signal.intValue());
        }
    }

    private int getProcessPid(Process process) {
        try {
            return (int) MethodHandles.lookup().findStatic(Class.forName("java.lang.ProcessHandle", false, ChaosMonkeyIntegrationTest.class.getClassLoader()), "pid", MethodType.methodType(Long.TYPE)).invoke();
        } catch (Throwable th) {
            LOGGER.warn("failed to invoke java.lang.ProcessHandle#pid - running on JDK8?", th);
            try {
                Field declaredField = process.getClass().getDeclaredField("pid");
                declaredField.setAccessible(true);
                return declaredField.getInt(process);
            } catch (IllegalAccessException | NoSuchFieldException e) {
                return -1;
            }
        }
    }

    private Process startZookeeper() {
        return runAdministratorCommand(new String[]{"StartZookeeper", "-zkPort", "2191"});
    }

    private Process startController() {
        return runAdministratorCommand(new String[]{"StartController", "-zkAddress", "localhost:2191", "-controllerPort", "39000", "-dataDir", "/tmp/ChaosMonkeyClusterController"});
    }

    private Process startBroker() {
        return runAdministratorCommand(new String[]{"StartBroker", "-brokerPort", "8099", "-zkAddress", "localhost:2191"});
    }

    private Process startServer() {
        return runAdministratorCommand(new String[]{"StartServer", "-serverPort", "8098", "-zkAddress", "localhost:2191", "-dataDir", "/tmp/ChaosMonkeyCluster/data", "-segmentDir", "/tmp/ChaosMonkeyCluster/segments"});
    }

    private void generateData() throws InterruptedException {
        runAdministratorCommand(new String[]{"GenerateData", "-numRecords", TOTAL_RECORD_COUNT, "-numFiles", SEGMENT_COUNT, "-schemaFile", TestUtils.getFileFromResourceUrl(ChaosMonkeyIntegrationTest.class.getClassLoader().getResource("chaos-monkey-schema.json")), "-schemaAnnotationFile", TestUtils.getFileFromResourceUrl(ChaosMonkeyIntegrationTest.class.getClassLoader().getResource("chaos-monkey-schema-annotations.json")), "-overwrite", "-outDir", AVRO_DIR}).waitFor();
    }

    private void createTable() throws InterruptedException {
        runAdministratorCommand(new String[]{"AddTable", "-controllerPort", "39000", "-schemaFile", TestUtils.getFileFromResourceUrl(ChaosMonkeyIntegrationTest.class.getClassLoader().getResource("chaos-monkey-schema.json")), "-tableConfigFile", TestUtils.getFileFromResourceUrl(ChaosMonkeyIntegrationTest.class.getClassLoader().getResource("chaos-monkey-create-table.json")), "-exec"}).waitFor();
    }

    private void convertData() throws InterruptedException {
        runAdministratorCommand(new String[]{"CreateSegment", "-schemaFile", TestUtils.getFileFromResourceUrl(ChaosMonkeyIntegrationTest.class.getClassLoader().getResource("chaos-monkey-schema.json")), "-dataDir", AVRO_DIR, "-tableName", "myTable", "-segmentName", "my_table", "-outDir", SEGMENT_DIR, "-overwrite"}).waitFor();
    }

    private void uploadData() throws InterruptedException {
        runAdministratorCommand(new String[]{"UploadSegment", "-controllerPort", "39000", "-segmentDir", SEGMENT_DIR}).waitFor();
    }

    private int countRecords() {
        return ConnectionFactory.fromHostList(new String[]{"localhost:8099"}).execute("select count(*) from myTable").getResultSet(0).getInt(0);
    }

    @Test(enabled = false)
    public void testShortZookeeperFreeze() throws Exception {
        testFreezeZookeeper(10000L);
    }

    @Test(enabled = false)
    public void testLongZookeeperFreeze() throws Exception {
        testFreezeZookeeper(60000L);
    }

    public void testFreezeZookeeper(long j) throws Exception {
        int i;
        Process startZookeeper = startZookeeper();
        Thread.sleep(1000L);
        startController();
        Thread.sleep(3000L);
        startServer();
        startBroker();
        Thread.sleep(3000L);
        createTable();
        generateData();
        convertData();
        uploadData();
        Thread.sleep(5000L);
        long currentTimeMillis = System.currentTimeMillis() + 120000;
        int countRecords = countRecords();
        int parseInt = Integer.parseInt(TOTAL_RECORD_COUNT);
        while (countRecords != parseInt && System.currentTimeMillis() < currentTimeMillis) {
            Thread.sleep(1000L);
            countRecords = countRecords();
        }
        Assert.assertEquals(countRecords, parseInt, "All segments did not load within 120 seconds");
        sendSignalToProcess(startZookeeper, Signal.SIGSTOP);
        Thread.sleep(j);
        sendSignalToProcess(startZookeeper, Signal.SIGCONT);
        Thread.sleep(5000L);
        long currentTimeMillis2 = System.currentTimeMillis() + 120000;
        int countRecords2 = countRecords();
        while (true) {
            i = countRecords2;
            if (i == parseInt || System.currentTimeMillis() >= currentTimeMillis2) {
                break;
            }
            Thread.sleep(1000L);
            countRecords2 = countRecords();
        }
        Assert.assertEquals(i, parseInt, "Record count still inconsistent 120 seconds after zookeeper restart");
    }

    @AfterMethod
    public void tearDown() {
        Iterator<Process> it = this._processes.iterator();
        while (it.hasNext()) {
            it.next().destroy();
        }
        FileUtils.deleteQuietly(new File(AVRO_DIR));
        FileUtils.deleteQuietly(new File(SEGMENT_DIR));
    }
}
