package org.apache.pinot.integration.tests;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
import org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory;
import org.apache.pinot.plugin.stream.kafka30.KafkaPartitionLevelConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;

/* loaded from: input_file:org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.class */
public class LLCRealtimeKafka3ClusterIntegrationTest extends LLCRealtimeClusterIntegrationTest {

    /* loaded from: input_file:org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest$ExceptingKafka3ConsumerFactory.class */
    public static class ExceptingKafka3ConsumerFactory extends KafkaConsumerFactory {
        public static final int PARTITION_FOR_EXCEPTIONS = 1;
        public static final int SEQ_NUM_FOR_CREATE_EXCEPTION = 1;
        public static final int SEQ_NUM_FOR_CONSUME_EXCEPTION = 3;
        private static HelixAdmin _helixAdmin;
        private static String _helixClusterName;
        private static String _tableName;

        /* loaded from: input_file:org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest$ExceptingKafka3ConsumerFactory$ExceptingKafka3Consumer.class */
        public static class ExceptingKafka3Consumer extends KafkaPartitionLevelConsumer {
            private final boolean _exceptionDuringConsume;

            public ExceptingKafka3Consumer(String str, StreamConfig streamConfig, int i, boolean z) {
                super(str, streamConfig, i);
                this._exceptionDuringConsume = z;
            }

            /* renamed from: fetchMessages, reason: merged with bridge method [inline-methods] */
            public KafkaMessageBatch m14fetchMessages(StreamPartitionMsgOffset streamPartitionMsgOffset, int i) {
                if (this._exceptionDuringConsume) {
                    throw new RuntimeException("TestException during consumption");
                }
                return super.fetchMessages(streamPartitionMsgOffset, i);
            }
        }

        public static void init(String str, HelixAdmin helixAdmin, String str2) {
            _helixAdmin = helixAdmin;
            _helixClusterName = str;
            _tableName = str2;
        }

        public PartitionGroupConsumer createPartitionGroupConsumer(String str, PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
            int partitionGroupId = partitionGroupConsumptionStatus.getPartitionGroupId();
            boolean z = false;
            int segmentSeqNum = getSegmentSeqNum(partitionGroupId);
            if (partitionGroupId == 1) {
                if (segmentSeqNum == 1) {
                    throw new RuntimeException("TestException during consumer creation");
                }
                if (segmentSeqNum == 3) {
                    z = true;
                }
            }
            return new ExceptingKafka3Consumer(str, this._streamConfig, partitionGroupId, z);
        }

        private int getSegmentSeqNum(int i) {
            IdealState resourceIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, TableNameBuilder.REALTIME.tableNameWithType(_tableName));
            AtomicInteger atomicInteger = new AtomicInteger(-1);
            resourceIdealState.getPartitionSet().forEach(str -> {
                if (LLCSegmentName.isLLCSegment(str) && resourceIdealState.getInstanceStateMap(str).values().contains("CONSUMING")) {
                    LLCSegmentName lLCSegmentName = new LLCSegmentName(str);
                    if (lLCSegmentName.getPartitionGroupId() == i) {
                        atomicInteger.set(lLCSegmentName.getSequenceNumber());
                    }
                }
            });
            Assert.assertTrue(atomicInteger.get() >= 0, "No consuming segment found in partition: " + i);
            return atomicInteger.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest
    public Map<String, String> getStreamConfigMap() {
        Map<String, String> streamConfigMap = super.getStreamConfigMap();
        streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamConfigMap.get("streamType"), "consumer.factory.class.name"), ExceptingKafka3ConsumerFactory.class.getName());
        ExceptingKafka3ConsumerFactory.init(getHelixClusterName(), this._helixAdmin, getTableName());
        return streamConfigMap;
    }
}
