package org.apache.pinot.plugin.stream.pulsar;

import java.util.Set;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pulsar.client.api.Reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.class */
/**
 * {@link StreamLevelConsumer} implementation that pulls messages from a Pulsar {@link Reader} and decodes them
 * into {@link GenericRow}s using the {@link StreamMessageDecoder} created for the stream config.
 *
 * <p>The reader is obtained from {@code PulsarStreamLevelConsumerManager} in {@link #start()} and handed back to it
 * in {@link #shutdown()}. Consumption progress is logged periodically, see {@link #next(GenericRow)}.
 */
public class PulsarStreamLevelConsumer implements StreamLevelConsumer {
    /** Minimum time between two progress log lines when the event threshold has not been reached. */
    private static final long LOG_INTERVAL_MS = 60000L;
    /** Number of newly consumed events that forces a progress log line regardless of elapsed time. */
    private static final long LOG_EVENT_THRESHOLD = 100000L;

    private final Logger _logger;
    private final StreamMessageDecoder _messageDecoder;
    private final StreamConfig _streamConfig;
    private final PulsarConfig _pulsarStreamLevelStreamConfig;

    // Assigned in start() and cleared in shutdown(); null outside of that window.
    private Reader<byte[]> _reader;

    // Progress-logging state: time and running count at the moment of the last progress log line.
    private long _lastLogTime = 0;
    private long _lastCount = 0;
    private long _currentCount = 0;

    /**
     * Builds the consumer; no reader is acquired until {@link #start()} is called.
     *
     * @param str not used by this class
     * @param str2 identifier embedded in the logger name (presumably the table name; verify against caller)
     * @param streamConfig stream configuration; provides the topic name and is used to build the Pulsar config
     *     and the message decoder
     * @param set passed to {@link StreamDecoderProvider#create} together with {@code streamConfig}
     * @param str3 passed as the second argument to the {@code PulsarConfig} constructor
     */
    public PulsarStreamLevelConsumer(String str, String str2, StreamConfig streamConfig, Set<String> set, String str3) {
        _streamConfig = streamConfig;
        _pulsarStreamLevelStreamConfig = new PulsarConfig(streamConfig, str3);
        _messageDecoder = StreamDecoderProvider.create(streamConfig, set);
        // Name the logger after this class (not PulsarConfig) so log lines are attributed to the consumer.
        _logger = LoggerFactory.getLogger(
            PulsarStreamLevelConsumer.class.getName() + "_" + str2 + "_" + streamConfig.getTopicName());
        _logger.info("PulsarStreamLevelConsumer: streamConfig : {}", _streamConfig);
    }

    /**
     * Acquires the Pulsar reader for this consumer's configuration from {@code PulsarStreamLevelConsumerManager}.
     *
     * @throws Exception if the reader cannot be acquired
     */
    @Override // org.apache.pinot.spi.stream.StreamLevelConsumer
    public void start() throws Exception {
        _reader = PulsarStreamLevelConsumerManager.acquirePulsarConsumerForConfig(_pulsarStreamLevelStreamConfig);
    }

    /**
     * Reads the next available message, if any, and decodes it into {@code genericRow}.
     *
     * <p>A progress line is logged when more than {@value #LOG_INTERVAL_MS} ms have passed since the previous one
     * or when at least {@value #LOG_EVENT_THRESHOLD} events were consumed since then.
     *
     * @param genericRow row instance handed to the decoder
     * @return the row returned by the decoder, or {@code null} when the reader reports no available message or
     *     when any exception is raised while reading or decoding (the exception is logged, not rethrown)
     */
    @Override // org.apache.pinot.spi.stream.StreamLevelConsumer
    public GenericRow next(GenericRow genericRow) {
        try {
            if (!_reader.hasMessageAvailable()) {
                return null;
            }
            GenericRow decodedRow = _messageDecoder.decode(_reader.readNext().getData(), genericRow);
            _currentCount++;
            logProgressIfDue(System.currentTimeMillis());
            return decodedRow;
        } catch (Exception e) {
            // Best-effort consumption: report the failure and let the caller poll again.
            _logger.warn("Caught exception while consuming events", e);
            return null;
        }
    }

    /** Emits a progress log line when the time or event-count threshold is crossed, then resets the baseline. */
    private void logProgressIfDue(long nowMs) {
        long consumedSinceLastLog = _currentCount - _lastCount;
        long elapsedMs = nowMs - _lastLogTime;
        if (elapsedMs <= LOG_INTERVAL_MS && consumedSinceLastLog < LOG_EVENT_THRESHOLD) {
            return;
        }
        if (_lastCount == 0) {
            // No earlier baseline exists, so a rate would be meaningless; report the running total only.
            _logger.info("Consumed {} events from pulsar stream {}", _currentCount, _streamConfig.getTopicName());
        } else {
            float eventsPerSecond = ((float) consumedSinceLastLog) * 1000.0f / ((float) elapsedMs);
            _logger.info("Consumed {} events from pulsar stream {} (rate:{}/s)", consumedSinceLastLog,
                _streamConfig.getTopicName(), eventsPerSecond);
        }
        _lastCount = _currentCount;
        _lastLogTime = nowMs;
    }

    /** Intentionally a no-op: this consumer performs no work on commit. */
    @Override // org.apache.pinot.spi.stream.StreamLevelConsumer
    public void commit() {
    }

    /**
     * Hands the reader back to {@code PulsarStreamLevelConsumerManager} and clears the reference. Does nothing
     * when no reader is currently held.
     *
     * @throws Exception if releasing the reader fails
     */
    @Override // org.apache.pinot.spi.stream.StreamLevelConsumer
    public void shutdown() throws Exception {
        if (_reader != null) {
            PulsarStreamLevelConsumerManager.releasePulsarConsumer(_reader);
            _reader = null;
        }
    }
}
