package org.apache.pinot.controller.helix.core.realtime.segment;

import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.util.Random;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.class */
class SizeBasedSegmentFlushThresholdComputer {
    static final int MINIMUM_NUM_ROWS_THRESHOLD = 10000;
    static final double CURRENT_SEGMENT_RATIO_WEIGHT = 0.1d;
    static final double PREVIOUS_SEGMENT_RATIO_WEIGHT = 0.9d;
    static final double ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT = 1.1d;
    private static final Logger LOGGER = LoggerFactory.getLogger(SizeBasedSegmentFlushThresholdComputer.class);
    private static final Random RANDOM = new Random();
    private final Clock _clock;
    private long _timeConsumedForLastSegment;
    private int _rowsConsumedForLastSegment;
    private long _sizeForLastSegment;
    private int _rowsThresholdForLastSegment;
    private double _segmentRowsToSizeRatio;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SizeBasedSegmentFlushThresholdComputer() {
        this(Clock.systemUTC());
    }

    @VisibleForTesting
    SizeBasedSegmentFlushThresholdComputer(Clock clock) {
        this._clock = clock;
    }

    @VisibleForTesting
    public void setSizeForLastSegment(long j) {
        this._sizeForLastSegment = j;
    }

    @VisibleForTesting
    void setSegmentRowsToSizeRatio(double d) {
        this._segmentRowsToSizeRatio = d;
    }

    @VisibleForTesting
    double getSegmentRowsToSizeRatio() {
        return this._segmentRowsToSizeRatio;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void onSegmentCommit(CommittingSegmentDescriptor committingSegmentDescriptor, SegmentZKMetadata segmentZKMetadata) {
        String segmentName = segmentZKMetadata.getSegmentName();
        int totalDocs = (int) segmentZKMetadata.getTotalDocs();
        long segmentSizeBytes = committingSegmentDescriptor.getSegmentSizeBytes();
        if (totalDocs <= 0 || segmentSizeBytes <= 0 || "forceCommitMessageReceived".equals(committingSegmentDescriptor.getStopReason())) {
            if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
                LOGGER.info("Skipping updating segment rows to size ratio for segment: {} with rows: {}, size: {} and stop reason: {}", new Object[]{segmentName, Integer.valueOf(totalDocs), Long.valueOf(segmentSizeBytes), committingSegmentDescriptor.getStopReason()});
            }
            if (this._segmentRowsToSizeRatio == 0.0d) {
                int sizeThresholdToFlushSegment = segmentZKMetadata.getSizeThresholdToFlushSegment();
                LOGGER.info("Segment rows to size ratio is not available, updating rows threshold to: {}", Integer.valueOf(sizeThresholdToFlushSegment));
                this._rowsThresholdForLastSegment = sizeThresholdToFlushSegment;
                return;
            }
            return;
        }
        long millis = this._clock.millis() - segmentZKMetadata.getCreationTime();
        int sizeThresholdToFlushSegment2 = segmentZKMetadata.getSizeThresholdToFlushSegment();
        this._timeConsumedForLastSegment = millis;
        this._rowsConsumedForLastSegment = totalDocs;
        this._sizeForLastSegment = segmentSizeBytes;
        this._rowsThresholdForLastSegment = sizeThresholdToFlushSegment2;
        double d = totalDocs / segmentSizeBytes;
        double d2 = this._segmentRowsToSizeRatio;
        if (d2 > 0.0d) {
            this._segmentRowsToSizeRatio = (CURRENT_SEGMENT_RATIO_WEIGHT * d) + (PREVIOUS_SEGMENT_RATIO_WEIGHT * d2);
        } else {
            this._segmentRowsToSizeRatio = d;
        }
        LOGGER.info("Updated with segment: {}, time: {}, rows: {}, size: {}, ratio: {}, threshold: {}. Segment rows to size ratio got updated from: {} to: {}", new Object[]{segmentName, TimeUtils.convertMillisToPeriod(Long.valueOf(millis)), Integer.valueOf(totalDocs), Long.valueOf(segmentSizeBytes), Double.valueOf(d), Integer.valueOf(sizeThresholdToFlushSegment2), Double.valueOf(d2), Double.valueOf(this._segmentRowsToSizeRatio)});
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized int computeThreshold(StreamConfig streamConfig, String str) {
        if (this._segmentRowsToSizeRatio == 0.0d) {
            if (this._rowsThresholdForLastSegment > 0) {
                LOGGER.info("Segment rows to size ratio is not available, using rows threshold: {} from previous segment for new segment: {}", Integer.valueOf(this._rowsThresholdForLastSegment), str);
                return this._rowsThresholdForLastSegment;
            }
            int flushAutotuneInitialRows = streamConfig.getFlushAutotuneInitialRows();
            LOGGER.info("This is the first segment, using initial rows threshold: {} for segment: {}", Integer.valueOf(flushAutotuneInitialRows), str);
            return flushAutotuneInitialRows;
        }
        long flushThresholdSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
        if (flushThresholdSegmentSizeBytes <= 0) {
            flushThresholdSegmentSizeBytes = 209715200;
        }
        if (this._rowsConsumedForLastSegment >= this._rowsThresholdForLastSegment || this._sizeForLastSegment >= flushThresholdSegmentSizeBytes) {
            long j = this._sizeForLastSegment < flushThresholdSegmentSizeBytes / 2 ? (long) (this._rowsConsumedForLastSegment * 1.5d) : ((double) this._sizeForLastSegment) > ((double) flushThresholdSegmentSizeBytes) * 1.5d ? this._rowsConsumedForLastSegment / 2 : (long) (flushThresholdSegmentSizeBytes * this._segmentRowsToSizeRatio);
            double flushThresholdVarianceFraction = streamConfig.getFlushThresholdVarianceFraction();
            if (flushThresholdVarianceFraction > 0.0d) {
                LOGGER.info("Applying variance: {} to segment: {} with target rows: {}", new Object[]{Double.valueOf(flushThresholdVarianceFraction), str, Long.valueOf(j)});
                j = (long) (j * ((1.0d - flushThresholdVarianceFraction) + (2.0d * flushThresholdVarianceFraction * RANDOM.nextDouble())));
            }
            int threshold = getThreshold(j);
            LOGGER.info("Setting segment size threshold for: {} to: {}", str, Integer.valueOf(threshold));
            return threshold;
        }
        long flushThresholdTimeMillis = streamConfig.getFlushThresholdTimeMillis();
        long j2 = this._rowsConsumedForLastSegment;
        StringBuilder append = new StringBuilder().append("Time threshold reached. ");
        if (flushThresholdTimeMillis < this._timeConsumedForLastSegment) {
            j2 = (flushThresholdTimeMillis * j2) / this._timeConsumedForLastSegment;
            append.append("Detected lower time threshold, adjusting numRowsConsumed to: ").append(j2).append(". ");
        }
        int threshold2 = getThreshold((long) (j2 * ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
        append.append("Setting segment size threshold for: ").append(str).append(" to: ").append(threshold2);
        LOGGER.info(append.toString());
        return threshold2;
    }

    private int getThreshold(long j) {
        return j > 2147483647L ? PinotHelixSegmentOnlineOfflineStateModelGenerator.DEFAULT_STATE_TRANSITION_PRIORITY : Math.max((int) j, MINIMUM_NUM_ROWS_THRESHOLD);
    }
}
