package org.apache.pinot.plugin.stream.pulsar;

import com.google.common.util.concurrent.Uninterruptibles;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.class */
/**
 * Static registry that hands out Pulsar {@link Reader}s keyed by (topic name, subscriber id, bootstrap servers).
 *
 * <p>A reader is created on first acquisition, marked {@code IN_USE} while it is held, and kept open for
 * {@link #CONSUMER_SHUTDOWN_DELAY_MILLIS} after it is released so that a quick re-acquisition for the same key
 * can reuse it. Every access to the bookkeeping maps is guarded by the class monitor.
 *
 * <p>Each reader is created from its own {@link PulsarClient}; that client is closed together with the reader.
 */
public class PulsarStreamLevelConsumerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarStreamLevelConsumerManager.class);

    /** Sentinel stored as the release time of a reader that is currently acquired. */
    private static final Long IN_USE = -1L;

    /** How long a released reader stays open (and reusable) before it is closed. */
    private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(60);

    /** Open readers, keyed by (topic name, subscriber id, bootstrap servers). */
    private static final Map<ImmutableTriple<String, String, String>, Reader<byte[]>> CONSUMER_FOR_CONFIG_KEY = new HashMap<>();

    /** For each open reader: {@link #IN_USE}, or the wall-clock time (millis) at which it becomes eligible for closing. */
    private static final IdentityHashMap<Reader<byte[]>, Long> CONSUMER_RELEASE_TIME = new IdentityHashMap<>();

    /** The client each open reader was created from, so the client can be closed along with its reader. */
    private static final IdentityHashMap<Reader<byte[]>, PulsarClient> CLIENT_FOR_CONSUMER = new IdentityHashMap<>();

    /** The client built by the most recent successful reader creation. */
    protected static PulsarClient _pulsarClient;

    /** The most recently created reader. */
    protected static Reader<byte[]> _reader;

    private PulsarStreamLevelConsumerManager() {
    }

    /**
     * Returns the reader registered for the given config, creating the client and reader if none exists yet.
     *
     * @param pulsarConfig supplies the topic name, subscriber id, service URL, optional TLS trust certs path,
     *                     optional authentication token and the initial message id
     * @return the acquired reader, or {@code null} if the client or reader could not be created
     * @throws RuntimeException if a reader for the same key exists and is still marked in use
     */
    public static Reader<byte[]> acquirePulsarConsumerForConfig(PulsarConfig pulsarConfig) {
        ImmutableTriple<String, String, String> configKey = new ImmutableTriple<>(pulsarConfig.getPulsarTopicName(), pulsarConfig.getSubscriberId(), pulsarConfig.getBootstrapServers());
        synchronized (PulsarStreamLevelConsumerManager.class) {
            Reader<byte[]> cached = CONSUMER_FOR_CONFIG_KEY.get(configKey);
            if (cached != null) {
                // IN_USE.equals(...) is null-safe should the release-time entry ever be missing.
                if (IN_USE.equals(CONSUMER_RELEASE_TIME.get(cached))) {
                    throw new RuntimeException("Consumer " + cached + " already in use!");
                }
                LOGGER.info("Reusing pulsar consumer with id {}", cached);
                CONSUMER_RELEASE_TIME.put(cached, IN_USE);
                return cached;
            }

            LOGGER.info("Creating new pulsar consumer and iterator for topic {}", pulsarConfig.getPulsarTopicName());
            PulsarClient client = null;
            boolean registered = false;
            try {
                ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarConfig.getBootstrapServers());
                if (pulsarConfig.getTlsTrustCertsFilePath() != null) {
                    clientBuilder.tlsTrustCertsFilePath(pulsarConfig.getTlsTrustCertsFilePath());
                }
                if (pulsarConfig.getAuthenticationToken() != null) {
                    clientBuilder.authentication(AuthenticationFactory.token(pulsarConfig.getAuthenticationToken()));
                }
                client = clientBuilder.build();
                Reader<byte[]> created = client.newReader().topic(pulsarConfig.getPulsarTopicName()).startMessageId(pulsarConfig.getInitialMessageId()).create();

                _pulsarClient = client;
                _reader = created;
                CONSUMER_FOR_CONFIG_KEY.put(configKey, created);
                CONSUMER_RELEASE_TIME.put(created, IN_USE);
                CLIENT_FOR_CONSUMER.put(created, client);
                registered = true;
                LOGGER.info("Created consumer with id {} for topic {}", created, pulsarConfig.getPulsarTopicName());
                return created;
            } catch (PulsarClientException e) {
                LOGGER.error("Could not create pulsar consumer", e);
                return null;
            } finally {
                // A client that never got a registered reader has no other owner; close it here so it does not leak.
                if (!registered) {
                    closeClientQuietly(client);
                }
            }
        }
    }

    /**
     * Marks the reader as released and schedules a background check, {@link #CONSUMER_SHUTDOWN_DELAY_MILLIS}
     * from now, that closes every registered reader whose release time has passed and that was not re-acquired.
     * Readers that are not registered (including {@code null}) are ignored.
     *
     * @param reader a reader previously returned by {@link #acquirePulsarConsumerForConfig(PulsarConfig)}
     */
    public static void releasePulsarConsumer(final Reader<byte[]> reader) {
        synchronized (PulsarStreamLevelConsumerManager.class) {
            if (!CONSUMER_RELEASE_TIME.containsKey(reader)) {
                // Recording a release time for an untracked reader would leave an entry nothing ever removes.
                LOGGER.warn("Ignoring release of consumer {} which is not managed", reader);
                return;
            }
            final long releaseTimeMillis = System.currentTimeMillis() + CONSUMER_SHUTDOWN_DELAY_MILLIS;
            CONSUMER_RELEASE_TIME.put(reader, releaseTimeMillis);
            LOGGER.info("Marking consumer with id {} for release at {}", reader, releaseTimeMillis);
            new Thread() {
                @Override
                public void run() {
                    try {
                        Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS, TimeUnit.MILLISECONDS);
                        closeExpiredConsumers(reader, releaseTimeMillis);
                    } catch (Exception e) {
                        LOGGER.warn("Caught exception in release of consumer {}", reader, e);
                    }
                }
            }.start();
        }
    }

    /**
     * Closes all registered readers and their clients and empties the registry. Failures are logged, not thrown.
     */
    public static void closeAllConsumers() {
        try {
            synchronized (PulsarStreamLevelConsumerManager.class) {
                LOGGER.info("Trying to shutdown all the pulsar consumers");
                for (Reader<byte[]> consumer : CONSUMER_FOR_CONFIG_KEY.values()) {
                    LOGGER.info("Trying to shutdown consumer {}", consumer);
                    closeConsumerQuietly(consumer);
                }
                CONSUMER_FOR_CONFIG_KEY.clear();
                CONSUMER_RELEASE_TIME.clear();
                CLIENT_FOR_CONSUMER.clear();
            }
        } catch (Exception e) {
            LOGGER.warn("Caught exception during shutting down all pulsar consumers", e);
        }
    }

    /** Sweeps the registry and closes every reader that is not in use and whose release time has been reached. */
    private static void closeExpiredConsumers(Reader<byte[]> trigger, long scheduledAtMillis) {
        synchronized (PulsarStreamLevelConsumerManager.class) {
            LOGGER.info("Executing release check for consumer {} at {}, scheduled at {}", trigger, System.currentTimeMillis(), scheduledAtMillis);
            Iterator<Map.Entry<ImmutableTriple<String, String, String>, Reader<byte[]>>> entries = CONSUMER_FOR_CONFIG_KEY.entrySet().iterator();
            while (entries.hasNext()) {
                Reader<byte[]> consumer = entries.next().getValue();
                Long releaseTime = CONSUMER_RELEASE_TIME.get(consumer);
                // Strictly-greater: a reader whose release time equals "now" is due. Skipping it would leave it open
                // indefinitely because no further check is scheduled for it.
                boolean keepOpen = releaseTime == null || IN_USE.equals(releaseTime) || releaseTime > System.currentTimeMillis();
                if (keepOpen) {
                    LOGGER.info("Not releasing consumer {}, it has been reacquired", consumer);
                    continue;
                }
                LOGGER.info("Releasing consumer {}", consumer);
                closeConsumerQuietly(consumer);
                entries.remove();
                CONSUMER_RELEASE_TIME.remove(consumer);
            }
        }
    }

    /** Closes the reader and the client it was created from, logging (not propagating) any failure. */
    private static void closeConsumerQuietly(Reader<byte[]> consumer) {
        try {
            consumer.close();
        } catch (Exception e) {
            LOGGER.warn("Caught exception while shutting down Pulsar consumer with id {}", consumer, e);
        }
        closeClientQuietly(CLIENT_FOR_CONSUMER.remove(consumer));
    }

    /** Closes the client if it is non-null, logging (not propagating) any failure. */
    private static void closeClientQuietly(PulsarClient client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception e) {
            LOGGER.warn("Caught exception while closing Pulsar client {}", client, e);
        }
    }
}
