package org.apache.pinot.tools.streams;

import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.tools.streams.RsvpSourceGenerator;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/tools/streams/MeetupRsvpStream.class */
/**
 * Publishes generated meetup RSVP events to a Kafka topic.
 *
 * <p>Rows come from an {@link RsvpSourceGenerator} and are pushed through a
 * {@link PinotRealtimeSource}. Between batches the publisher sleeps for a randomized
 * interval (see {@link #nextDelayMillis()}).
 */
public class MeetupRsvpStream {
    protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class);
    private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents";

    /**
     * Value for the producer's {@code request.required.acks} setting. Kept as a local
     * literal so the Kafka setting does not depend on an unrelated configuration constant.
     */
    private static final String KAFKA_REQUIRED_ACKS = "1";

    /** Natural log of the per-millisecond "keep waiting" probability used to draw delays. */
    private static final double LOG_CONTINUE_PROBABILITY = Math.log(0.999d);

    protected String _topicName;
    protected PinotRealtimeSource _pinotRealtimeSource;

    /**
     * Creates a stream on the default topic with no key column.
     *
     * @throws Exception if the underlying producer or source cannot be created
     */
    public MeetupRsvpStream() throws Exception {
        this(DEFAULT_TOPIC_NAME, RsvpSourceGenerator.KeyColumn.NONE);
    }

    /**
     * Creates a stream on the default topic.
     *
     * @param partitionByKey when {@code true} rows are keyed by
     *     {@link RsvpSourceGenerator.KeyColumn#EVENT_ID}; otherwise no key column is used
     * @throws Exception if the underlying producer or source cannot be created
     */
    public MeetupRsvpStream(boolean partitionByKey) throws Exception {
        this(DEFAULT_TOPIC_NAME,
            partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID : RsvpSourceGenerator.KeyColumn.NONE);
    }

    /**
     * Creates a stream on the given topic with no key column.
     *
     * @param topicName topic to publish to
     * @throws Exception if the underlying producer or source cannot be created
     */
    public MeetupRsvpStream(String topicName) throws Exception {
        this(topicName, RsvpSourceGenerator.KeyColumn.NONE);
    }

    /**
     * Creates a stream on the default topic using the given key column.
     *
     * @param keyColumn key column handed to the {@link RsvpSourceGenerator}
     * @throws Exception if the underlying producer or source cannot be created
     */
    public MeetupRsvpStream(RsvpSourceGenerator.KeyColumn keyColumn) throws Exception {
        this(DEFAULT_TOPIC_NAME, keyColumn);
    }

    /**
     * Creates a stream on the given topic using the given key column.
     *
     * <p>Builds the producer properties, obtains a stream data producer for
     * {@code KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME} and wires it, together with a
     * new {@link RsvpSourceGenerator} and the randomized rate limiter, into a
     * {@link PinotRealtimeSource}.
     *
     * @param topicName topic to publish to
     * @param keyColumn key column handed to the {@link RsvpSourceGenerator}
     * @throws Exception if the underlying producer or source cannot be created
     */
    public MeetupRsvpStream(String topicName, RsvpSourceGenerator.KeyColumn keyColumn) throws Exception {
        this._topicName = topicName;

        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
        producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
        producerProps.put("request.required.acks", KAFKA_REQUIRED_ACKS);

        this._pinotRealtimeSource = PinotRealtimeSource.builder()
            .setGenerator(new RsvpSourceGenerator(keyColumn))
            .setProducer(StreamDataProvider.getStreamDataProducer(
                KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, producerProps))
            .setRateLimiter(permits -> {
                // The requested permit count is ignored; every call simply waits a random time.
                try {
                    Thread.sleep(nextDelayMillis());
                } catch (InterruptedException e) {
                    // Deliberately best-effort: log and keep going. The interrupt flag is not
                    // restored, so subsequent sleeps are not cut short by this interruption.
                    LOGGER.warn("Interrupted from sleep but will continue", e);
                }
            })
            .setTopic(this._topicName)
            .build();
    }

    /**
     * Draws the next pause length in milliseconds.
     *
     * <p>The value is {@code floor(ln(u) / ln(0.999)) + 1} for a uniform {@code u}, i.e. a
     * geometrically distributed delay of at least 1 ms. {@code u} is sampled from
     * {@code (0, 1]} rather than {@code [0, 1)}: a sample of exactly {@code 0} would make
     * the logarithm infinite and the resulting delay overflow to a negative number, which
     * {@link Thread#sleep(long)} rejects.
     *
     * @return a strictly positive delay in milliseconds
     */
    private static long nextDelayMillis() {
        double uniform = 1.0d - ThreadLocalRandom.current().nextDouble();
        return (long) (Math.log(uniform) / LOG_CONTINUE_PROBABILITY) + 1L;
    }

    /**
     * Starts publishing by delegating to {@link PinotRealtimeSource#run()}.
     *
     * @throws Exception if the realtime source fails to start
     */
    public void run() throws Exception {
        this._pinotRealtimeSource.run();
    }

    /**
     * Closes the realtime source. Any failure while closing is logged and otherwise ignored.
     */
    public void stopPublishing() {
        try {
            this._pinotRealtimeSource.close();
        } catch (Exception e) {
            LOGGER.error("Failed to close real time source. ignored and continue", e);
        }
    }
}
