package org.apache.pinot.plugin.stream.kafka20.server;

import com.google.common.base.Function;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import javax.annotation.Nullable;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.utils.Time;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.class */
/**
 * {@link StreamDataServerStartable} implementation that runs a Kafka broker ({@link KafkaServer}) inside the
 * current JVM and exposes topic creation through a Kafka {@link AdminClient}.
 *
 * <p>Lifecycle: {@link #init(Properties)} must be called first, then {@link #start()}; {@link #stop()} shuts the
 * broker down, closes the admin client and deletes the broker's first log directory. After {@code stop()} the
 * instance must be re-initialised with {@code init} before it is used again.
 *
 * <p>This class performs no synchronisation of its own.
 */
public class KafkaDataServerStartable implements StreamDataServerStartable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDataServerStartable.class);
    private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
    private static final String LOG_DIRS = "log.dirs";
    private static final String PORT = "port";
    private KafkaServer _serverStartable;
    private int _port;
    private String _zkStr;
    private String _logDirPath;
    private AdminClient _adminClient;

    /**
     * Reads the broker settings from {@code properties}, prepares ZooKeeper and the log directory, and builds
     * (but does not start) the broker and its admin client.
     *
     * <p>Required keys: {@code port} (an {@link Integer}/{@link Number} or a numeric string),
     * {@code zookeeper.connect} and {@code log.dirs}. If the ZooKeeper connect string contains a {@code /}, the
     * part from the first {@code /} onwards is created as a persistent ZooKeeper path before the broker is
     * configured. The ZooKeeper session-timeout property is added to {@code properties} (the caller's object is
     * mutated) with the value {@code "60000"}.
     *
     * @param properties broker configuration; passed on to {@link KafkaConfig}
     * @throws IllegalArgumentException if a required property is missing or {@code port} is not an integer
     */
    @Override // org.apache.pinot.spi.stream.StreamDataServerStartable
    public void init(Properties properties) {
        _port = getIntProperty(properties, PORT);
        _zkStr = getRequiredProperty(properties, ZOOKEEPER_CONNECT);
        _logDirPath = getRequiredProperty(properties, LOG_DIRS);

        // A connect string of the form "host:port/some/path" carries a chroot-style path that has to exist
        // before the broker connects, so create it up front.
        int pathStart = _zkStr.indexOf('/');
        if (pathStart != -1) {
            String zkServers = _zkStr.substring(0, pathStart);
            String zkPath = _zkStr.substring(pathStart);
            ZkClient zkClient = new ZkClient(zkServers);
            try {
                zkClient.createPersistent(zkPath, true);
            } finally {
                // Always release the ZooKeeper connection, even when the path could not be created.
                zkClient.close();
            }
        }

        File logDir = new File(_logDirPath);
        if (!logDir.mkdirs() && !logDir.isDirectory()) {
            LOGGER.warn("Could not create Kafka log directory {}", _logDirPath);
        }

        properties.put(KafkaStreamConfigProperties.HighLevelConsumer.ZK_SESSION_TIMEOUT_MS, "60000");
        _serverStartable = new KafkaServer(new KafkaConfig(properties), Time.SYSTEM, Option.empty(), false);

        Map<String, Object> adminClientConfig = new HashMap<>();
        adminClientConfig.put("bootstrap.servers", "localhost:" + _port);
        adminClientConfig.put("client.id", "Kafka2AdminClient-" + UUID.randomUUID().toString());
        adminClientConfig.put("request.timeout.ms", 15000);
        _adminClient = KafkaAdminClient.create(adminClientConfig);
    }

    /** Starts the broker built by {@link #init(Properties)}. */
    @Override // org.apache.pinot.spi.stream.StreamDataServerStartable
    public void start() {
        _serverStartable.startup();
    }

    /**
     * Closes the admin client, shuts the broker down and deletes the first configured log directory
     * (deletion failures are ignored).
     */
    @Override // org.apache.pinot.spi.stream.StreamDataServerStartable
    public void stop() {
        if (_adminClient != null) {
            try {
                _adminClient.close();
            } catch (Exception e) {
                // Best effort: a failure to close the client must not prevent the broker from shutting down.
                LOGGER.warn("Caught exception while closing Kafka admin client", e);
            }
            _adminClient = null;
        }
        _serverStartable.shutdown();
        FileUtils.deleteQuietly(new File(_serverStartable.config().logDirs().apply(0)));
    }

    /**
     * Requests creation of {@code topic} with replication factor 1 and then polls the broker once per second,
     * for up to 30 seconds, until the topic shows up in the topic listing. If the topic does not appear in time
     * an error is logged and the method returns normally.
     *
     * @param topic name of the topic to create
     * @param properties must contain the partition count (an {@link Integer}/{@link Number} or numeric string)
     * @throws IllegalStateException if {@link #init(Properties)} has not been called (or {@link #stop()} was)
     * @throws IllegalArgumentException if the partition count is missing or not an integer
     */
    @Override // org.apache.pinot.spi.stream.StreamDataServerStartable
    public void createTopic(final String topic, Properties properties) {
        final AdminClient adminClient = _adminClient;
        if (adminClient == null) {
            throw new IllegalStateException("Kafka admin client is not initialized; call init() first");
        }
        // NOTE(review): the partition-count key is taken from a routing-config constant, which looks unrelated
        // to topic creation - presumably the constant's value happens to be the intended key. Confirm, and
        // consider a dedicated constant.
        int numPartitions = getIntProperty(properties, RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE);
        adminClient.createTopics(Arrays.asList(new NewTopic(topic, numPartitions, (short) 1)));
        waitForCondition(new Function<Void, Boolean>() {
            @Override // com.google.common.base.Function
            @Nullable
            public Boolean apply(@Nullable Void unused) {
                try {
                    return Boolean.valueOf(adminClient.listTopics().names().get().contains(topic));
                } catch (Exception e) {
                    LOGGER.warn("Could not fetch Kafka topics", e);
                    return null;
                }
            }
        }, 1000L, 30000L, "Kafka topic " + topic + " is not created yet");
    }

    /** Returns the broker port read by {@link #init(Properties)}. */
    @Override // org.apache.pinot.spi.stream.StreamDataServerStartable
    public int getPort() {
        return _port;
    }

    /**
     * Looks up {@code key} and converts it to an {@code int}. Accepts any {@link Number} as well as a numeric
     * string, so properties loaded from a file work the same as programmatically populated ones.
     */
    private static int getIntProperty(Properties properties, String key) {
        Object value = properties.get(key);
        if (value == null) {
            // Properties.get() ignores the defaults table; getProperty() consults it.
            value = properties.getProperty(key);
        }
        if (value == null) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            return Integer.parseInt(((String) value).trim());
        }
        throw new IllegalArgumentException(
            "Property " + key + " must be an integer, but was of type " + value.getClass().getName());
    }

    /** Returns the string property {@code key}, failing with a descriptive message when it is absent. */
    private static String getRequiredProperty(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value;
    }

    /**
     * Repeatedly evaluates {@code condition} until it returns {@link Boolean#TRUE} or {@code timeoutMs} elapses,
     * sleeping {@code checkIntervalMs} between attempts. Exceptions thrown by the condition are logged and the
     * polling continues. If the calling thread is interrupted, its interrupt flag is restored and the wait ends
     * immediately. On timeout an error (including {@code errorMessage}, when given) is logged; nothing is thrown.
     */
    private static void waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs,
            @Nullable String errorMessage) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        String errorMessageSuffix = errorMessage != null ? ", error message: " + errorMessage : "";
        while (System.currentTimeMillis() < deadline) {
            try {
                if (Boolean.TRUE.equals(condition.apply(null))) {
                    return;
                }
                Thread.sleep(checkIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting instead of spinning until the deadline.
                Thread.currentThread().interrupt();
                LOGGER.error("Interrupted while checking the condition" + errorMessageSuffix, e);
                return;
            } catch (Exception e) {
                LOGGER.error("Caught exception while checking the condition" + errorMessageSuffix, e);
            }
        }
        LOGGER.error("Failed to meet condition in " + timeoutMs + "ms" + errorMessageSuffix);
    }
}
