package org.apache.pinot.plugin.stream.kafka30.utils;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.pinot.plugin.stream.kafka.utils.EmbeddedZooKeeper;
import scala.Option;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kafka30/utils/MiniKafkaCluster.class */
/**
 * Single-broker Kafka cluster backed by an embedded ZooKeeper, intended for tests.
 *
 * <p>Constructing an instance starts nothing on the Kafka side; call {@link #start()} to boot the broker and
 * {@link #close()} to tear everything down. The broker listens on {@code localhost} on a port picked at
 * construction time; use {@link #getKafkaServerAddress()} to obtain the bootstrap address.
 *
 * <p>NOTE: the working directory ({@code TEMP_DIR}) is a static field, so every instance created in the same JVM
 * shares the same ZooKeeper and Kafka log directories, and {@link #close()} on any instance deletes that directory.
 */
public final class MiniKafkaCluster implements Closeable {
    private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "MiniKafkaCluster-" + UUID.randomUUID());

    /** Total number of attempts made by {@link #createTopic} when the broker keeps timing out. */
    private static final int CREATE_TOPIC_MAX_ATTEMPTS = 5;

    /** Pause between two consecutive {@link #createTopic} attempts. */
    private static final long CREATE_TOPIC_RETRY_DELAY_SECONDS = 1L;

    private final EmbeddedZooKeeper _zkServer = new EmbeddedZooKeeper(new File(TEMP_DIR, "zk"));
    private final KafkaServer _kafkaServer;
    private final String _kafkaServerAddress;
    private final AdminClient _adminClient;

    /**
     * Creates (but does not start) a broker and an admin client pointing at it.
     *
     * @param str value used for the broker's {@code broker.id} setting
     * @throws IOException declared by the original signature; kept for caller compatibility
     * @throws InterruptedException declared by the original signature; kept for caller compatibility
     */
    public MiniKafkaCluster(String str) throws IOException, InterruptedException {
        int port = getAvailablePort();
        this._kafkaServer = new KafkaServer(new KafkaConfig(createBrokerConfig(str, port)), Time.SYSTEM, Option.empty(), false);
        this._kafkaServerAddress = "localhost:" + port;
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", this._kafkaServerAddress);
        this._adminClient = AdminClient.create(adminProps);
    }

    /**
     * Asks the OS for a free ephemeral port by binding a server socket to port 0 and releasing it immediately.
     * The port is not reserved after this method returns, so another process could in principle grab it before
     * the broker binds to it.
     *
     * @return a port number that was free at the time of the call
     * @throws RuntimeException if the probing socket cannot be opened or closed
     */
    private static int getAvailablePort() {
        // try-with-resources guarantees the probing socket is released on every path.
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException("Failed to find available port to use", e);
        }
    }

    /**
     * Builds the broker configuration.
     *
     * @param brokerId value for {@code broker.id}
     * @param port port the broker's PLAINTEXT listener binds to on localhost
     * @return the properties handed to {@link KafkaConfig}
     */
    private Properties createBrokerConfig(String brokerId, int port) {
        Properties properties = new Properties();
        properties.put("broker.id", brokerId);
        properties.put("host.name", "localhost");
        properties.put("port", Integer.toString(port));
        properties.put("listeners", "PLAINTEXT://localhost:" + port);
        properties.put("log.dir", new File(TEMP_DIR, "log").getPath());
        properties.put("zookeeper.connect", this._zkServer.getZkAddress());
        properties.put("zookeeper.session.timeout.ms", "30000");
        properties.put("controlled.shutdown.enable", "true");
        properties.put("delete.topic.enable", "true");
        properties.put("auto.create.topics.enable", "true");
        properties.put("offsets.topic.replication.factor", "1");
        properties.put("log.cleaner.dedupe.buffer.size", "2097152");
        return properties;
    }

    /** Starts the Kafka broker. */
    public void start() {
        this._kafkaServer.startup();
    }

    /**
     * Closes the admin client, shuts down the broker, closes the embedded ZooKeeper and deletes the temp directory.
     * Each step runs even if an earlier one throws, so a failure in one component does not leak the others.
     *
     * @throws IOException if closing ZooKeeper or deleting the temp directory fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            // Close the client before the broker goes away so it does not keep retrying against a dead endpoint.
            this._adminClient.close();
        } finally {
            try {
                this._kafkaServer.shutdown();
            } finally {
                try {
                    this._zkServer.close();
                } finally {
                    FileUtils.deleteDirectory(TEMP_DIR);
                }
            }
        }
    }

    /**
     * @return the broker's bootstrap address in {@code localhost:<port>} form
     */
    public String getKafkaServerAddress() {
        return this._kafkaServerAddress;
    }

    /**
     * Creates a topic and waits for the operation to complete, retrying when the broker times out.
     *
     * @param str topic name
     * @param i number of partitions
     * @param i2 replication factor (narrowed to {@code short} as required by {@link NewTopic})
     * @throws ExecutionException if creation fails for a reason other than a timeout, or if every attempt timed out
     *     (in which case the cause is the last timeout)
     * @throws InterruptedException if the thread is interrupted while waiting or sleeping between attempts
     */
    public void createTopic(String str, int i, int i2) throws ExecutionException, InterruptedException {
        NewTopic newTopic = new NewTopic(str, i, (short) i2);
        ExecutionException lastTimeout = null;
        for (int attempt = 1; attempt <= CREATE_TOPIC_MAX_ATTEMPTS; attempt++) {
            try {
                this._adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                return;
            } catch (ExecutionException e) {
                // Only timeouts are considered transient; anything else is reported immediately.
                if (!(e.getCause() instanceof TimeoutException)) {
                    throw e;
                }
                lastTimeout = e;
            }
            // No point in sleeping after the final attempt.
            if (attempt < CREATE_TOPIC_MAX_ATTEMPTS) {
                TimeUnit.SECONDS.sleep(CREATE_TOPIC_RETRY_DELAY_SECONDS);
            }
        }
        // Preserve the underlying timeout so callers can see why the retries were exhausted.
        throw new ExecutionException("Failed to create topic after retries",
                lastTimeout == null ? null : lastTimeout.getCause());
    }

    /**
     * Deletes a topic and waits for the operation to complete.
     *
     * @param str topic name
     * @throws ExecutionException if the deletion fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void deleteTopic(String str) throws ExecutionException, InterruptedException {
        this._adminClient.deleteTopics(Collections.singletonList(str)).all().get();
    }

    /**
     * Requests deletion of all records before the given offset in one partition.
     *
     * <p>The request is submitted but its result is not awaited, so the records may still be present when this
     * method returns and failures are not reported to the caller.
     *
     * @param str topic name
     * @param i partition id
     * @param j offset before which records are deleted
     */
    public void deleteRecordsBeforeOffset(String str, int i, long j) {
        this._adminClient.deleteRecords(
                Collections.singletonMap(new TopicPartition(str, i), RecordsToDelete.beforeOffset(j)));
    }
}
