package org.apache.pinot.segment.local.upsert;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.upsert.UpsertUtils;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.spi.config.table.HashFunction;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.class */
public abstract class BasePartitionUpsertMetadataManager implements PartitionUpsertMetadataManager {
    protected static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1);
    protected final String _tableNameWithType;
    protected final int _partitionId;
    protected final List<String> _primaryKeyColumns;
    protected final String _comparisonColumn;
    protected final HashFunction _hashFunction;
    protected final PartialUpsertHandler _partialUpsertHandler;
    protected final boolean _enableSnapshot;
    protected final ServerMetrics _serverMetrics;
    protected final Logger _logger;

    @VisibleForTesting
    public final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
    protected volatile boolean _closed = false;
    protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
    protected int _numOutOfOrderEvents = 0;

    /* JADX INFO: Access modifiers changed from: protected */
    public BasePartitionUpsertMetadataManager(String str, int i, List<String> list, String str2, HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean z, ServerMetrics serverMetrics) {
        this._tableNameWithType = str;
        this._partitionId = i;
        this._primaryKeyColumns = list;
        this._comparisonColumn = str2;
        this._hashFunction = hashFunction;
        this._partialUpsertHandler = partialUpsertHandler;
        this._enableSnapshot = z;
        this._serverMetrics = serverMetrics;
        this._logger = LoggerFactory.getLogger(str + "-" + i + "-" + getClass().getSimpleName());
    }

    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public List<String> getPrimaryKeyColumns() {
        return this._primaryKeyColumns;
    }

    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public void addSegment(ImmutableSegment immutableSegment) {
        MutableRoaringBitmap mutableRoaringBitmap;
        String segmentName = immutableSegment.getSegmentName();
        this._logger.info("Adding segment: {}, current primary key count: {}", segmentName, Long.valueOf(getNumPrimaryKeys()));
        if (immutableSegment instanceof EmptyIndexSegment) {
            this._logger.info("Skip adding empty segment: {}", segmentName);
            return;
        }
        Preconditions.checkArgument(immutableSegment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: {} for segment: {}, table: {}", immutableSegment.getClass(), segmentName, this._tableNameWithType);
        ImmutableSegmentImpl immutableSegmentImpl = (ImmutableSegmentImpl) immutableSegment;
        if (this._enableSnapshot) {
            mutableRoaringBitmap = immutableSegmentImpl.loadValidDocIdsFromSnapshot();
            if (mutableRoaringBitmap != null && mutableRoaringBitmap.isEmpty()) {
                this._logger.info("Skip adding segment: {} without valid doc, current primary key count: {}", immutableSegment.getSegmentName(), Long.valueOf(getNumPrimaryKeys()));
                return;
            }
        } else {
            mutableRoaringBitmap = null;
            immutableSegmentImpl.deleteValidDocIdsSnapshot();
        }
        try {
            UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(immutableSegment, this._primaryKeyColumns, this._comparisonColumn);
            try {
                addSegment(immutableSegmentImpl, null, mutableRoaringBitmap != null ? UpsertUtils.getRecordInfoIterator(recordInfoReader, mutableRoaringBitmap) : UpsertUtils.getRecordInfoIterator(recordInfoReader, immutableSegment.getSegmentMetadata().getTotalDocs()));
                recordInfoReader.close();
                long numPrimaryKeys = getNumPrimaryKeys();
                this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, numPrimaryKeys);
                this._logger.info("Finished adding segment: {}, current primary key count: {}", segmentName, Long.valueOf(numPrimaryKeys));
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while adding segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
    }

    @VisibleForTesting
    public void addSegment(ImmutableSegmentImpl immutableSegmentImpl, @Nullable ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap, Iterator<RecordInfo> it2) {
        Lock segmentLock = SegmentLocks.getSegmentLock(this._tableNameWithType, immutableSegmentImpl.getSegmentName());
        segmentLock.lock();
        if (threadSafeMutableRoaringBitmap == null) {
            try {
                threadSafeMutableRoaringBitmap = new ThreadSafeMutableRoaringBitmap();
            } catch (Throwable th) {
                segmentLock.unlock();
                throw th;
            }
        }
        addOrReplaceSegment(immutableSegmentImpl, threadSafeMutableRoaringBitmap, it2, null, null);
        segmentLock.unlock();
    }

    protected abstract long getNumPrimaryKeys();

    protected abstract void addOrReplaceSegment(ImmutableSegmentImpl immutableSegmentImpl, ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap, Iterator<RecordInfo> it2, @Nullable IndexSegment indexSegment, @Nullable MutableRoaringBitmap mutableRoaringBitmap);

    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public void replaceSegment(ImmutableSegment immutableSegment, IndexSegment indexSegment) {
        String segmentName = immutableSegment.getSegmentName();
        Preconditions.checkArgument(segmentName.equals(indexSegment.getSegmentName()), "Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}", this._tableNameWithType, indexSegment.getSegmentName(), segmentName);
        Logger logger = this._logger;
        Object[] objArr = new Object[3];
        objArr[0] = indexSegment instanceof ImmutableSegment ? "immutable" : "mutable";
        objArr[1] = segmentName;
        objArr[2] = Long.valueOf(getNumPrimaryKeys());
        logger.info("Replacing {} segment: {}, current primary key count: {}", objArr);
        if (immutableSegment instanceof EmptyIndexSegment) {
            this._logger.info("Skip adding empty segment: {}", segmentName);
            replaceSegment(immutableSegment, null, null, indexSegment);
            return;
        }
        try {
            UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(immutableSegment, this._primaryKeyColumns, this._comparisonColumn);
            try {
                replaceSegment(immutableSegment, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, immutableSegment.getSegmentMetadata().getTotalDocs()), indexSegment);
                recordInfoReader.close();
                long numPrimaryKeys = getNumPrimaryKeys();
                this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, numPrimaryKeys);
                this._logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, Long.valueOf(numPrimaryKeys));
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while replacing segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
    }

    @VisibleForTesting
    public void replaceSegment(ImmutableSegment immutableSegment, @Nullable ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap, @Nullable Iterator<RecordInfo> it2, IndexSegment indexSegment) {
        String segmentName = immutableSegment.getSegmentName();
        Lock segmentLock = SegmentLocks.getSegmentLock(this._tableNameWithType, segmentName);
        segmentLock.lock();
        try {
            MutableRoaringBitmap mutableRoaringBitmap = indexSegment.getValidDocIds() != null ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
            if (it2 != null) {
                Preconditions.checkArgument(immutableSegment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: {} for segment: {}, table: {}", immutableSegment.getClass(), segmentName, this._tableNameWithType);
                if (threadSafeMutableRoaringBitmap == null) {
                    threadSafeMutableRoaringBitmap = new ThreadSafeMutableRoaringBitmap();
                }
                addOrReplaceSegment((ImmutableSegmentImpl) immutableSegment, threadSafeMutableRoaringBitmap, it2, indexSegment, mutableRoaringBitmap);
            }
            if (mutableRoaringBitmap != null && !mutableRoaringBitmap.isEmpty()) {
                int cardinality = mutableRoaringBitmap.getCardinality();
                if (this._partialUpsertHandler != null) {
                    this._logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This can potentially cause inconsistency between replicas", Integer.valueOf(cardinality), segmentName);
                    this._serverMetrics.addMeteredTableValue(this._tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED, cardinality);
                } else {
                    this._logger.info("Found {} primary keys not replaced when replacing segment: {}", Integer.valueOf(cardinality), segmentName);
                }
                removeSegment(indexSegment, mutableRoaringBitmap);
            }
            if (indexSegment instanceof EmptyIndexSegment) {
                return;
            }
            this._replacedSegments.add(indexSegment);
        } finally {
            segmentLock.unlock();
        }
    }

    protected abstract void removeSegment(IndexSegment indexSegment, MutableRoaringBitmap mutableRoaringBitmap);

    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public void removeSegment(IndexSegment indexSegment) {
        String segmentName = indexSegment.getSegmentName();
        Logger logger = this._logger;
        Object[] objArr = new Object[3];
        objArr[0] = indexSegment instanceof ImmutableSegment ? "immutable" : "mutable";
        objArr[1] = segmentName;
        objArr[2] = Long.valueOf(getNumPrimaryKeys());
        logger.info("Removing {} segment: {}, current primary key count: {}", objArr);
        if (this._replacedSegments.remove(indexSegment)) {
            this._logger.info("Skip removing replaced segment: {}", segmentName);
            return;
        }
        Lock segmentLock = SegmentLocks.getSegmentLock(this._tableNameWithType, segmentName);
        segmentLock.lock();
        try {
            MutableRoaringBitmap mutableRoaringBitmap = indexSegment.getValidDocIds() != null ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
            if (this._enableSnapshot && (indexSegment instanceof ImmutableSegmentImpl) && mutableRoaringBitmap != null) {
                ((ImmutableSegmentImpl) indexSegment).persistValidDocIdsSnapshot(mutableRoaringBitmap);
            }
            if (this._closed) {
                this._logger.info("Skip removing segment: {} because metadata manager is already closed", indexSegment);
                segmentLock.unlock();
                return;
            }
            if (mutableRoaringBitmap == null || mutableRoaringBitmap.isEmpty()) {
                this._logger.info("Skip removing segment without valid docs: {}", segmentName);
                segmentLock.unlock();
                return;
            }
            this._logger.info("Removing {} primary keys for segment: {}", Integer.valueOf(mutableRoaringBitmap.getCardinality()), segmentName);
            removeSegment(indexSegment, mutableRoaringBitmap);
            segmentLock.unlock();
            long numPrimaryKeys = getNumPrimaryKeys();
            this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, numPrimaryKeys);
            this._logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, Long.valueOf(numPrimaryKeys));
        } catch (Throwable th) {
            segmentLock.unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleOutOfOrderEvent(Object obj, Object obj2) {
        this._serverMetrics.addMeteredTableValue(this._tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
        this._numOutOfOrderEvents++;
        long nanoTime = System.nanoTime();
        if (nanoTime - this._lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
            this._logger.warn("Skipped {} out-of-order events for partial-upsert table (the last event has current comparison value: {}, record comparison value: {})", Integer.valueOf(this._numOutOfOrderEvents), obj, obj2);
            this._lastOutOfOrderEventReportTimeNs = nanoTime;
            this._numOutOfOrderEvents = 0;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this._logger.info("Closing the metadata manager, current primary key count: {}", Long.valueOf(getNumPrimaryKeys()));
        this._closed = true;
    }
}
