package org.apache.pinot.integration.tests.custom;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
import org.apache.pinot.integration.tests.ClusterTest;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeSuite;

/* loaded from: input_file:org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.class */
/**
 * Base class for integration tests that query custom data sets.
 *
 * <p>The instance on which the {@code @BeforeSuite} hook runs starts ZooKeeper, Kafka, the controller, the broker and
 * the server, and publishes itself through {@link #_sharedClusterTestSuite}. Every other instance forwards its
 * endpoint accessors (ZooKeeper URL, broker URLs, controller/broker ports) to that shared instance.
 *
 * <p>Subclasses supply the table name, the schema and the Avro input files; {@link #setUp()} then creates the table
 * (offline unless {@link #isRealtimeTable()} is overridden to return {@code true}), loads the data and waits for it
 * to become queryable.
 */
public abstract class CustomDataQueryClusterIntegrationTest extends BaseClusterIntegrationTest {
    protected static final Logger LOGGER = LoggerFactory.getLogger(CustomDataQueryClusterIntegrationTest.class);

    /** The instance that started the cluster components in {@link #setUpSuite()}; {@code null} until then. */
    protected static CustomDataQueryClusterIntegrationTest _sharedClusterTestSuite = null;

    /** Column name returned by {@link #getTimeColumnName()} and {@link #getSortedColumn()}. */
    protected static final String TIMESTAMP_FIELD_NAME = "ts";

    /** Timeout, in milliseconds, passed to {@code waitForAllDocsLoaded} at the end of {@link #setUp()}. */
    private static final long ALL_DOCS_LOADED_TIMEOUT_MS = 60_000L;

