package org.apache.pulsar.client.impl;

import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/client/impl/GeoReplicationProducerImpl.class */
/**
 * A {@link ProducerImpl} variant that handles send receipts by comparing entry positions
 * (ledger id / entry id) instead of using the default receipt handling of the parent class.
 *
 * <p>The position-based handling is only used when the connection reports
 * {@code isBrokerSupportsReplDedupByLidAndEid()} and the topic is persistent; otherwise every
 * receipt is delegated unchanged to {@link ProducerImpl}.
 *
 * <p>{@link #lastPersistedSourceLedgerId} and {@link #lastPersistedSourceEntryId} are only read and
 * written from code reached through the {@code synchronized (this)} section of
 * {@link #ackReceived(ClientCnx, long, long, long, long)}.
 */
public class GeoReplicationProducerImpl extends ProducerImpl {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(GeoReplicationProducerImpl.class);

    /**
     * Message property key whose value is parsed by this class as
     * {@code <ledgerId><delimiter><entryId>}, i.e. the position of the pending message
     * ("pending send" in this class's log messages).
     */
    public static final String MSG_PROP_REPL_SOURCE_POSITION = "__MSG_PROP_REPL_SOURCE_POSITION";

    /** Message property key for replication markers; it is not read anywhere in this class. */
    public static final String MSG_PROP_IS_REPL_MARKER = "__MSG_PROP_IS_REPL_MARKER";

    /** Ledger id of the most recent receipt accepted as "expected" by {@code ackReceivedReplicatedMsg}. */
    private long lastPersistedSourceLedgerId;

    /** Entry id of the most recent receipt accepted as "expected" by {@code ackReceivedReplicatedMsg}. */
    private long lastPersistedSourceEntryId;

    /** Whether the topic name given to the constructor denotes a persistent topic. */
    private final boolean isPersistentTopic;

    /**
     * Creates the producer by forwarding every argument to the {@link ProducerImpl} constructor and
     * recording whether {@code str} names a persistent topic.
     *
     * @param pulsarClientImpl          forwarded to the parent constructor
     * @param str                       topic name; also parsed with {@link TopicName#get(String)}
     * @param producerConfigurationData forwarded to the parent constructor
     * @param completableFuture         forwarded to the parent constructor
     * @param i                         forwarded to the parent constructor
     * @param schema                    forwarded to the parent constructor
     * @param producerInterceptors      forwarded to the parent constructor
     * @param optional                  forwarded to the parent constructor
     */
    public GeoReplicationProducerImpl(PulsarClientImpl pulsarClientImpl, String str, ProducerConfigurationData producerConfigurationData, CompletableFuture completableFuture, int i, Schema schema, ProducerInterceptors producerInterceptors, Optional optional) {
        super(pulsarClientImpl, str, producerConfigurationData, completableFuture, i, schema, producerInterceptors, optional);
        this.isPersistentTopic = TopicName.get(str).isPersistent();
    }

    /**
     * Returns {@code true} when position-based receipt handling applies: the connection reports
     * support for it and the topic is persistent.
     */
    private boolean isBrokerSupportsReplDedupByLidAndEid(ClientCnx clientCnx) {
        return clientCnx.isBrokerSupportsReplDedupByLidAndEid() && this.isPersistentTopic;
    }

