package org.apache.pinot.server.starter.helix;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.class */
public abstract class IngestionBasedConsumptionStatusChecker {
    private final InstanceDataManager _instanceDataManager;
    private final Map<String, Set<String>> _consumingSegmentsByTable;
    private final Function<String, Set<String>> _consumingSegmentsSupplier;
    protected final Logger _logger = LoggerFactory.getLogger(getClass());
    private final Map<String, Set<String>> _caughtUpSegmentsByTable = new HashMap();

    public IngestionBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Map<String, Set<String>> map, Function<String, Set<String>> function) {
        this._instanceDataManager = instanceDataManager;
        this._consumingSegmentsByTable = map;
        this._consumingSegmentsSupplier = function;
    }

    public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria() {
        HashSet<String> hashSet = new HashSet();
        Iterator<Map.Entry<String, Set<String>>> it = this._consumingSegmentsByTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<String>> next = it.next();
            String key = next.getKey();
            TableDataManager tableDataManager = this._instanceDataManager.getTableDataManager(key);
            if (tableDataManager == null) {
                this._logger.info("No tableDataManager for table: {}. Refresh table's consuming segments", key);
                hashSet.add(key);
            } else {
                Set<String> value = next.getValue();
                Set<String> computeIfAbsent = this._caughtUpSegmentsByTable.computeIfAbsent(key, str -> {
                    return new HashSet();
                });
                for (String str2 : value) {
                    if (!computeIfAbsent.contains(str2)) {
                        SegmentDataManager acquireSegment = tableDataManager.acquireSegment(str2);
                        if (acquireSegment == null) {
                            this._logger.info("No segmentDataManager for segment: {} from table: {}. Refresh table's consuming segments", str2, key);
                            hashSet.add(key);
                        } else {
                            try {
                                if (!(acquireSegment instanceof RealtimeSegmentDataManager)) {
                                    this._logger.info("Segment: {} from table: {} is already committed. Refresh table's consuming segments.", str2, key);
                                    hashSet.add(key);
                                    tableDataManager.releaseSegment(acquireSegment);
                                } else if (isSegmentCaughtUp(str2, (RealtimeSegmentDataManager) acquireSegment)) {
                                    computeIfAbsent.add(str2);
                                }
                            } finally {
                                tableDataManager.releaseSegment(acquireSegment);
                            }
                        }
                    }
                }
                if (value.size() - computeIfAbsent.size() == 0) {
                    this._logger.info("Consuming segments from table: {} have all caught up", key);
                    it.remove();
                    this._caughtUpSegmentsByTable.remove(key);
                }
            }
        }
        if (!hashSet.isEmpty()) {
            for (String str3 : hashSet) {
                Set<String> apply = this._consumingSegmentsSupplier.apply(str3);
                if (apply == null || apply.isEmpty()) {
                    this._consumingSegmentsByTable.remove(str3);
                    this._caughtUpSegmentsByTable.remove(str3);
                    this._logger.info("Found no consuming segments from table: {}, which is probably removed", str3);
                } else {
                    this._consumingSegmentsByTable.put(str3, apply);
                    this._caughtUpSegmentsByTable.computeIfAbsent(str3, str4 -> {
                        return new HashSet();
                    }).retainAll(apply);
                    this._logger.info("Updated consumingSegments: {} and caughtUpSegments: {} for table: {}, as consuming segments were missing or committed", new Object[]{apply, this._caughtUpSegmentsByTable.get(str3), str3});
                }
            }
        }
        int i = 0;
        for (Map.Entry<String, Set<String>> entry : this._consumingSegmentsByTable.entrySet()) {
            i += entry.getValue().size() - this._caughtUpSegmentsByTable.computeIfAbsent(entry.getKey(), str5 -> {
                return new HashSet();
            }).size();
        }
        return i;
    }

    protected abstract boolean isSegmentCaughtUp(String str, RealtimeSegmentDataManager realtimeSegmentDataManager);
}
