package org.apache.pinot.segment.local.upsert;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.$internal.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.$internal.com.google.common.base.Preconditions;
import org.apache.pinot.$internal.com.google.common.util.concurrent.AtomicDouble;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.upsert.UpsertUtils;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.class */
public abstract class BasePartitionUpsertMetadataManager implements PartitionUpsertMetadataManager {
    /** Minimum gap, in nanoseconds (one minute), between two "skipped out-of-order events" warnings. */
    protected static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1);
    // Identity of the table partition this manager serves; also used in the logger name and metric calls.
    protected final String _tableNameWithType;
    protected final int _partitionId;
    // The context object all of the settings below are copied from in the constructor.
    protected final UpsertContext _context;
    protected final List<String> _primaryKeyColumns;
    protected final List<String> _comparisonColumns;
    // May be null; when null, no queryable-doc-id bitmaps are built by this class.
    protected final String _deleteRecordColumn;
    protected final HashFunction _hashFunction;
    // May be null; when null, updateRecord() returns the row unchanged and the full-upsert meters are used.
    protected final PartialUpsertHandler _partialUpsertHandler;
    protected final boolean _enableSnapshot;
    // TTL settings; a value > 0 enables the corresponding TTL handling in addSegment()/removeSegment().
    protected final double _metadataTTL;
    protected final double _deletedKeysTTL;
    protected final File _tableIndexDir;
    protected final ServerMetrics _serverMetrics;
    // Per-instance logger named "<table>-<partition>-<simple class name>".
    protected final Logger _logger;
    // Null when snapshots are disabled. Segment add/preload/replace/remove hold the read lock;
    // takeSnapshot() holds the write lock.
    protected final ReadWriteLock _snapshotLock;
    // Seeded from the watermark file when _metadataTTL > 0, otherwise with Double.MIN_VALUE.
    protected final AtomicDouble _largestSeenComparisonValue;
    // Lifecycle flags; not read or written in the part of the class visible here
    // (presumably maintained by startOperation()/stop()/close() — TODO confirm).
    private boolean _stopped;
    private boolean _closed;
    // Segments currently registered with this manager; doTakeSnapshot() iterates over this set.
    protected final Set<IndexSegment> _trackedSegments = ConcurrentHashMap.newKeySet();
    // Set to true by addRecord(); takeSnapshot() is a no-op until then.
    protected volatile boolean _gotFirstConsumingSegment = false;
    // State for rate-limiting the out-of-order warning in handleOutOfOrderEvent().
    protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
    protected int _numOutOfOrderEvents = 0;
    // Starts at 1; its updates are outside the visible part of the class
    // (presumably startOperation()/finishOperation() — TODO confirm).
    private int _numPendingOperations = 1;

    /* JADX INFO: Access modifiers changed from: protected */
    public BasePartitionUpsertMetadataManager(String str, int i, UpsertContext upsertContext) {
        this._tableNameWithType = str;
        this._partitionId = i;
        this._context = upsertContext;
        this._primaryKeyColumns = upsertContext.getPrimaryKeyColumns();
        this._comparisonColumns = upsertContext.getComparisonColumns();
        this._deleteRecordColumn = upsertContext.getDeleteRecordColumn();
        this._hashFunction = upsertContext.getHashFunction();
        this._partialUpsertHandler = upsertContext.getPartialUpsertHandler();
        this._enableSnapshot = upsertContext.isSnapshotEnabled();
        this._snapshotLock = this._enableSnapshot ? new ReentrantReadWriteLock() : null;
        this._metadataTTL = upsertContext.getMetadataTTL();
        this._deletedKeysTTL = upsertContext.getDeletedKeysTTL();
        this._tableIndexDir = upsertContext.getTableIndexDir();
        this._serverMetrics = ServerMetrics.get();
        this._logger = LoggerFactory.getLogger(str + "-" + i + "-" + getClass().getSimpleName());
        if (this._metadataTTL > 0.0d) {
            this._largestSeenComparisonValue = new AtomicDouble(loadWatermark());
        } else {
            this._largestSeenComparisonValue = new AtomicDouble(Double.MIN_VALUE);
            deleteWatermark();
        }
    }

    /** Returns the primary key column list obtained from the upsert context at construction time. */
    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public List<String> getPrimaryKeyColumns() {
        return _primaryKeyColumns;
    }

    /**
     * Builds the bitmap of queryable doc ids: the given valid doc ids minus those whose delete-record
     * column evaluates to true.
     *
     * @param indexSegment segment whose delete-record column is read
     * @param mutableRoaringBitmap valid doc ids to filter
     * @return the filtered bitmap, or {@code null} when no delete-record column is configured
     */
    @Nullable
    protected MutableRoaringBitmap getQueryableDocIds(IndexSegment indexSegment, MutableRoaringBitmap mutableRoaringBitmap) {
        if (this._deleteRecordColumn == null) {
            return null;
        }
        MutableRoaringBitmap queryableDocIds = new MutableRoaringBitmap();
        // try-with-resources so the reader is released even when reading a value throws.
        try (PinotSegmentColumnReader deleteRecordReader = new PinotSegmentColumnReader(indexSegment, this._deleteRecordColumn)) {
            PeekableIntIterator docIdIterator = mutableRoaringBitmap.getIntIterator();
            while (docIdIterator.hasNext()) {
                int docId = docIdIterator.next();
                if (!BooleanUtils.toBoolean(deleteRecordReader.getValue(docId))) {
                    queryableDocIds.add(docId);
                }
            }
        } catch (IOException e) {
            // Best effort: a failed close is logged and whatever was collected is still returned.
            this._logger.error("Failed to close column reader for delete record column: {} for segment: {} ", this._deleteRecordColumn, indexSegment.getSegmentName(), e);
        }
        return queryableDocIds;
    }

    /**
     * Registers an immutable segment with this partition's upsert metadata.
     *
     * <ul>
     *   <li>Empty segments are skipped.</li>
     *   <li>With deleted-keys TTL enabled, the largest seen comparison value is raised to the segment's
     *       maximum value of the first comparison column.</li>
     *   <li>With metadata TTL enabled, a segment whose maximum comparison value is older than
     *       {@code largestSeen - metadataTTL} is not added to the metadata; upsert is enabled on it directly
     *       from its valid-doc-id snapshot when one exists.</li>
     *   <li>Otherwise the segment is added via {@link #doAddSegment} and tracked, while holding the snapshot
     *       read lock (when snapshots are enabled).</li>
     * </ul>
     *
     * @param immutableSegment the segment to add; must be an {@link ImmutableSegmentImpl} unless empty
     * @throws IllegalArgumentException if the segment implementation is not supported
     * @throws IllegalStateException if metadata TTL is used without snapshots or with several comparison columns
     */
    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public void addSegment(ImmutableSegment immutableSegment) {
        String segmentName = immutableSegment.getSegmentName();
        if (immutableSegment instanceof EmptyIndexSegment) {
            this._logger.info("Skip adding empty segment: {}", segmentName);
            return;
        }
        // Guava's Preconditions only substitutes %s placeholders, not SLF4J-style {}.
        Preconditions.checkArgument(immutableSegment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: %s for segment: %s, table: %s", immutableSegment.getClass(), segmentName, this._tableNameWithType);
        ImmutableSegmentImpl immutableSegmentImpl = (ImmutableSegmentImpl) immutableSegment;
        if (this._deletedKeysTTL > 0.0d) {
            double maxComparisonValue = ((Number) immutableSegment.getSegmentMetadata().getColumnMetadataMap().get(this._comparisonColumns.get(0)).getMaxValue()).doubleValue();
            this._largestSeenComparisonValue.getAndUpdate(current -> Math.max(current, maxComparisonValue));
        }
        if (this._metadataTTL > 0.0d && this._largestSeenComparisonValue.get() > 0.0d) {
            Preconditions.checkState(this._enableSnapshot, "Upsert TTL must have snapshot enabled");
            Preconditions.checkState(this._comparisonColumns.size() == 1, "Upsert TTL does not work with multiple comparison columns");
            double maxComparisonValue = ((Number) immutableSegment.getSegmentMetadata().getColumnMetadataMap().get(this._comparisonColumns.get(0)).getMaxValue()).doubleValue();
            if (maxComparisonValue < this._largestSeenComparisonValue.get() - this._metadataTTL) {
                this._logger.info("Skip adding segment: {} because it's out of TTL", segmentName);
                MutableRoaringBitmap validDocIdsSnapshot = immutableSegmentImpl.loadValidDocIdsFromSnapshot();
                if (validDocIdsSnapshot != null) {
                    // getQueryableDocIds() returns null when no delete-record column is configured; pass null
                    // through instead of wrapping a null bitmap, matching the other enableUpsert() call sites.
                    MutableRoaringBitmap queryableDocIds = getQueryableDocIds(immutableSegment, validDocIdsSnapshot);
                    immutableSegmentImpl.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), queryableDocIds != null ? new ThreadSafeMutableRoaringBitmap(queryableDocIds) : null);
                } else {
                    this._logger.warn("Failed to find snapshot from segment: {} which is out of TTL, treating all documents as valid", segmentName);
                }
                return;
            }
        }
        if (!startOperation()) {
            this._logger.info("Skip adding segment: {} because metadata manager is already stopped", segmentName);
            return;
        }
        if (this._enableSnapshot) {
            this._snapshotLock.readLock().lock();
        }
        try {
            doAddSegment(immutableSegmentImpl);
            this._trackedSegments.add(immutableSegment);
        } finally {
            // Release exactly once, whether or not the add succeeded.
            if (this._enableSnapshot) {
                this._snapshotLock.readLock().unlock();
            }
            finishOperation();
        }
    }

    /**
     * Reads the record info of {@code immutableSegmentImpl} and adds it to the upsert metadata.
     *
     * <p>With snapshots enabled, the valid-doc-id snapshot (when present) limits which docs are read, and a
     * segment whose snapshot is empty only gets upsert enabled with an empty bitmap. With snapshots disabled,
     * any existing snapshot file of the segment is deleted and all docs are read.
     *
     * @param immutableSegmentImpl the segment to add
     * @throws RuntimeException wrapping any exception raised while reading or adding the records
     */
    protected void doAddSegment(ImmutableSegmentImpl immutableSegmentImpl) {
        String segmentName = immutableSegmentImpl.getSegmentName();
        this._logger.info("Adding segment: {}, current primary key count: {}", segmentName, Long.valueOf(getNumPrimaryKeys()));
        long startTimeMs = System.currentTimeMillis();
        MutableRoaringBitmap validDocIdsSnapshot;
        if (this._enableSnapshot) {
            validDocIdsSnapshot = immutableSegmentImpl.loadValidDocIdsFromSnapshot();
            if (validDocIdsSnapshot != null && validDocIdsSnapshot.isEmpty()) {
                this._logger.info("Skip adding segment: {} without valid doc, current primary key count: {}", segmentName, Long.valueOf(getNumPrimaryKeys()));
                immutableSegmentImpl.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
                return;
            }
        } else {
            validDocIdsSnapshot = null;
            immutableSegmentImpl.deleteValidDocIdsSnapshot();
        }
        // try-with-resources so the reader is closed even when adding the segment fails.
        try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(immutableSegmentImpl, this._primaryKeyColumns, this._comparisonColumns, this._deleteRecordColumn)) {
            Iterator<RecordInfo> recordInfoIterator = validDocIdsSnapshot != null
                    ? UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIdsSnapshot)
                    : UpsertUtils.getRecordInfoIterator(recordInfoReader, immutableSegmentImpl.getSegmentMetadata().getTotalDocs());
            addSegment(immutableSegmentImpl, null, null, recordInfoIterator);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while adding segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
        long numPrimaryKeys = getNumPrimaryKeys();
        updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished adding segment: {} in {}ms, current primary key count: {}", segmentName, Long.valueOf(System.currentTimeMillis() - startTimeMs), Long.valueOf(numPrimaryKeys));
    }

    /**
     * Returns the current primary key count of this partition. This class logs the value around segment
     * operations and publishes it through {@link #updatePrimaryKeyGauge(long)}.
     */
    protected abstract long getNumPrimaryKeys();

    /**
     * Publishes {@code j} as the {@code UPSERT_PRIMARY_KEYS_COUNT} gauge of this table partition.
     *
     * @param j the primary key count to report
     */
    protected void updatePrimaryKeyGauge(long j) {
        final ServerMetrics metrics = this._serverMetrics;
        metrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, j);
    }

    /**
     * Publishes the current value of {@link #getNumPrimaryKeys()} as the primary key gauge.
     *
     * <p>Decompiler note: the access modifier of this method was changed from {@code protected}.
     */
    public void updatePrimaryKeyGauge() {
        final long numPrimaryKeys = getNumPrimaryKeys();
        updatePrimaryKeyGauge(numPrimaryKeys);
    }

    /**
     * Preloads an immutable segment from its valid-doc-id snapshot and starts tracking it.
     *
     * <p>Runs under the snapshot read lock. Nothing happens when the manager is already stopped.
     *
     * @param immutableSegment the segment to preload; must be an {@link ImmutableSegmentImpl}
     * @throws IllegalArgumentException if snapshots are disabled or the segment implementation is unsupported
     */
    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public void preloadSegment(ImmutableSegment immutableSegment) {
        String segmentName = immutableSegment.getSegmentName();
        Preconditions.checkArgument(this._enableSnapshot, "Snapshot must be enabled to preload segment: %s, table: %s", segmentName, this._tableNameWithType);
        // Guava's Preconditions only substitutes %s placeholders, not SLF4J-style {}.
        Preconditions.checkArgument(immutableSegment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: %s for segment: %s, table: %s", immutableSegment.getClass(), segmentName, this._tableNameWithType);
        if (!startOperation()) {
            this._logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName);
            return;
        }
        this._snapshotLock.readLock().lock();
        try {
            doPreloadSegment((ImmutableSegmentImpl) immutableSegment);
            this._trackedSegments.add(immutableSegment);
        } finally {
            this._snapshotLock.readLock().unlock();
            finishOperation();
        }
    }

    /**
     * Loads the valid-doc-id snapshot of {@code immutableSegmentImpl} and preloads the records it marks valid.
     *
     * <p>A segment whose snapshot is empty only gets upsert enabled with an empty bitmap.
     *
     * @param immutableSegmentImpl the segment to preload
     * @throws IllegalStateException if the segment has no valid-doc-id snapshot
     * @throws RuntimeException wrapping any exception raised while reading or preloading the records
     */
    protected void doPreloadSegment(ImmutableSegmentImpl immutableSegmentImpl) {
        String segmentName = immutableSegmentImpl.getSegmentName();
        this._logger.info("Preloading segment: {}, current primary key count: {}", segmentName, Long.valueOf(getNumPrimaryKeys()));
        long startTimeMs = System.currentTimeMillis();
        MutableRoaringBitmap validDocIdsSnapshot = immutableSegmentImpl.loadValidDocIdsFromSnapshot();
        Preconditions.checkState(validDocIdsSnapshot != null, "Snapshot of validDocIds is required to preload segment: %s, table: %s", segmentName, this._tableNameWithType);
        if (validDocIdsSnapshot.isEmpty()) {
            this._logger.info("Skip preloading segment: {} without valid doc, current primary key count: {}", segmentName, Long.valueOf(getNumPrimaryKeys()));
            immutableSegmentImpl.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
            return;
        }
        // try-with-resources so the reader is closed even when preloading fails.
        try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(immutableSegmentImpl, this._primaryKeyColumns, this._comparisonColumns, this._deleteRecordColumn)) {
            doPreloadSegment(immutableSegmentImpl, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIdsSnapshot));
        } catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while preloading segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
        long numPrimaryKeys = getNumPrimaryKeys();
        updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished preloading segment: {} in {}ms, current primary key count: {}", segmentName, Long.valueOf(System.currentTimeMillis() - startTimeMs), Long.valueOf(numPrimaryKeys));
    }

    /**
     * Preloads the given records, creating any bitmap the caller did not supply, and delegates to
     * {@link #addSegmentWithoutUpsert}.
     *
     * @param immutableSegmentImpl the segment being preloaded
     * @param threadSafeMutableRoaringBitmap valid doc ids to fill; a new bitmap is created when {@code null}
     * @param threadSafeMutableRoaringBitmap2 queryable doc ids to fill; when {@code null}, a new bitmap is
     *     created only if a delete-record column is configured
     * @param it2 the records to preload
     */
    @VisibleForTesting
    void doPreloadSegment(ImmutableSegmentImpl immutableSegmentImpl, @Nullable ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap, @Nullable ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap2, Iterator<RecordInfo> it2) {
        final ThreadSafeMutableRoaringBitmap validDocIds =
                threadSafeMutableRoaringBitmap != null ? threadSafeMutableRoaringBitmap : new ThreadSafeMutableRoaringBitmap();
        final boolean needsQueryableDocIds = threadSafeMutableRoaringBitmap2 == null && this._deleteRecordColumn != null;
        final ThreadSafeMutableRoaringBitmap queryableDocIds =
                needsQueryableDocIds ? new ThreadSafeMutableRoaringBitmap() : threadSafeMutableRoaringBitmap2;
        addSegmentWithoutUpsert(immutableSegmentImpl, validDocIds, queryableDocIds, it2);
    }

    /**
     * Adds the given records of a segment to the upsert metadata while holding the segment lock.
     *
     * @param immutableSegmentImpl the segment being added
     * @param threadSafeMutableRoaringBitmap valid doc ids to fill; a new bitmap is created when {@code null}
     * @param threadSafeMutableRoaringBitmap2 queryable doc ids to fill; when {@code null}, a new bitmap is
     *     created only if a delete-record column is configured
     * @param it2 the records to add
     */
    @VisibleForTesting
    public void addSegment(ImmutableSegmentImpl immutableSegmentImpl, @Nullable ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap, @Nullable ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap2, Iterator<RecordInfo> it2) {
        Lock segmentLock = SegmentLocks.getSegmentLock(this._tableNameWithType, immutableSegmentImpl.getSegmentName());
        segmentLock.lock();
        try {
            if (threadSafeMutableRoaringBitmap == null) {
                threadSafeMutableRoaringBitmap = new ThreadSafeMutableRoaringBitmap();
            }
            if (threadSafeMutableRoaringBitmap2 == null && this._deleteRecordColumn != null) {
                threadSafeMutableRoaringBitmap2 = new ThreadSafeMutableRoaringBitmap();
            }
            addOrReplaceSegment(immutableSegmentImpl, threadSafeMutableRoaringBitmap, threadSafeMutableRoaringBitmap2, it2, null, null);
        } finally {
            // The whole operation is covered, so a failing addOrReplaceSegment() cannot leave the lock held.
            segmentLock.unlock();
        }
    }

    /**
     * Subclass hook that applies the records of a segment to the upsert metadata.
     *
     * <p>As called from this class: plain adds and preloads pass {@code null} for the last two arguments;
     * {@code replaceSegment} passes the segment being replaced and the mutable bitmap of its valid doc ids
     * (which may be {@code null}). After the call, {@code replaceSegment} treats whatever remains in that
     * bitmap as keys that were not replaced — presumably the implementation clears the replaced entries;
     * TODO confirm against implementations.
     */
    protected abstract void addOrReplaceSegment(ImmutableSegmentImpl immutableSegmentImpl, ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap, @Nullable ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap2, Iterator<RecordInfo> it2, @Nullable IndexSegment indexSegment, @Nullable MutableRoaringBitmap mutableRoaringBitmap);

    /**
     * Adds the records of a segment during preload. This base implementation simply forwards to
     * {@link #addOrReplaceSegment} with no old segment and no old valid doc ids.
     *
     * @param immutableSegmentImpl the segment being preloaded
     * @param threadSafeMutableRoaringBitmap valid doc ids to fill
     * @param threadSafeMutableRoaringBitmap2 queryable doc ids to fill, may be {@code null}
     * @param it2 the records to add
     */
    protected void addSegmentWithoutUpsert(ImmutableSegmentImpl immutableSegmentImpl, ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap, @Nullable ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap2, Iterator<RecordInfo> it2) {
        final IndexSegment noOldSegment = null;
        final MutableRoaringBitmap noOldValidDocIds = null;
        addOrReplaceSegment(immutableSegmentImpl, threadSafeMutableRoaringBitmap, threadSafeMutableRoaringBitmap2, it2, noOldSegment, noOldValidDocIds);
    }

    /**
     * Adds a single record of a consuming segment to the upsert metadata and starts tracking the segment.
     *
     * <p>Also records that a consuming segment has been seen, which {@link #takeSnapshot()} requires.
     *
     * @param mutableSegment the consuming segment the record belongs to
     * @param recordInfo the record to add
     * @return the result of {@link #doAddRecord}, or {@code false} when the manager is already stopped
     */
    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public boolean addRecord(MutableSegment mutableSegment, RecordInfo recordInfo) {
        this._gotFirstConsumingSegment = true;
        if (startOperation()) {
            try {
                final boolean added = doAddRecord(mutableSegment, recordInfo);
                this._trackedSegments.add(mutableSegment);
                return added;
            } finally {
                finishOperation();
            }
        }
        this._logger.debug("Skip adding record to segment: {} because metadata manager is already stopped", mutableSegment.getSegmentName());
        return false;
    }

    /**
     * Subclass hook invoked by {@link #addRecord} between {@code startOperation()} and
     * {@code finishOperation()}; its return value is handed back to the caller of {@code addRecord} unchanged.
     */
    protected abstract boolean doAddRecord(MutableSegment mutableSegment, RecordInfo recordInfo);

    /**
     * Replaces {@code indexSegment} with {@code immutableSegment} in the upsert metadata.
     *
     * <p>The new segment becomes tracked unless it is an {@link EmptyIndexSegment}; the old segment stops being
     * tracked. Runs under the snapshot read lock when snapshots are enabled. Nothing happens when the manager
     * is already stopped.
     *
     * @param immutableSegment the new segment
     * @param indexSegment the segment being replaced
     */
    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public void replaceSegment(ImmutableSegment immutableSegment, IndexSegment indexSegment) {
        if (!startOperation()) {
            this._logger.info("Skip replacing segment: {} because metadata manager is already stopped", immutableSegment.getSegmentName());
            return;
        }
        // The snapshot lock only exists when snapshots are enabled.
        final Lock snapshotReadLock = this._enableSnapshot ? this._snapshotLock.readLock() : null;
        if (snapshotReadLock != null) {
            snapshotReadLock.lock();
        }
        try {
            doReplaceSegment(immutableSegment, indexSegment);
            final boolean newSegmentIsEmpty = immutableSegment instanceof EmptyIndexSegment;
            if (!newSegmentIsEmpty) {
                this._trackedSegments.add(immutableSegment);
            }
            this._trackedSegments.remove(indexSegment);
        } finally {
            if (snapshotReadLock != null) {
                snapshotReadLock.unlock();
            }
            finishOperation();
        }
    }

    /**
     * Reads the record info of the new segment and replaces the old segment with it.
     *
     * <p>When the new segment is an {@link EmptyIndexSegment}, no records are read and the replace is invoked
     * without a record iterator.
     *
     * @param immutableSegment the new segment
     * @param indexSegment the segment being replaced; must have the same name as the new segment
     * @throws IllegalArgumentException if the two segment names differ
     * @throws RuntimeException wrapping any exception raised while reading or replacing the records
     */
    protected void doReplaceSegment(ImmutableSegment immutableSegment, IndexSegment indexSegment) {
        String segmentName = immutableSegment.getSegmentName();
        // Guava's Preconditions only substitutes %s placeholders, not SLF4J-style {}.
        Preconditions.checkArgument(segmentName.equals(indexSegment.getSegmentName()), "Cannot replace segment with different name for table: %s, old segment: %s, new segment: %s", this._tableNameWithType, indexSegment.getSegmentName(), segmentName);
        this._logger.info("Replacing {} segment: {}, current primary key count: {}", indexSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, Long.valueOf(getNumPrimaryKeys()));
        long startTimeMs = System.currentTimeMillis();
        if (immutableSegment instanceof EmptyIndexSegment) {
            this._logger.info("Skip adding empty segment: {}", segmentName);
            replaceSegment(immutableSegment, null, null, null, indexSegment);
            return;
        }
        // try-with-resources so the reader is closed even when the replace fails.
        try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(immutableSegment, this._primaryKeyColumns, this._comparisonColumns, this._deleteRecordColumn)) {
            replaceSegment(immutableSegment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, immutableSegment.getSegmentMetadata().getTotalDocs()), indexSegment);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while replacing segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
        long numPrimaryKeys = getNumPrimaryKeys();
        updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished replacing segment: {} in {}ms, current primary key count: {}", segmentName, Long.valueOf(System.currentTimeMillis() - startTimeMs), Long.valueOf(numPrimaryKeys));
    }

    /**
     * Replaces {@code indexSegment} with {@code immutableSegment} while holding the segment lock.
     *
     * <p>When a record iterator is given, the records are applied through {@link #addOrReplaceSegment} together
     * with the old segment's valid doc ids. Any doc ids still left in the old segment's bitmap afterwards are
     * reported (as a warning plus a meter for partial-upsert tables) and removed via
     * {@link #removeSegment(IndexSegment, MutableRoaringBitmap)}.
     *
     * @param immutableSegment the new segment; must be an {@link ImmutableSegmentImpl} when {@code it2} is given
     * @param threadSafeMutableRoaringBitmap valid doc ids to fill; created when {@code null}
     * @param threadSafeMutableRoaringBitmap2 queryable doc ids to fill; when {@code null}, created only if a
     *     delete-record column is configured
     * @param it2 records of the new segment, or {@code null} to only clean up the old segment
     * @param indexSegment the segment being replaced
     * @throws IllegalArgumentException if records are given for an unsupported segment implementation
     */
    @VisibleForTesting
    public void replaceSegment(ImmutableSegment immutableSegment, @Nullable ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap, @Nullable ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap2, @Nullable Iterator<RecordInfo> it2, IndexSegment indexSegment) {
        String segmentName = immutableSegment.getSegmentName();
        Lock segmentLock = SegmentLocks.getSegmentLock(this._tableNameWithType, segmentName);
        segmentLock.lock();
        try {
            // Read the old segment's bitmap holder once so the null check and the use see the same object.
            ThreadSafeMutableRoaringBitmap oldValidDocIds = indexSegment.getValidDocIds();
            MutableRoaringBitmap validDocIdsForOldSegment = oldValidDocIds != null ? oldValidDocIds.getMutableRoaringBitmap() : null;
            if (it2 != null) {
                // Guava's Preconditions only substitutes %s placeholders, not SLF4J-style {}.
                Preconditions.checkArgument(immutableSegment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: %s for segment: %s, table: %s", immutableSegment.getClass(), segmentName, this._tableNameWithType);
                if (threadSafeMutableRoaringBitmap == null) {
                    threadSafeMutableRoaringBitmap = new ThreadSafeMutableRoaringBitmap();
                }
                if (threadSafeMutableRoaringBitmap2 == null && this._deleteRecordColumn != null) {
                    threadSafeMutableRoaringBitmap2 = new ThreadSafeMutableRoaringBitmap();
                }
                addOrReplaceSegment((ImmutableSegmentImpl) immutableSegment, threadSafeMutableRoaringBitmap, threadSafeMutableRoaringBitmap2, it2, indexSegment, validDocIdsForOldSegment);
            }
            if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
                int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
                if (this._partialUpsertHandler != null) {
                    this._logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This can potentially cause inconsistency between replicas", Integer.valueOf(numKeysNotReplaced), segmentName);
                    this._serverMetrics.addMeteredTableValue(this._tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED, numKeysNotReplaced);
                } else {
                    this._logger.info("Found {} primary keys not replaced when replacing segment: {}", Integer.valueOf(numKeysNotReplaced), segmentName);
                }
                removeSegment(indexSegment, validDocIdsForOldSegment);
            }
        } finally {
            segmentLock.unlock();
        }
    }

    /**
     * Subclass hook called with a segment and the mutable bitmap of its valid doc ids. This class only calls it
     * while holding the segment lock and with a non-empty bitmap. Presumably the implementation removes the
     * primary keys of those docs from the metadata — TODO confirm against implementations.
     */
    protected abstract void removeSegment(IndexSegment indexSegment, MutableRoaringBitmap mutableRoaringBitmap);

    /**
     * Removes a tracked segment from the upsert metadata and stops tracking it.
     *
     * <p>Untracked segments are ignored. With metadata TTL enabled, a segment whose maximum comparison value is
     * older than {@code largestSeen - metadataTTL} is only untracked; {@link #doRemoveSegment} is not called
     * for it. Runs under the snapshot read lock when snapshots are enabled.
     *
     * @param indexSegment the segment to remove
     */
    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public void removeSegment(IndexSegment indexSegment) {
        final String segmentName = indexSegment.getSegmentName();
        if (!this._trackedSegments.contains(indexSegment)) {
            this._logger.info("Skip removing untracked (replaced or empty) segment: {}", segmentName);
            return;
        }
        boolean outOfTtl = false;
        if (this._metadataTTL > 0.0d && this._largestSeenComparisonValue.get() > 0.0d) {
            final Number maxComparisonValue = (Number) indexSegment.getSegmentMetadata().getColumnMetadataMap().get(this._comparisonColumns.get(0)).getMaxValue();
            if (maxComparisonValue.doubleValue() < this._largestSeenComparisonValue.get() - this._metadataTTL) {
                this._logger.info("Skip removing segment: {} because it's out of TTL", segmentName);
                outOfTtl = true;
            }
        }
        if (!startOperation()) {
            this._logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
            return;
        }
        if (this._enableSnapshot) {
            this._snapshotLock.readLock().lock();
        }
        try {
            if (!outOfTtl) {
                doRemoveSegment(indexSegment);
            }
            this._trackedSegments.remove(indexSegment);
        } finally {
            if (this._enableSnapshot) {
                this._snapshotLock.readLock().unlock();
            }
            finishOperation();
        }
    }

    /**
     * Removes the primary keys of a segment's valid docs from the upsert metadata.
     *
     * <p>A segment without valid doc ids (missing or empty bitmap) is skipped. Otherwise the removal happens
     * under the segment lock, after which the primary key gauge is refreshed.
     *
     * @param indexSegment the segment to remove
     */
    protected void doRemoveSegment(IndexSegment indexSegment) {
        String segmentName = indexSegment.getSegmentName();
        this._logger.info("Removing {} segment: {}, current primary key count: {}", indexSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, Long.valueOf(getNumPrimaryKeys()));
        long startTimeMs = System.currentTimeMillis();
        MutableRoaringBitmap validDocIds = indexSegment.getValidDocIds() != null ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
        if (validDocIds == null || validDocIds.isEmpty()) {
            this._logger.info("Skip removing segment without valid docs: {}", segmentName);
            return;
        }
        Lock segmentLock = SegmentLocks.getSegmentLock(this._tableNameWithType, segmentName);
        segmentLock.lock();
        try {
            this._logger.info("Removing {} primary keys for segment: {}", Integer.valueOf(validDocIds.getCardinality()), segmentName);
            removeSegment(indexSegment, validDocIds);
        } finally {
            // Unlock exactly once; the metrics below run outside the lock and cannot trigger a second unlock.
            segmentLock.unlock();
        }
        long numPrimaryKeys = getNumPrimaryKeys();
        updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished removing segment: {} in {}ms, current primary key count: {}", segmentName, Long.valueOf(System.currentTimeMillis() - startTimeMs), Long.valueOf(numPrimaryKeys));
    }

    /**
     * Applies partial-upsert handling to a record.
     *
     * @param genericRow the incoming row
     * @param recordInfo the record info of the incoming row
     * @return the result of {@link #doUpdateRecord}; the unchanged {@code genericRow} when no partial-upsert
     *     handler is configured or the manager is already stopped
     */
    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public GenericRow updateRecord(GenericRow genericRow, RecordInfo recordInfo) {
        final boolean partialUpsertEnabled = this._partialUpsertHandler != null;
        if (partialUpsertEnabled) {
            if (startOperation()) {
                try {
                    return doUpdateRecord(genericRow, recordInfo);
                } finally {
                    finishOperation();
                }
            }
            this._logger.debug("Skip updating record because metadata manager is already stopped");
        }
        return genericRow;
    }

    /**
     * Subclass hook invoked by {@link #updateRecord} only when a partial-upsert handler is configured and the
     * manager is not stopped; its return value is handed back to the caller of {@code updateRecord}.
     */
    protected abstract GenericRow doUpdateRecord(GenericRow genericRow, RecordInfo recordInfo);

    /**
     * Counts an out-of-order event on the matching meter and logs a rate-limited summary warning.
     *
     * <p>The warning is emitted for the first event and afterwards at most once per
     * {@link #OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS}; it reports how many events were skipped since the
     * previous warning together with the comparison values of the latest one.
     *
     * <p>Decompiler note: the access modifier of this method was changed from {@code protected}.
     *
     * @param obj the comparison value currently held for the key
     * @param obj2 the comparison value of the skipped record
     */
    public void handleOutOfOrderEvent(Object obj, Object obj2) {
        boolean partialUpsert = this._partialUpsertHandler != null;
        this._serverMetrics.addMeteredTableValue(this._tableNameWithType, partialUpsert ? ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER : ServerMeter.UPSERT_OUT_OF_ORDER, 1L);
        this._numOutOfOrderEvents++;
        long nowNs = System.nanoTime();
        // Long.MIN_VALUE marks "never reported". It must be tested explicitly: nowNs - Long.MIN_VALUE
        // overflows to a negative number for any non-negative nowNs, which would suppress the warning forever.
        boolean neverReported = this._lastOutOfOrderEventReportTimeNs == Long.MIN_VALUE;
        if (neverReported || nowNs - this._lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
            this._logger.warn("Skipped {} out-of-order events for {} upsert table {} (the last event has current comparison value: {}, record comparison value: {})", Integer.valueOf(this._numOutOfOrderEvents), partialUpsert ? "partial" : "full", this._tableNameWithType, obj, obj2);
            this._lastOutOfOrderEventReportTimeNs = nowNs;
            this._numOutOfOrderEvents = 0;
        }
    }

    /**
     * Collapses the given records to one record per (hashed) primary key.
     *
     * <p>For each key the record with the largest comparison value is kept; when two records compare equal,
     * the one that appears later in {@code it2} wins. The returned iterator follows the iteration order of
     * the internal hash map, not the input order.
     *
     * <p>Decompiler note: the access modifier of this method was changed from {@code protected}.
     *
     * @param it2 the records to de-duplicate; fully consumed by this call
     * @param hashFunction hash function applied to each primary key before it is used as map key
     * @return an iterator over the surviving records
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // RecordInfo exposes its comparison value as a raw Comparable
    public static Iterator<RecordInfo> resolveComparisonTies(Iterator<RecordInfo> it2, HashFunction hashFunction) {
        HashMap<Object, RecordInfo> winnersByPrimaryKey = new HashMap<>();
        while (it2.hasNext()) {
            RecordInfo candidate = it2.next();
            Comparable candidateValue = candidate.getComparisonValue();
            winnersByPrimaryKey.compute(HashUtils.hashPrimaryKey(candidate.getPrimaryKey(), hashFunction), (key, current) -> {
                // Keep the existing record only when it is strictly newer than the candidate.
                if (current != null && candidateValue.compareTo(current.getComparisonValue()) < 0) {
                    return current;
                }
                return candidate;
            });
        }
        return winnersByPrimaryKey.values().iterator();
    }

    /**
     * Persists valid-doc-id snapshots for the tracked segments and records how long it took.
     *
     * <p>Does nothing when snapshots are disabled, before the first consuming segment has been seen, or when
     * the manager is already stopped. The work runs under the snapshot write lock.
     */
    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public void takeSnapshot() {
        if (!this._enableSnapshot) {
            return;
        }
        if (!this._gotFirstConsumingSegment) {
            this._logger.info("Skip taking snapshot before getting the first consuming segment");
            return;
        }
        if (!startOperation()) {
            this._logger.info("Skip taking snapshot because metadata manager is already stopped");
            return;
        }
        final Lock snapshotWriteLock = this._snapshotLock.writeLock();
        snapshotWriteLock.lock();
        try {
            final long startTimeMs = System.currentTimeMillis();
            doTakeSnapshot();
            final long elapsedMs = System.currentTimeMillis() - startTimeMs;
            this._serverMetrics.addTimedTableValue(this._tableNameWithType, ServerTimer.UPSERT_SNAPSHOT_TIME_MS, elapsedMs, TimeUnit.MILLISECONDS);
        } finally {
            snapshotWriteLock.unlock();
            finishOperation();
        }
    }

    /**
     * Persists the valid-doc-id snapshot of every tracked {@link ImmutableSegmentImpl}.
     *
     * <p>Segments that already have a snapshot file are persisted first; segments without one are collected
     * and persisted afterwards. Tracked segments of any other type are only counted as consuming segments.
     * Finally the number of snapshots and the number of primary keys in them are published as gauges.
     */
    protected void doTakeSnapshot() {
        int numTrackedSegments = this._trackedSegments.size();
        long numPrimaryKeysInSnapshot = 0;
        this._logger.info("Taking snapshot for {} segments", Integer.valueOf(numTrackedSegments));
        long startTimeMs = System.currentTimeMillis();
        int numImmutableSegments = 0;
        int numConsumingSegments = 0;
        Set<ImmutableSegmentImpl> segmentsWithoutSnapshot = new HashSet<>();
        for (IndexSegment indexSegment : this._trackedSegments) {
            if (!(indexSegment instanceof ImmutableSegmentImpl)) {
                numConsumingSegments++;
                continue;
            }
            ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) indexSegment;
            if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
                // Deferred: handled after all segments that already own a snapshot file.
                segmentsWithoutSnapshot.add(immutableSegment);
                continue;
            }
            immutableSegment.persistValidDocIdsSnapshot();
            numImmutableSegments++;
            numPrimaryKeysInSnapshot += immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
        }
        for (ImmutableSegmentImpl immutableSegment : segmentsWithoutSnapshot) {
            immutableSegment.persistValidDocIdsSnapshot();
            numImmutableSegments++;
            numPrimaryKeysInSnapshot += immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
        }
        this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments);
        this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT, numPrimaryKeysInSnapshot);
        this._logger.info("Finished taking snapshot for {} immutable segments with {} primary keys (out of {} total segments, {} are consuming segments) in {} ms", Integer.valueOf(numImmutableSegments), Long.valueOf(numPrimaryKeysInSnapshot), Integer.valueOf(numTrackedSegments), Integer.valueOf(numConsumingSegments), Long.valueOf(System.currentTimeMillis() - startTimeMs));
    }

    /**
     * Reads the persisted watermark from the file returned by {@link #getWatermarkFile()}.
     *
     * <p>NOTE(review): the fallback is {@code Double.MIN_VALUE}, which is the smallest positive double rather
     * than the most negative one; confirm that callers expect this sentinel.
     *
     * @return the double stored at the start of the watermark file, or {@code Double.MIN_VALUE} when the file
     *         does not exist or cannot be read
     */
    protected double loadWatermark() {
        File watermarkFile = getWatermarkFile();
        if (watermarkFile.exists()) {
            try {
                byte[] content = FileUtils.readFileToByteArray(watermarkFile);
                double watermark = ByteBuffer.wrap(content).getDouble();
                this._logger.info("Loaded watermark: {} from file for table: {} partition_id: {}", watermark, this._tableNameWithType, this._partitionId);
                return watermark;
            } catch (Exception e) {
                // Best effort: an unreadable file is treated the same as a missing one.
                this._logger.warn("Caught exception while loading watermark file: {}, skipping", watermarkFile);
            }
        }
        return Double.MIN_VALUE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Writes the given watermark to the file returned by {@link #getWatermarkFile()}, replacing any existing
     * file.
     *
     * <p>This is best effort: if the existing file cannot be deleted, or any exception is raised while
     * writing, a warning is logged and the method returns normally.
     *
     * @param d the watermark value to persist
     */
    public void persistWatermark(double d) {
        File watermarkFile = getWatermarkFile();
        try {
            if (watermarkFile.exists() && !FileUtils.deleteQuietly(watermarkFile)) {
                this._logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
                return;
            }
            // try-with-resources closes both streams on success and on failure.
            try (FileOutputStream fileOutputStream = new FileOutputStream(watermarkFile, false);
                    DataOutputStream dataOutputStream = new DataOutputStream(fileOutputStream)) {
                dataOutputStream.writeDouble(d);
            }
            this._logger.info("Persisted watermark: {} to file: {}", d, watermarkFile);
        } catch (Exception e) {
            this._logger.warn("Caught exception while persisting watermark file: {}, skipping", watermarkFile);
        }
    }

    /**
     * Deletes the watermark file if it exists, logging a warning when the deletion does not succeed.
     */
    protected void deleteWatermark() {
        File watermarkFile = getWatermarkFile();
        // Short-circuit keeps the delete attempt conditional on the file being present.
        boolean deleteFailed = watermarkFile.exists() && !FileUtils.deleteQuietly(watermarkFile);
        if (deleteFailed) {
            this._logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
        }
    }

    /**
     * Builds the location of this partition's watermark file.
     *
     * @return a file named {@code ttl.watermark.partition.<partitionId>} under {@code _tableIndexDir}
     */
    protected File getWatermarkFile() {
        String watermarkFileName = "ttl.watermark.partition." + this._partitionId;
        return new File(this._tableIndexDir, watermarkFileName);
    }

    /**
     * Removes expired primary keys when a TTL is configured.
     *
     * <p>The call is a no-op unless {@code _metadataTTL} or {@code _deletedKeysTTL} is greater than zero. It is
     * skipped (with an info log) when {@link #startOperation()} refuses the operation. Otherwise
     * {@link #doRemoveExpiredPrimaryKeys()} runs and its duration is reported as
     * {@code ServerTimer.UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS}.
     */
    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public void removeExpiredPrimaryKeys() {
        // Written as a negated disjunction so a NaN TTL is treated exactly like a non-positive one.
        if (!(this._metadataTTL > 0.0d || this._deletedKeysTTL > 0.0d)) {
            return;
        }
        if (!startOperation()) {
            this._logger.info("Skip removing expired primary keys because metadata manager is already stopped");
            return;
        }
        try {
            long startTimeMs = System.currentTimeMillis();
            doRemoveExpiredPrimaryKeys();
            long durationMs = System.currentTimeMillis() - startTimeMs;
            this._serverMetrics.addTimedTableValue(this._tableNameWithType, ServerTimer.UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS, durationMs, TimeUnit.MILLISECONDS);
        } finally {
            // Exactly one finishOperation() per successful startOperation(), on every exit path.
            finishOperation();
        }
    }

    /**
     * Hook that performs the actual expired-primary-key removal. It is invoked by
     * {@link #removeExpiredPrimaryKeys()} only when a TTL is greater than zero and the operation has been
     * registered through {@link #startOperation()}.
     */
    protected abstract void doRemoveExpiredPrimaryKeys();

    /**
     * Registers the start of an operation by incrementing the pending-operation counter.
     *
     * <p>NOTE(review): a zero counter is treated as "no longer accepting operations"; the counter's initial
     * value is set outside this view, so confirm that it starts above zero.
     *
     * @return {@code true} if the operation was registered and must later be paired with
     *         {@link #finishOperation()}; {@code false} if the manager is stopped or the counter is zero
     */
    protected synchronized boolean startOperation() {
        boolean accepted = !this._stopped && this._numPendingOperations != 0;
        if (accepted) {
            this._numPendingOperations++;
        }
        return accepted;
    }

    /**
     * Marks one pending operation as finished. When the counter drops to zero, all threads waiting on this
     * object's monitor are notified (see the wait loop in {@link #close()}).
     */
    protected synchronized void finishOperation() {
        int remaining = --this._numPendingOperations;
        if (remaining == 0) {
            notifyAll();
        }
    }

    /**
     * Stops the metadata manager. After this call {@link #startOperation()} returns {@code false}.
     *
     * <p>A repeated call only logs a warning. The first call sets {@code _stopped} and decrements the
     * pending-operation counter once.
     * NOTE(review): that decrement appears to release a baseline count held by the manager itself; the
     * counter's initial value is not visible here, so confirm.
     */
    @Override // org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager
    public synchronized void stop() {
        if (!this._stopped) {
            this._stopped = true;
            this._numPendingOperations--;
            int pendingOperations = this._numPendingOperations;
            long primaryKeyCount = getNumPrimaryKeys();
            this._logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}", pendingOperations, primaryKeyCount);
        } else {
            this._logger.warn("Metadata manager is already stopped");
        }
    }

    /**
     * Closes the metadata manager after it has been stopped.
     *
     * <p>Blocks on this object's monitor until the pending-operation counter reaches zero, then calls
     * {@link #doClose()} and sets the primary key gauge to 0. A repeated call only logs a warning.
     *
     * @throws IllegalStateException if the manager has not been stopped yet
     * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's interrupt
     *         status is restored before the exception is thrown
     * @throws IOException if {@link #doClose()} fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        Preconditions.checkState(this._stopped, "Must stop the metadata manager before closing it");
        if (this._closed) {
            this._logger.warn("Metadata manager is already closed");
            return;
        }
        this._closed = true;
        this._logger.info("Closing the metadata manager");
        // Loop to guard against spurious wake-ups; finishOperation() notifies when the counter hits zero.
        while (this._numPendingOperations != 0) {
            this._logger.info("Waiting for {} pending operations to finish", this._numPendingOperations);
            try {
                wait();
            } catch (InterruptedException e) {
                // wait() clears the interrupt flag when it throws; restore it so callers further up the
                // stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(String.format("Interrupted while waiting for %d pending operations to finish", this._numPendingOperations), e);
            }
        }
        doClose();
        // Reset the primary key gauge now that the manager is closed.
        updatePrimaryKeyGauge(0L);
        this._logger.info("Closed the metadata manager");
    }

    /**
     * Hook invoked by {@link #close()} once no operations are pending. This implementation does nothing.
     *
     * @throws IOException never thrown by this implementation; declared so that overriding methods may
     *         throw it
     */
    protected void doClose() throws IOException {
        // Intentionally empty.
    }
}
