package org.apache.pinot.controller.helix;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/RealtimeConsumerMonitor.class */
public class RealtimeConsumerMonitor extends ControllerPeriodicTask<Context> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConsumerMonitor.class);
    private static final int DEFAULT_TIMEOUT_MS = 10000;
    private final ConsumingSegmentInfoReader _consumingSegmentInfoReader;

    /* loaded from: input_file:org/apache/pinot/controller/helix/RealtimeConsumerMonitor$Context.class */
    public static final class Context {
    }

    @VisibleForTesting
    public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics, ConsumingSegmentInfoReader consumingSegmentInfoReader) {
        super("RealtimeConsumerMonitor", controllerConf.getRealtimeConsumerMonitorRunFrequency(), controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
        this._consumingSegmentInfoReader = consumingSegmentInfoReader;
    }

    public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics, ExecutorService executorService) {
        this(controllerConf, pinotHelixResourceManager, leadControllerManager, controllerMetrics, new ConsumingSegmentInfoReader(executorService, new BasicHttpClientConnectionManager(), pinotHelixResourceManager));
    }

    protected void setUpTask() {
        LOGGER.info("Setting up RealtimeConsumerMonitor task");
    }

    @Override // org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask
    protected void processTable(String str) {
        if (TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(str))) {
            try {
                ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfo = this._consumingSegmentInfoReader.getConsumingSegmentsInfo(str, 10000);
                HashMap hashMap = new HashMap();
                HashMap hashMap2 = new HashMap();
                Iterator<List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> it = consumingSegmentsInfo._segmentToConsumingInfoMap.values().iterator();
                while (it.hasNext()) {
                    it.next().forEach(consumingSegmentInfo -> {
                        consumingSegmentInfo._partitionOffsetInfo._recordsLagMap.forEach((str2, str3) -> {
                            if ("NOT_CALCULATED".equals(str3)) {
                                return;
                            }
                            try {
                                ((List) hashMap.computeIfAbsent(str2, str2 -> {
                                    return new ArrayList();
                                })).add(Long.valueOf(Long.parseLong(str3)));
                            } catch (NumberFormatException e) {
                            }
                        });
                        consumingSegmentInfo._partitionOffsetInfo._availabilityLagMap.forEach((str4, str5) -> {
                            if ("NOT_CALCULATED".equals(str5)) {
                                return;
                            }
                            try {
                                ((List) hashMap2.computeIfAbsent(str4, str4 -> {
                                    return new ArrayList();
                                })).add(Long.valueOf(Long.parseLong(str5)));
                            } catch (NumberFormatException e) {
                            }
                        });
                    });
                }
                hashMap.forEach((str2, list) -> {
                    this._controllerMetrics.setValueOfPartitionGauge(str, Integer.parseInt(str2), ControllerGauge.MAX_RECORDS_LAG, ((Long) Collections.max(list)).longValue());
                });
                hashMap2.forEach((str3, list2) -> {
                    this._controllerMetrics.setValueOfPartitionGauge(str, Integer.parseInt(str3), ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS, ((Long) Collections.max(list2)).longValue());
                });
            } catch (Exception e) {
                LOGGER.error("Failed to fetch consuming segments info. Unable to update table consumption status metrics");
            }
        }
    }
}
