package org.apache.pinot.plugin.stream.kafka30;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.BytesStreamMessage;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.class */
/**
 * Partition-level Kafka consumer that polls a single topic partition and converts the polled
 * records into a {@link KafkaMessageBatch}.
 *
 * <p>The consumer remembers the offset of the last record it fetched so that consecutive fetches
 * which continue exactly where the previous one stopped do not trigger another seek.
 */
public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler implements PartitionGroupConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);

    /** Offset of the last record returned by a poll; negative until the first non-empty poll. */
    private long _lastFetchedOffset = -1L;

    /**
     * Creates the consumer. All three arguments are forwarded unchanged to the
     * {@link KafkaPartitionLevelConnectionHandler} constructor.
     *
     * @param str first superclass constructor argument (NOTE(review): presumably a client id — confirm)
     * @param streamConfig stream configuration handed to the superclass
     * @param i third superclass constructor argument (NOTE(review): presumably the partition id — confirm)
     */
    public KafkaPartitionLevelConsumer(String str, StreamConfig streamConfig, int i) {
        super(str, streamConfig, i);
    }

    /**
     * Polls the partition once and packages whatever was returned into a batch.
     *
     * <p>A seek to the requested offset is issued unless the requested offset directly follows the
     * last fetched one. Records with a {@code null} value (tombstones) are left out of the message
     * list but still count towards the unfiltered record count and still advance the offsets.
     *
     * <p>NOTE(review): the method name carries a decompiler-generated prefix (the decompiler reported
     * it as renamed from {@code fetchMessages} after a bridge-method merge); it is kept as-is here.
     *
     * @param streamPartitionMsgOffset offset to start from; must be a {@link LongMsgOffset}
     * @param i poll timeout in milliseconds
     * @return the batch built from the polled records; when nothing was polled the batch is empty,
     *     its next offset equals the requested offset and its first offset is {@code -1}
     * @throws ClassCastException if {@code streamPartitionMsgOffset} is not a {@link LongMsgOffset}
     */
    public synchronized KafkaMessageBatch m2785fetchMessages(StreamPartitionMsgOffset streamPartitionMsgOffset, int i) {
        final long startOffset = ((LongMsgOffset) streamPartitionMsgOffset).getOffset();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Polling partition: {}, startOffset: {}, timeout: {}ms", _topicPartition, startOffset, i);
        }

        // Only skip the seek when this request picks up right after the previously fetched record.
        final boolean continuesPreviousFetch = _lastFetchedOffset >= 0 && _lastFetchedOffset == startOffset - 1;
        if (!continuesPreviousFetch) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Seeking to offset: {}", startOffset);
            }
            _consumer.seek(_topicPartition, startOffset);
        }

        final Duration pollTimeout = Duration.ofMillis(i);
        final List<ConsumerRecord<String, Bytes>> polled = _consumer.poll(pollTimeout).records(_topicPartition);
        final int polledCount = polled.size();

        final List<BytesStreamMessage> messages = new ArrayList<>(polledCount);
        long firstOffset = -1L;
        long nextBatchOffset = startOffset;
        StreamMessageMetadata lastMetadata = null;

        if (polledCount > 0) {
            firstOffset = polled.get(0).offset();
            _lastFetchedOffset = polled.get(polledCount - 1).offset();
            nextBatchOffset = _lastFetchedOffset + 1;

            for (ConsumerRecord<String, Bytes> kafkaRecord : polled) {
                // Metadata is tracked for every record, tombstones included.
                lastMetadata = extractMessageMetadata(kafkaRecord);

                final Bytes payload = kafkaRecord.value();
                if (payload == null) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Tombstone message at offset: {}", kafkaRecord.offset());
                    }
                    continue;
                }

                final String key = kafkaRecord.key();
                final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
                messages.add(new BytesStreamMessage(keyBytes, payload.get(), lastMetadata));
            }
        }

        // True when the first polled record lies beyond the requested offset
        // (NOTE(review): presumably consumed by the batch as a data-loss indicator — confirm).
        final boolean firstOffsetAheadOfStart = firstOffset > startOffset;
        return new KafkaMessageBatch(messages, polledCount, nextBatchOffset, firstOffset, lastMetadata,
                firstOffsetAheadOfStart);
    }

    /**
     * Builds the stream metadata for one Kafka record.
     *
     * <p>The record timestamp, its offset (paired with offset + 1) and its serialized value size are
     * always set. When the config has metadata population enabled, the record headers (if any) and
     * a map holding the record timestamp, offset and partition as strings are added as well.
     *
     * @param consumerRecord record to describe
     * @return metadata for {@code consumerRecord}
     */
    private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, Bytes> consumerRecord) {
        final long recordTimestamp = consumerRecord.timestamp();
        final long recordOffset = consumerRecord.offset();

        final StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder();
        builder.setRecordIngestionTimeMs(recordTimestamp)
                .setOffset(new LongMsgOffset(recordOffset), new LongMsgOffset(recordOffset + 1))
                .setSerializedValueSize(consumerRecord.serializedValueSize());

        if (!_config.isPopulateMetadata()) {
            return builder.build();
        }

        final Headers recordHeaders = consumerRecord.headers();
        if (recordHeaders != null) {
            final GenericRow headerRow = new GenericRow();
            for (Header header : recordHeaders.toArray()) {
                headerRow.putValue(header.key(), header.value());
            }
            builder.setHeaders(headerRow);
        }

        final Map<String, String> recordMetadata = Map.of(
                KafkaStreamMessageMetadata.RECORD_TIMESTAMP_KEY, String.valueOf(recordTimestamp),
                KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, String.valueOf(recordOffset),
                KafkaStreamMessageMetadata.METADATA_PARTITION_KEY, String.valueOf(consumerRecord.partition()));
        builder.setMetadata(recordMetadata);
        return builder.build();
    }
}
