package org.apache.pinot.integration.tests;

import com.google.common.primitives.Longs;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.File;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.integration.tests.kafka.schemaregistry.SchemaRegistryStarter;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

/* loaded from: input_file:org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.class */
public class KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
    private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
    private static final String TEST_UPDATED_INVERTED_INDEX_QUERY = "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
    private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime");
    private static final long RANDOM_SEED = System.currentTimeMillis();
    private static final Random RANDOM = new Random(RANDOM_SEED);
    private final boolean _isDirectAlloc = RANDOM.nextBoolean();
    private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
    private final boolean _enableSplitCommit = RANDOM.nextBoolean();
    private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
    private final long _startTime = System.currentTimeMillis();
    private SchemaRegistryStarter.KafkaSchemaRegistryInstance _schemaRegistry;

    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    protected int getNumKafkaBrokers() {
        return 1;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public void startKafka() {
        super.startKafka();
        startSchemaRegistry();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public void stopKafka() {
        stopSchemaRegistry();
        super.stopKafka();
    }

    private void startSchemaRegistry() {
        if (this._schemaRegistry == null) {
            this._schemaRegistry = SchemaRegistryStarter.startLocalInstance(SchemaRegistryStarter.DEFAULT_PORT);
        }
    }

    private void stopSchemaRegistry() {
        try {
            if (this._schemaRegistry != null) {
                this._schemaRegistry.stop();
                this._schemaRegistry = null;
            }
        } catch (Exception e) {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public void pushAvroIntoKafka(List<File> list) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:" + getKafkaPort());
        properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this._schemaRegistry.getUrl());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        Properties properties2 = new Properties();
        properties2.put("bootstrap.servers", "localhost:" + getKafkaPort());
        properties2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaProducer2 = new KafkaProducer(properties2);
        if (injectTombstones()) {
            for (int i = 0; i < 1000; i++) {
                kafkaProducer2.send(new ProducerRecord(getKafkaTopic(), Longs.toByteArray(System.currentTimeMillis()), null));
            }
        }
        Iterator<File> it2 = list.iterator();
        while (it2.hasNext()) {
            DataFileStream<GenericRecord> avroReader = AvroUtils.getAvroReader(it2.next());
            try {
                Iterator<GenericRecord> it3 = avroReader.iterator();
                while (it3.hasNext()) {
                    GenericRecord next = it3.next();
                    byte[] byteArray = getPartitionColumn() == null ? Longs.toByteArray(System.currentTimeMillis()) : next.get(getPartitionColumn()).toString().getBytes();
                    kafkaProducer2.send(new ProducerRecord(getKafkaTopic(), byteArray, "Rubbish".getBytes(StandardCharsets.UTF_8)));
                    kafkaProducer.send(new ProducerRecord(getKafkaTopic(), byteArray, next));
                }
                if (avroReader != null) {
                    avroReader.close();
                }
            } catch (Throwable th) {
                if (avroReader != null) {
                    try {
                        avroReader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public Map<String, String> getStreamConfigs() {
        Map<String, String> streamConfigs = super.getStreamConfigs();
        streamConfigs.put(StreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS), KafkaConfluentSchemaRegistryAvroMessageDecoder.class.getName());
        streamConfigs.put("stream.kafka.decoder.prop.schema.registry.rest.url", this._schemaRegistry.getUrl());
        return streamConfigs;
    }

    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    protected boolean injectTombstones() {
        return true;
    }

    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    protected boolean useLlc() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public String getLoadMode() {
        return ReadMode.mmap.name();
    }

    @Override // org.apache.pinot.controller.helix.ControllerTest
    public void startController() throws Exception {
        Map<String, Object> defaultControllerConfiguration = getDefaultControllerConfiguration();
        defaultControllerConfiguration.put(ControllerConf.ALLOW_HLC_TABLES, false);
        defaultControllerConfiguration.put(ControllerConf.ENABLE_SPLIT_COMMIT, Boolean.valueOf(this._enableSplitCommit));
        startController(defaultControllerConfiguration);
        enableResourceConfigForLeadControllerResource(this._enableLeadControllerResource);
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.ClusterTest
    protected void overrideServerConf(PinotConfiguration pinotConfiguration) {
        pinotConfiguration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true);
        pinotConfiguration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, Boolean.valueOf(this._isDirectAlloc));
        if (this._isConsumerDirConfigured) {
            pinotConfiguration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY);
        }
        if (this._enableSplitCommit) {
            pinotConfiguration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
            pinotConfiguration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
        }
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest
    protected void createSegmentsAndUpload(List<File> list, Schema schema, TableConfig tableConfig) throws Exception {
        if (!this._tarDir.exists()) {
            this._tarDir.mkdir();
        }
        if (!this._segmentDir.exists()) {
            this._segmentDir.mkdir();
        }
        ArrayList arrayList = new ArrayList(list);
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(arrayList, tableConfig, schema, 0, this._segmentDir, this._tarDir);
        uploadSegmentsToController(getTableName(), this._tarDir, false, false);
        uploadSegmentsToController(getTableName(), this._tarDir, true, false);
        uploadSegmentsToController(getTableName(), this._tarDir, true, true);
        list.addAll(arrayList);
    }

    private void uploadSegmentsToController(String str, File file, boolean z, boolean z2) throws Exception {
        File[] listFiles = file.listFiles();
        Assert.assertNotNull(listFiles);
        int length = listFiles.length;
        Assert.assertTrue(length > 0);
        if (z) {
            length = 1;
        }
        URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI("localhost", this._controllerPort);
        FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient();
        try {
            if (length == 1) {
                File file2 = listFiles[0];
                if (z2) {
                    changeCrcInSegmentZKMetadata(str, file2.toString());
                }
                Assert.assertEquals(fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, file2.getName(), file2, str, TableType.REALTIME).getStatusCode(), 200);
            } else {
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(length);
                ArrayList arrayList = new ArrayList(length);
                for (File file3 : listFiles) {
                    arrayList.add(newFixedThreadPool.submit(() -> {
                        return Integer.valueOf(fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, file3.getName(), file3, str, TableType.REALTIME).getStatusCode());
                    }));
                }
                newFixedThreadPool.shutdown();
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    Assert.assertEquals(((Integer) ((Future) it2.next()).get()).intValue(), 200);
                }
            }
            fileUploadDownloadClient.close();
        } catch (Throwable th) {
            try {
                fileUploadDownloadClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void changeCrcInSegmentZKMetadata(String str, String str2) {
        String substring = str2.substring(str2.indexOf("mytable_"), str2.indexOf(".tar.gz"));
        String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(str);
        SegmentZKMetadata segmentZKMetadata = this._helixResourceManager.getSegmentZKMetadata(tableNameWithType, substring);
        segmentZKMetadata.setCrc(111L);
        this._helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public long getCountStarResult() {
        return super.getCountStarResult() * 2;
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest
    @BeforeClass
    public void setUp() throws Exception {
        System.out.println(String.format("Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, enableLeadControllerResource: %s", Long.valueOf(RANDOM_SEED), Boolean.valueOf(this._isDirectAlloc), Boolean.valueOf(this._isConsumerDirConfigured), Boolean.valueOf(this._enableSplitCommit), Boolean.valueOf(this._enableLeadControllerResource)));
        FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY));
        super.setUp();
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest
    @AfterClass
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY));
        super.tearDown();
    }
}
