package org.apache.pinot.server.starter.helix;

import java.util.HashSet;
import java.util.Set;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.class */
public abstract class IngestionBasedConsumptionStatusChecker {
    protected final InstanceDataManager _instanceDataManager;
    protected final Set<String> _consumingSegments;
    protected final Logger _logger = LoggerFactory.getLogger(getClass());
    private final Set<String> _caughtUpSegments = new HashSet();

    public IngestionBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> set) {
        this._instanceDataManager = instanceDataManager;
        this._consumingSegments = set;
    }

    public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
        for (String str : this._consumingSegments) {
            if (!this._caughtUpSegments.contains(str)) {
                TableDataManager tableDataManager = getTableDataManager(str);
                if (tableDataManager == null) {
                    this._logger.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", str);
                } else {
                    SegmentDataManager segmentDataManager = null;
                    try {
                        segmentDataManager = tableDataManager.acquireSegment(str);
                        if (segmentDataManager == null) {
                            this._logger.info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later", str);
                            if (segmentDataManager != null) {
                                tableDataManager.releaseSegment(segmentDataManager);
                            }
                        } else if (segmentDataManager instanceof RealtimeSegmentDataManager) {
                            if (isSegmentCaughtUp(str, (RealtimeSegmentDataManager) segmentDataManager)) {
                                this._caughtUpSegments.add(str);
                            }
                            if (segmentDataManager != null) {
                                tableDataManager.releaseSegment(segmentDataManager);
                            }
                        } else {
                            this._logger.info("Segment {} is already committed and is considered caught up.", str);
                            this._caughtUpSegments.add(str);
                            if (segmentDataManager != null) {
                                tableDataManager.releaseSegment(segmentDataManager);
                            }
                        }
                    } catch (Throwable th) {
                        if (segmentDataManager != null) {
                            tableDataManager.releaseSegment(segmentDataManager);
                        }
                        throw th;
                    }
                }
            }
        }
        return this._consumingSegments.size() - this._caughtUpSegments.size();
    }

    protected abstract boolean isSegmentCaughtUp(String str, RealtimeSegmentDataManager realtimeSegmentDataManager);

    private TableDataManager getTableDataManager(String str) {
        return this._instanceDataManager.getTableDataManager(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(new LLCSegmentName(str).getTableName()));
    }
}
