package org.apache.pinot.plugin.inputformat.protobuf;

import com.google.common.base.Preconditions;
import com.google.protobuf.Message;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/inputformat/protobuf/KafkaConfluentSchemaRegistryProtoBufMessageDecoder.class */
public class KafkaConfluentSchemaRegistryProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfluentSchemaRegistryProtoBufMessageDecoder.class);
    private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
    private static final String SCHEMA_REGISTRY_OPTS_PREFIX = "schema.registry.";
    public static final String CACHED_SCHEMA_MAP_CAPACITY = "cached.schema.map.capacity";
    public static final String DEFAULT_CACHED_SCHEMA_MAP_CAPACITY = "1000";
    private KafkaProtobufDeserializer<Message> _deserializer;
    private RecordExtractor<Message> _protoBufRecordExtractor;
    private String _topicName;

    private RestService createRestService(String str, Map<String, String> map) {
        RestService restService = new RestService(str);
        ConfigDef configDef = new ConfigDef();
        SslConfigs.addClientSslSupport(configDef);
        Map configKeys = configDef.configKeys();
        HashMap hashMap = new HashMap();
        for (String str2 : map.keySet()) {
            if (!str2.equals(SCHEMA_REGISTRY_REST_URL) && str2.startsWith(SCHEMA_REGISTRY_OPTS_PREFIX)) {
                String str3 = map.get(str2);
                String substring = str2.substring(SCHEMA_REGISTRY_OPTS_PREFIX.length());
                if (configKeys.containsKey(substring)) {
                    if (((ConfigDef.ConfigKey) configKeys.get(substring)).type == ConfigDef.Type.PASSWORD) {
                        hashMap.put(substring, new Password(str3));
                    } else {
                        hashMap.put(substring, str3);
                    }
                }
            }
        }
        if (!hashMap.isEmpty()) {
            DefaultSslEngineFactory defaultSslEngineFactory = new DefaultSslEngineFactory();
            defaultSslEngineFactory.configure(hashMap);
            restService.setSslSocketFactory(defaultSslEngineFactory.sslContext().getSocketFactory());
        }
        return restService;
    }

    public void init(Map<String, String> map, Set<String> set, String str) throws Exception {
        Preconditions.checkState(map.containsKey(SCHEMA_REGISTRY_REST_URL), "Missing required property '%s'", SCHEMA_REGISTRY_REST_URL);
        String str2 = map.get(SCHEMA_REGISTRY_REST_URL);
        ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
        this._deserializer = new KafkaProtobufDeserializer<>(new CachedSchemaRegistryClient(createRestService(str2, map), Integer.parseInt(map.getOrDefault(CACHED_SCHEMA_MAP_CAPACITY, DEFAULT_CACHED_SCHEMA_MAP_CAPACITY)), Collections.singletonList(protobufSchemaProvider), map, (Map) null));
        Preconditions.checkNotNull(str, "Topic must be provided");
        this._topicName = str;
        this._protoBufRecordExtractor = (RecordExtractor) PluginManager.get().createInstance(ProtoBufRecordExtractor.class.getName());
        this._protoBufRecordExtractor.init(set, (RecordExtractorConfig) null);
    }

    public GenericRow decode(byte[] bArr, GenericRow genericRow) {
        try {
            return this._protoBufRecordExtractor.extract(this._deserializer.deserialize(this._topicName, bArr), genericRow);
        } catch (RuntimeException e) {
            ignoreOrRethrowException(e);
            return null;
        }
    }

    public GenericRow decode(byte[] bArr, int i, int i2, GenericRow genericRow) {
        return decode(Arrays.copyOfRange(bArr, i, i + i2), genericRow);
    }

    private void ignoreOrRethrowException(RuntimeException runtimeException) {
        if (!isUnknownMagicByte(runtimeException) && !isUnknownMagicByte(runtimeException.getCause())) {
            throw runtimeException;
        }
        LOGGER.error("Caught exception while decoding row in topic {}, discarding row", this._topicName, runtimeException);
    }

    private boolean isUnknownMagicByte(Throwable th) {
        return th != null && (th instanceof SerializationException) && th.getMessage() != null && th.getMessage().toLowerCase().contains("unknown magic byte");
    }
}
