package org.apache.pinot.controller.cursors;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.http.MultiHttpRequest;
import org.apache.pinot.common.http.MultiHttpRequestResponse;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.response.CursorResponse;
import org.apache.pinot.common.response.broker.CursorResponseNative;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.resources.InstanceInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/cursors/ResponseStoreCleaner.class */
public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ResponseStoreCleaner.class);
    private static final int TIMEOUT_MS = 3000;
    private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
    private static final String DELETE_QUERY_RESULT = "%s://%s:%d/responseStore/%s";
    public static final String CLEAN_AT_TIME = "response.store.cleaner.clean.at.ms";
    private final ControllerConf _controllerConf;
    private final Executor _executor;
    private final PoolingHttpClientConnectionManager _connectionManager;
    private final AuthProvider _authProvider;

    public ResponseStoreCleaner(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics, Executor executor, PoolingHttpClientConnectionManager poolingHttpClientConnectionManager) {
        super("ResponseStoreCleaner", getFrequencyInSeconds(controllerConf), getInitialDelayInSeconds(controllerConf), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
        this._controllerConf = controllerConf;
        this._executor = executor;
        this._connectionManager = poolingHttpClientConnectionManager;
        this._authProvider = AuthProviderUtils.extractAuthProvider(controllerConf, ControllerConf.CONTROLLER_BROKER_AUTH_PREFIX);
    }

    private static long getInitialDelayInSeconds(ControllerConf controllerConf) {
        long periodicTaskInitialDelayInSeconds = controllerConf.getPeriodicTaskInitialDelayInSeconds();
        String property = controllerConf.getProperty("controller.cluster.response.store.cleaner.initialDelay");
        if (property != null) {
            periodicTaskInitialDelayInSeconds = TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(property).longValue(), TimeUnit.MILLISECONDS);
        }
        return periodicTaskInitialDelayInSeconds;
    }

    private static long getFrequencyInSeconds(ControllerConf controllerConf) {
        long convert = TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis("1h").longValue(), TimeUnit.MILLISECONDS);
        String property = controllerConf.getProperty("controller.cluster.response.store.cleaner.frequencyPeriod");
        if (property != null) {
            convert = TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(property).longValue(), TimeUnit.MILLISECONDS);
        }
        return convert;
    }

    @Override // org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask
    protected void processTables(List<String> list, Properties properties) {
        long currentTimeMillis = System.currentTimeMillis();
        String property = properties.getProperty(CLEAN_AT_TIME);
        if (property != null) {
            currentTimeMillis = Long.parseLong(property);
        }
        doClean(currentTimeMillis);
    }

    public void doClean(long j) {
        List<InstanceConfig> allBrokerInstanceConfigs = this._pinotHelixResourceManager.getAllBrokerInstanceConfigs();
        HashMap hashMap = new HashMap();
        for (InstanceConfig instanceConfig : allBrokerInstanceConfigs) {
            hashMap.put(getInstanceKey(instanceConfig.getHostName(), instanceConfig.getPort()), new InstanceInfo(instanceConfig.getInstanceName(), instanceConfig.getHostName(), Integer.valueOf(Integer.parseInt(instanceConfig.getPort()))));
        }
        try {
            Map<String, String> makeAuthHeadersMap = AuthProviderUtils.makeAuthHeadersMap(this._authProvider);
            Map<String, List<CursorResponseNative>> allQueryResults = getAllQueryResults(hashMap, makeAuthHeadersMap);
            String controllerBrokerProtocol = this._controllerConf.getControllerBrokerProtocol();
            int controllerBrokerPortOverride = this._controllerConf.getControllerBrokerPortOverride();
            ArrayList arrayList = new ArrayList();
            for (Map.Entry<String, List<CursorResponseNative>> entry : allQueryResults.entrySet()) {
                for (CursorResponse cursorResponse : entry.getValue()) {
                    if (cursorResponse.getExpirationTimeMs() <= j) {
                        InstanceInfo instanceInfo = hashMap.get(entry.getKey());
                        arrayList.add(String.format(DELETE_QUERY_RESULT, controllerBrokerProtocol, instanceInfo.getHost(), Integer.valueOf(controllerBrokerPortOverride > 0 ? controllerBrokerPortOverride : instanceInfo.getPort().intValue()), cursorResponse.getRequestId()));
                    }
                }
                getResponseMap(makeAuthHeadersMap, arrayList, "DELETE", HttpDelete::new).forEach((str, str2) -> {
                    LOGGER.info("ResponseStore delete response - Broker: {}. Response: {}", str, str2);
                });
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
    }

    private Map<String, List<CursorResponseNative>> getAllQueryResults(Map<String, InstanceInfo> map, Map<String, String> map2) throws Exception {
        String controllerBrokerProtocol = this._controllerConf.getControllerBrokerProtocol();
        int controllerBrokerPortOverride = this._controllerConf.getControllerBrokerPortOverride();
        ArrayList arrayList = new ArrayList();
        for (InstanceInfo instanceInfo : map.values()) {
            arrayList.add(String.format(QUERY_RESULT_STORE, controllerBrokerProtocol, instanceInfo.getHost(), Integer.valueOf(controllerBrokerPortOverride > 0 ? controllerBrokerPortOverride : instanceInfo.getPort().intValue())));
        }
        LOGGER.debug("Getting running queries via broker urls: {}", arrayList);
        return (Map) getResponseMap(map2, arrayList, "GET", HttpGet::new).entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            try {
                return (List) JsonUtils.stringToObject((String) entry.getValue(), new TypeReference<List<CursorResponseNative>>() { // from class: org.apache.pinot.controller.cursors.ResponseStoreCleaner.1
                });
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }));
    }

    private <T extends HttpUriRequestBase> Map<String, String> getResponseMap(Map<String, String> map, List<String> list, String str, Function<String, T> function) throws Exception {
        MultiHttpRequestResponse multiHttpRequestResponse;
        URI uri;
        int code;
        String entityUtils;
        ArrayList arrayList = new ArrayList(list.size());
        list.forEach(str2 -> {
            arrayList.add(Pair.of(str2, ""));
        });
        CompletionService execute = new MultiHttpRequest(this._executor, this._connectionManager).execute(arrayList, map, TIMEOUT_MS, str, function);
        HashMap hashMap = new HashMap();
        ArrayList arrayList2 = new ArrayList(list.size());
        for (int i = 0; i < list.size(); i++) {
            try {
                multiHttpRequestResponse = (MultiHttpRequestResponse) execute.take().get();
                try {
                    uri = multiHttpRequestResponse.getURI();
                    code = multiHttpRequestResponse.getResponse().getCode();
                    entityUtils = EntityUtils.toString(multiHttpRequestResponse.getResponse().getEntity());
                } finally {
                }
            } catch (Exception e) {
                LOGGER.error("Failed to execute {} op. ", str, e);
                arrayList2.add(e.getMessage());
            }
            if (code != 200) {
                throw new Exception(String.format("Unexpected status=%d and response='%s' from uri='%s'", Integer.valueOf(code), entityUtils, uri));
                break;
            }
            hashMap.put(getInstanceKey(uri.getHost(), Integer.toString(uri.getPort())), entityUtils);
            if (multiHttpRequestResponse != null) {
                multiHttpRequestResponse.close();
            }
        }
        if (arrayList2.isEmpty()) {
            return hashMap;
        }
        throw new Exception("Unexpected responses from brokers: " + StringUtils.join(arrayList2, ","));
    }

    private static String getInstanceKey(String str, String str2) {
        return str + ":" + str2;
    }
}