    /**
     * Starts the cluster components once per suite and registers this instance as the shared cluster owner.
     *
     * @throws Exception if preparing the working directories or starting any component fails
     */
    @BeforeSuite
    public void setUpSuite() throws Exception {
        LOGGER.warn("Setting up integration test suite");
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{_tempDir, _segmentDir, _tarDir});
        // Publish this instance before starting anything so the delegating accessors below resolve to it.
        _sharedClusterTestSuite = this;
        startZk();
        LOGGER.warn("Start Kafka in the integration test suite");
        startKafka();
        startController();
        startBroker();
        startServer();
        LOGGER.warn("Finished setting up integration test suite");
    }

    /**
     * Stops the cluster components (Kafka, server, broker, controller, ZooKeeper, in that order) and deletes the
     * temporary directory.
     *
     * @throws Exception if stopping a component or deleting the directory fails
     */
    @AfterSuite
    public void tearDownSuite() throws Exception {
        LOGGER.warn("Tearing down integration test suite");
        LOGGER.warn("Stop Kafka in the integration test suite");
        stopKafka();
        stopServer();
        stopBroker();
        stopController();
        stopZk();
        FileUtils.deleteDirectory(_tempDir);
        LOGGER.warn("Finished tearing down integration test suite");
    }

    /**
     * Creates the schema and table for the concrete test class, loads its Avro data and waits until all documents
     * are loaded.
     *
     * <p>For a realtime table the Avro files are pushed into Kafka; for an offline table one segment is built per
     * Avro file and the resulting tar directory is uploaded.
     *
     * @throws Exception if any step of the table or data setup fails
     */
    @BeforeClass
    public void setUp() throws Exception {
        LOGGER.warn("Setting up integration test class: {}", getClass().getSimpleName());
        if (_controllerRequestURLBuilder == null) {
            // This instance did not start a controller itself, so point the URL builder at the shared one.
            _controllerRequestURLBuilder =
                ControllerRequestURLBuilder.baseUrl("http://localhost:" + _sharedClusterTestSuite.getControllerPort());
        }
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{_tempDir, _segmentDir, _tarDir});

        Schema schema = createSchema();
        addSchema(schema);

        List<File> avroFiles = createAvroFiles();
        if (isRealtimeTable()) {
            // The first Avro file is handed to the realtime table config factory.
            addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));
            pushAvroIntoKafka(avroFiles);
        } else {
            TableConfig offlineTableConfig = createOfflineTableConfig();
            addTableConfig(offlineTableConfig);
            buildAndUploadOfflineSegments(avroFiles, offlineTableConfig, schema);
        }

        waitForAllDocsLoaded(ALL_DOCS_LOADED_TIMEOUT_MS);
        LOGGER.warn("Finished setting up integration test class: {}", getClass().getSimpleName());
    }

    /**
     * Builds one segment per Avro file (segment index = position in the list) and uploads the tar directory.
     *
     * @param avroFiles Avro input files, one segment each
     * @param tableConfig offline table config the segments are built for
     * @param schema schema the segments are built with
     * @throws Exception if building or uploading a segment fails
     */
    private void buildAndUploadOfflineSegments(List<File> avroFiles, TableConfig tableConfig, Schema schema)
            throws Exception {
        int segmentIndex = 0;
        for (File avroFile : avroFiles) {
            ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex, _segmentDir,
                _tarDir);
            segmentIndex++;
        }
        // uploadSegments is handed the whole tar directory, so one call after all builds is enough; calling it per
        // segment would presumably re-submit the earlier tarballs each time (TODO confirm against uploadSegments).
        // With no input files there is nothing to upload, so the call is skipped entirely.
        if (!avroFiles.isEmpty()) {
            uploadSegments(getTableName(), _tarDir);
        }
    }

    /**
     * Drops the table created in {@link #setUp()} and deletes the temporary directory.
     *
     * @throws IOException if dropping the table or deleting the directory fails
     */
    @AfterClass
    public void tearDown() throws IOException {
        LOGGER.warn("Tearing down integration test class: {}", getClass().getSimpleName());
        if (isRealtimeTable()) {
            dropRealtimeTable(getTableName());
        } else {
            dropOfflineTable(getTableName());
        }
        FileUtils.deleteDirectory(_tempDir);
        LOGGER.warn("Finished tearing down integration test class: {}", getClass().getSimpleName());
    }

    /**
     * Pushes the given Avro files into this test's Kafka topic on the shared suite's first Kafka starter.
     *
     * @param list Avro files to publish
     * @throws Exception if publishing fails
     */
    protected void pushAvroIntoKafka(List<File> list) throws Exception {
        ClusterIntegrationTestUtils.pushAvroIntoKafka(list, sharedKafkaBrokerAddress(), getKafkaTopic(),
            getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), injectTombstones());
    }

    /** Returns {@code localhost:<port>} for the first Kafka starter owned by the shared suite instance. */
    private String sharedKafkaBrokerAddress() {
        // Cast kept because the declared element type of _kafkaStarters is not visible from this class.
        StreamDataServerStartable kafkaStarter =
            (StreamDataServerStartable) _sharedClusterTestSuite._kafkaStarters.get(0);
        return "localhost:" + kafkaStarter.getPort();
    }

    /** Returns the ZooKeeper URL of the shared suite instance, or the inherited value when this is that instance. */
    @Override
    public String getZkUrl() {
        return _sharedClusterTestSuite != this ? _sharedClusterTestSuite.getZkUrl() : super.getZkUrl();
    }

    /** Returns the broker base API URL of the shared suite instance, or the inherited value when this is it. */
    @Override
    protected String getBrokerBaseApiUrl() {
        return _sharedClusterTestSuite != this ? _sharedClusterTestSuite.getBrokerBaseApiUrl()
            : super.getBrokerBaseApiUrl();
    }

    /** Returns the broker gRPC endpoint of the shared suite instance, or the inherited value when this is it. */
    @Override
    protected String getBrokerGrpcEndpoint() {
        return _sharedClusterTestSuite != this ? _sharedClusterTestSuite.getBrokerGrpcEndpoint()
            : super.getBrokerGrpcEndpoint();
    }

    /** Returns the controller port of the shared suite instance, or the inherited value when this is it. */
    @Override
    public int getControllerPort() {
        return _sharedClusterTestSuite != this ? _sharedClusterTestSuite.getControllerPort()
            : super.getControllerPort();
    }

    /** Returns a broker port from the shared suite instance, or the inherited value when this is it. */
    @Override
    public int getRandomBrokerPort() {
        return _sharedClusterTestSuite != this ? _sharedClusterTestSuite.getRandomBrokerPort()
            : super.getRandomBrokerPort();
    }

    /** Returns the fixed Helix cluster name used by every test built on this class. */
    public String getHelixClusterName() {
        return "CustomDataQueryClusterIntegrationTest";
    }

    /** Builds an offline table config that only sets the table name from {@link #getTableName()}. */
    public TableConfig createOfflineTableConfig() {
        return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).build();
    }

    /** Returns the sorted column; defaults to {@link #TIMESTAMP_FIELD_NAME}. */
    @Nullable
    protected String getSortedColumn() {
        return TIMESTAMP_FIELD_NAME;
    }

    /** Returns the inverted-index columns; defaults to an empty list. */
    @Nullable
    protected List<String> getInvertedIndexColumns() {
        return List.of();
    }

    /** Returns the no-dictionary columns; defaults to an empty list. */
    @Nullable
    protected List<String> getNoDictionaryColumns() {
        return List.of();
    }

    /** Returns the range-index columns; defaults to an empty list. */
    @Nullable
    protected List<String> getRangeIndexColumns() {
        return List.of();
    }

    /** Returns the bloom-filter columns; defaults to an empty list. */
    @Nullable
    protected List<String> getBloomFilterColumns() {
        return List.of();
    }

    /**
     * Builds the stream config for a realtime table consuming this test's Kafka topic from the shared suite's
     * Kafka starter.
     *
     * @return a new mutable map of stream config keys to values
     */
    protected Map<String, String> getStreamConfigMap() {
        Map<String, String> streamConfigMap = new HashMap<>();
        streamConfigMap.put("streamType", "kafka");
        streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty("kafka.broker.list"),
            sharedKafkaBrokerAddress());
        if (useKafkaTransaction()) {
            streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty("kafka.isolation.level"),
                "read_committed");
        }
        streamConfigMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.factory.class.name"),
            getStreamConsumerFactoryClassName());
        streamConfigMap.put(StreamConfigProperties.constructStreamProperty("kafka", "topic.name"), getKafkaTopic());
        streamConfigMap.put(StreamConfigProperties.constructStreamProperty("kafka", "decoder.class.name"),
            ClusterTest.AvroFileSchemaKafkaAvroMessageDecoder.class.getName());
        streamConfigMap.put("realtime.segment.flush.threshold.rows", Integer.toString(getRealtimeSegmentFlushSize()));
        streamConfigMap.put(StreamConfigProperties.constructStreamProperty("kafka", "consumer.prop.auto.offset.reset"),
            "smallest");
        return streamConfigMap;
    }

    /** Returns the time column name; defaults to {@link #TIMESTAMP_FIELD_NAME}. */
    public String getTimeColumnName() {
        return TIMESTAMP_FIELD_NAME;
    }

    /** Returns the Kafka topic for this test: the table name suffixed with {@code -kafka}. */
    public String getKafkaTopic() {
        return getTableName() + "-kafka";
    }

    /** Returns the name of the table the concrete test creates and queries. */
    public abstract String getTableName();

    /** Creates the schema that is registered before the table is created. */
    public abstract Schema createSchema();

    /**
     * Creates the Avro input files for the table.
     *
     * @return the Avro files; for a realtime table the list must be non-empty because {@link #setUp()} reads its
     *     first element
     * @throws Exception if the files cannot be created
     */
    public abstract List<File> createAvroFiles() throws Exception;

    /** Returns whether the test uses a realtime table; defaults to {@code false} (offline table). */
    public boolean isRealtimeTable() {
        return false;
    }
}
