package org.apache.pinot.plugin.stream.pulsar.server;

import com.google.common.base.Preconditions;
import java.util.Base64;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/server/PulsarDataProducer.class */
public class PulsarDataProducer implements StreamDataProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarDataProducer.class);
    public static final String BROKER_SERVICE_URL = "brokerServiceUrl";
    public static final String TOKEN = "token";
    private PulsarClient _pulsarClient;

    public void init(Properties properties) {
        String property = properties.getProperty(BROKER_SERVICE_URL);
        Preconditions.checkNotNull(property, "broker service url must be configured.");
        ClientBuilder serviceUrl = PulsarClient.builder().serviceUrl(property);
        String property2 = properties.getProperty(TOKEN);
        if (StringUtils.isNotEmpty(property2)) {
            serviceUrl.authentication(AuthenticationFactory.token(property2));
        }
        try {
            this._pulsarClient = serviceUrl.build();
        } catch (PulsarClientException e) {
            throw new IllegalArgumentException("Failed to create pulsar client", e);
        }
    }

    public void produce(String str, byte[] bArr) {
        try {
            Producer create = this._pulsarClient.newProducer().topic(str).create();
            try {
                create.send(bArr);
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (PulsarClientException e) {
            LOGGER.error("Failed to produce message for topic: {}", str, e);
        }
    }

    public void produce(String str, byte[] bArr, byte[] bArr2) {
        try {
            Producer create = this._pulsarClient.newProducer().topic(str).create();
            try {
                create.newMessage().key(Base64.getEncoder().encodeToString(bArr)).value(bArr2).send();
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (PulsarClientException e) {
            LOGGER.error("Failed to produce message for topic: {}", str, e);
        }
    }

    public void close() {
        try {
            this._pulsarClient.close();
        } catch (PulsarClientException e) {
            throw new RuntimeException("Failed to close pulsar client.", e);
        }
    }
}
