package org.apache.pinot.plugin.stream.kafka20;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.TransientConsumerException;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.class */
/**
 * {@link StreamMetadataProvider} implementation that answers metadata queries (partition count and
 * partition offsets) through the Kafka consumer held by {@link KafkaPartitionLevelConnectionHandler}.
 *
 * <p>Kafka {@link TimeoutException}s raised by the metadata calls are rethrown as
 * {@link TransientConsumerException}.
 */
public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler implements StreamMetadataProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);

    /**
     * Creates a provider without a concrete partition by delegating to the three-argument
     * constructor with {@link Integer#MIN_VALUE} as the partition argument.
     *
     * @param str          forwarded unchanged to the parent handler (presumably the Kafka client id - confirm)
     * @param streamConfig stream configuration forwarded to the parent handler
     */
    public KafkaStreamMetadataProvider(String str, StreamConfig streamConfig) {
        // NOTE(review): Integer.MIN_VALUE looks like a "no specific partition" sentinel - confirm in the parent.
        this(str, streamConfig, Integer.MIN_VALUE);
    }

    /**
     * Creates a provider; all arguments are forwarded unchanged to the parent handler.
     *
     * @param str          forwarded to the parent handler (presumably the Kafka client id - confirm)
     * @param streamConfig stream configuration forwarded to the parent handler
     * @param i            forwarded to the parent handler (presumably the partition id - confirm)
     */
    public KafkaStreamMetadataProvider(String str, StreamConfig streamConfig, int i) {
        super(str, streamConfig, i);
    }

    /**
     * Returns the number of partitions Kafka reports for the configured topic.
     *
     * @param timeoutMillis maximum time, in milliseconds, to wait for the partition metadata
     * @return the number of partitions reported for the topic
     * @throws TransientConsumerException if Kafka times out while fetching the metadata
     * @throws IllegalStateException      if the consumer returns no partition metadata at all
     */
    @Override
    public int fetchPartitionCount(long timeoutMillis) {
        try {
            List<?> partitions = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis));
            // Guard against a null result (some client versions may return null for an unknown
            // topic - TODO confirm) so callers get a descriptive error instead of a bare NPE.
            if (partitions == null) {
                throw new IllegalStateException("Failed to fetch partition information for topic: " + _topic);
            }
            return partitions.size();
        } catch (TimeoutException e) {
            throw new TransientConsumerException(e);
        }
    }

    /**
     * Resolves the given offset criteria to a concrete offset of the configured topic partition.
     *
     * <ul>
     *   <li>largest: the partition's end offset</li>
     *   <li>smallest: the partition's beginning offset</li>
     *   <li>period: the offset Kafka returns for "now minus the period"</li>
     *   <li>timestamp: the offset Kafka returns for the given timestamp</li>
     * </ul>
     * For period and timestamp criteria, the end offset is used (and a warning logged) when Kafka
     * has no offset for the requested time.
     *
     * @param offsetCriteria criteria to resolve; must not be null
     * @param timeoutMillis  maximum time, in milliseconds, to wait on each Kafka metadata call
     * @return the resolved offset wrapped in a {@link LongMsgOffset}
     * @throws NullPointerException       if {@code offsetCriteria} is null
     * @throws IllegalArgumentException   if the criteria is none of the supported kinds
     * @throws TransientConsumerException if Kafka times out while resolving the offset
     */
    @Override
    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
        Preconditions.checkNotNull(offsetCriteria);
        Duration timeout = Duration.ofMillis(timeoutMillis);
        try {
            long offset;
            if (offsetCriteria.isLargest()) {
                offset = fetchEndOffset(timeout);
            } else if (offsetCriteria.isSmallest()) {
                offset = _consumer.beginningOffsets(Collections.singletonList(_topicPartition), timeout)
                        .get(_topicPartition);
            } else if (offsetCriteria.isPeriod()) {
                long nowMillis = Clock.systemUTC().millis();
                long periodMillis = TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
                offset = fetchOffsetForTime(nowMillis - periodMillis, timeout, "period");
            } else if (offsetCriteria.isTimestamp()) {
                long timestampMillis = TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
                offset = fetchOffsetForTime(timestampMillis, timeout, "timestamp");
            } else {
                throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
            }
            return new LongMsgOffset(offset);
        } catch (TimeoutException e) {
            throw new TransientConsumerException(e);
        }
    }

    /** Fetches the end offset of the configured topic partition, waiting at most {@code timeout}. */
    private long fetchEndOffset(Duration timeout) {
        return _consumer.endOffsets(Collections.singletonList(_topicPartition), timeout).get(_topicPartition);
    }

    /**
     * Looks up the offset Kafka associates with {@code timestampMillis}; when Kafka has none for
     * the partition, logs a warning and returns the partition's end offset instead.
     *
     * @param timestampMillis target time in epoch milliseconds
     * @param timeout         maximum time to wait on each Kafka call
     * @param criteriaType    criteria kind ("period" or "timestamp"), used only in the warning message
     */
    private long fetchOffsetForTime(long timestampMillis, Duration timeout, String criteriaType) {
        // Pass the caller's timeout so this lookup is bounded like the other metadata calls.
        OffsetAndTimestamp match = _consumer
                .offsetsForTimes(Collections.singletonMap(_topicPartition, Long.valueOf(timestampMillis)), timeout)
                .get(_topicPartition);
        if (match != null) {
            return match.offset();
        }
        long endOffset = fetchEndOffset(timeout);
        LOGGER.warn(
                "initial offset type is {} and its value evaluates to null hence proceeding with offset {} for topic {} partition {}",
                criteriaType, endOffset, _topicPartition.topic(), _topicPartition.partition());
        return endOffset;
    }

    /**
     * Closes this provider by delegating to the parent handler's {@code close()}.
     *
     * @throws IOException if the parent handler fails to close
     */
    @Override
    public void close() throws IOException {
        super.close();
    }
}