    /**
     * Handles a send receipt.
     *
     * <p>When position-based handling does not apply, the call is delegated to the parent class.
     * Otherwise the head of {@code pendingMessages} is inspected under this producer's monitor and
     * the receipt is routed to the marker or the regular-message handler. A receipt that arrives
     * while the queue is empty is only logged at debug level.
     *
     * @param clientCnx connection the receipt arrived on
     * @param j         logged by this class as the source ledger id for regular messages and as
     *                  {@code seq} for replication markers
     * @param j2        logged by this class as the source entry id; the value
     *                  {@link Long#MIN_VALUE} identifies a replication-marker receipt
     * @param j3        logged by this class as the target ledger id
     * @param j4        logged by this class as the target entry id
     */
    @Override // org.apache.pulsar.client.impl.ProducerImpl
    public void ackReceived(ClientCnx clientCnx, long j, long j2, long j3, long j4) {
        if (!isBrokerSupportsReplDedupByLidAndEid(clientCnx)) {
            super.ackReceived(clientCnx, j, j2, j3, j4);
            return;
        }
        synchronized (this) {
            ProducerImpl.OpSendMsg head = this.pendingMessages.peek();
            if (head == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Got ack for timed out msg {}:{}", this.topic, this.producerName, j, j2);
                }
                return;
            }
            if (isReplicationMarker(j2)) {
                ackReceivedReplMarker(clientCnx, head, j, j2, j3, j4);
            } else {
                ackReceivedReplicatedMsg(clientCnx, head, j, j2, j3, j4);
            }
        }
    }

    /**
     * Handles the receipt of a regular (non-marker) message against the pending head {@code pending}.
     *
     * <p>Four outcomes, checked in this order:
     * <ol>
     *   <li>The pending message's own position is at or before the last persisted position: the
     *       pending message is completed and the same receipt is dispatched again through
     *       {@link #ackReceived(ClientCnx, long, long, long, long)} against the new queue head.</li>
     *   <li>The receipt's position is at or before the last persisted position: it is ignored.</li>
     *   <li>The receipt's position equals the pending message's position: the last persisted
     *       position is advanced and the pending message is completed.</li>
     *   <li>Anything else: an error is logged and the connection's channel is closed.</li>
     * </ol>
     *
     * @param cnx            connection the receipt arrived on
     * @param pending        head of the pending queue
     * @param sourceLedgerId ledger id of the acknowledged source entry
     * @param sourceEntryId  entry id of the acknowledged source entry
     * @param targetLedgerId ledger id of the target entry
     * @param targetEntryId  entry id of the target entry
     */
    private void ackReceivedReplicatedMsg(ClientCnx cnx, ProducerImpl.OpSendMsg pending, long sourceLedgerId, long sourceEntryId, long targetLedgerId, long targetEntryId) {
        // Position carried by the pending message itself; both stay null when the property is
        // absent or malformed. They are kept boxed so that "null" shows up in the log output.
        Long pendingLedgerId = null;
        Long pendingEntryId = null;
        for (KeyValue property : pending.msg.getMessageBuilder().getPropertiesList()) {
            if (!MSG_PROP_REPL_SOURCE_POSITION.equals(property.getKey())) {
                continue;
            }
            long[] position = parseSourcePosition(property.getValue());
            if (position != null) {
                // Assigned together so the two ids can never be half-initialised.
                pendingLedgerId = position[0];
                pendingEntryId = position[1];
            }
        }
        boolean pendingPositionKnown = pendingLedgerId != null && pendingEntryId != null;

        if (pendingPositionKnown && isAtOrBeforeLastPersisted(pendingLedgerId.longValue(), pendingEntryId.longValue())) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received an msg send receipt[pending send is repeated due to repl cursor rewind]: source entry {}:{}, pending send: {}:{}, latest persisted: {}:{}", this.topic, this.producerName, sourceLedgerId, sourceEntryId, pendingLedgerId, pendingEntryId, this.lastPersistedSourceLedgerId, this.lastPersistedSourceEntryId);
            }
            removeAndApplyCallback(pending, sourceLedgerId, sourceEntryId, targetLedgerId, targetEntryId, false);
            // The receipt has not been matched yet: evaluate it again against the next pending message.
            ackReceived(cnx, sourceLedgerId, sourceEntryId, targetLedgerId, targetEntryId);
            return;
        }

        if (isAtOrBeforeLastPersisted(sourceLedgerId, sourceEntryId)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received an msg send receipt[repeated]: source entry {}:{}, latest persisted: {}:{}", this.topic, this.producerName, sourceLedgerId, sourceEntryId, this.lastPersistedSourceLedgerId, this.lastPersistedSourceEntryId);
            }
            return;
        }

        boolean matchesPending = pendingPositionKnown
                && sourceLedgerId == pendingLedgerId.longValue()
                && sourceEntryId == pendingEntryId.longValue();
        if (!matchesPending) {
            log.error("[{}] [{}] Received an msg send receipt[error]: source entry {}:{}, target entry: {}:{}, pending send: {}:{}, latest persisted: {}:{}, queue-size: {}", this.topic, this.producerName, sourceLedgerId, sourceEntryId, targetLedgerId, targetEntryId, pendingLedgerId, pendingEntryId, this.lastPersistedSourceLedgerId, this.lastPersistedSourceEntryId, this.pendingMessages.messagesCount());
            cnx.channel().close();
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Received an msg send receipt[expected]: source entry {}:{}, target entry: {}:{}", this.topic, this.producerName, sourceLedgerId, sourceEntryId, targetLedgerId, targetEntryId);
        }
        this.lastPersistedSourceLedgerId = sourceLedgerId;
        this.lastPersistedSourceEntryId = sourceEntryId;
        removeAndApplyCallback(pending, sourceLedgerId, sourceEntryId, targetLedgerId, targetEntryId, false);
    }

    /**
     * Parses a {@code <ledgerId><delimiter><entryId>} property value.
     *
     * @param value raw property value
     * @return a two-element array {@code {ledgerId, entryId}}, or {@code null} when the value does
     *         not consist of exactly two digit-only parts that both fit into a {@code long}
     */
    private static long[] parseSourcePosition(String value) {
        if (!value.contains(LocalDateTimeSchema.DELIMITER)) {
            return null;
        }
        String[] parts = value.split(LocalDateTimeSchema.DELIMITER);
        if (parts.length != 2 || !StringUtils.isNumeric(parts[0]) || !StringUtils.isNumeric(parts[1])) {
            return null;
        }
        try {
            return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
        } catch (NumberFormatException e) {
            // isNumeric only guarantees digits; a digit string can still exceed the range of a
            // long. Treat it like any other malformed position rather than failing the receipt
            // handler with an unchecked exception.
            return null;
        }
    }

    /**
     * Returns {@code true} when the given position is less than or equal to the last persisted
     * source position, comparing ledger id first and entry id second.
     */
    private boolean isAtOrBeforeLastPersisted(long ledgerId, long entryId) {
        return ledgerId < this.lastPersistedSourceLedgerId
                || (ledgerId == this.lastPersistedSourceLedgerId && entryId <= this.lastPersistedSourceEntryId);
    }

    /**
     * Handles the receipt of a replication marker against the pending head {@code opSendMsg}.
     *
     * <p>A receipt whose {@code j} is not greater than the last published sequence id is ignored,
     * unless that stored sequence id is {@code 0}. If the pending head is itself a replication
     * marker with the same sequence id, the last published sequence id is raised and the pending
     * message is completed. Any other combination is logged and the connection's channel is closed.
     *
     * @param clientCnx connection the receipt arrived on
     * @param opSendMsg head of the pending queue
     * @param j         sequence id carried by the receipt ({@code seq} in the log messages)
     * @param j2        marker indicator carried by the receipt (only logged here)
     * @param j3        target ledger id
     * @param j4        target entry id
     */
    protected void ackReceivedReplMarker(ClientCnx clientCnx, ProducerImpl.OpSendMsg opSendMsg, long j, long j2, long j3, long j4) {
        long lastSeqPersisted = LAST_SEQ_ID_PUBLISHED_UPDATER.get(this);
        if (lastSeqPersisted != 0 && j <= lastSeqPersisted) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received an repl marker send receipt[repeated]. seq: {}, seqPersisted: {}, isSourceMarker: {}, target entry: {}:{}", this.topic, this.producerName, j, lastSeqPersisted, j2, j3, j4);
            }
            return;
        }

        boolean pendingIsMarker = isReplicationMarker(opSendMsg);
        if (pendingIsMarker && j == opSendMsg.sequenceId) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received an repl marker send receipt[expected]. seq: {}, seqPersisted: {}, isReplMarker: {}, target entry: {}:{}", this.topic, this.producerName, j, lastSeqPersisted, j2, j3, j4);
            }
            final long highestSequenceId = getHighestSequenceId(opSendMsg);
            // Only ever move the published sequence id forward.
            LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, current -> Math.max(current, highestSequenceId));
            removeAndApplyCallback(opSendMsg, j, j2, j3, j4, true);
            return;
        }

        long lastSeqPushed = LAST_SEQ_ID_PUSHED_UPDATER.get(this);
        // The pending sequence id is only reported when the head is actually a marker.
        Object pendingSeq = pendingIsMarker ? Long.valueOf(opSendMsg.sequenceId) : "unknown";
        String errorMsg = String.format("[%s] [%s] Received an repl marker send receipt[error]. seq: %s, seqPending: %s. sequenceIdPersisted: %s, lastInProgressSend: %s, isSourceMarker: %s, target entry: %s:%s, queue-size: %s", this.topic, this.producerName, j, pendingSeq, lastSeqPersisted, lastSeqPushed, j2, j3, j4, this.pendingMessages.messagesCount());
        if (j < lastSeqPushed) {
            log.warn(errorMsg);
        } else {
            log.error(errorMsg);
        }
        clientCnx.channel().close();
    }

    /**
     * Removes the head of the pending queue and completes {@code op} successfully.
     *
     * <p>The sequence is: dequeue, release the send semaphore, stamp the target message id, invoke
     * the completion callback (any {@link Throwable} it throws is logged and suppressed), release
     * the command buffer and recycle the operation.
     *
     * @param op             operation being completed; callers pass the current queue head
     * @param sentLedgerId   first id of the receipt, used for logging only
     * @param sentEntryId    second id of the receipt, used for logging only
     * @param targetLedgerId ledger id written into the message id
     * @param targetEntryId  entry id written into the message id
     * @param isMarker       whether the completed operation is a replication marker (logging only)
     */
    private void removeAndApplyCallback(ProducerImpl.OpSendMsg op, long sentLedgerId, long sentEntryId, long targetLedgerId, long targetEntryId, boolean isMarker) {
        this.pendingMessages.remove();
        releaseSemaphoreForSendOp(op);
        op.setMessageId(targetLedgerId, targetEntryId, this.partitionIndex);
        try {
            op.sendComplete(null);
        } catch (Throwable th) {
            // A failing callback must not prevent the buffer release and recycling below.
            log.warn("[{}] [{}] Got exception while completing the callback for -- source-message: {}:{} -- target-msg: {}:{} -- isMarker: {}", this.topic, this.producerName, sentLedgerId, sentEntryId, targetLedgerId, targetEntryId, isMarker, th);
        }
        ReferenceCountUtil.safeRelease(op.cmd);
        op.recycle();
    }

    /**
     * Returns {@code true} when the operation carries a message whose builder has a marker type
     * that {@link Markers#isReplicationMarker} recognises.
     */
    private boolean isReplicationMarker(ProducerImpl.OpSendMsg opSendMsg) {
        return opSendMsg.msg != null && opSendMsg.msg.getMessageBuilder().hasMarkerType() && Markers.isReplicationMarker(opSendMsg.msg.getMessageBuilder().getMarkerType());
    }

    /** Returns {@code true} when the receipt field equals {@link Long#MIN_VALUE}, the marker sentinel. */
    private boolean isReplicationMarker(long j) {
        return Long.MIN_VALUE == j;
    }

    /**
     * Forwards to the parent implementation only for replication markers; for every other
     * operation this method does nothing.
     *
     * @param opSendMsg operation that was pushed
     */
    @Override // org.apache.pulsar.client.impl.ProducerImpl
    public void updateLastSeqPushed(ProducerImpl.OpSendMsg opSendMsg) {
        if (isReplicationMarker(opSendMsg)) {
            super.updateLastSeqPushed(opSendMsg);
        }
    }
}
