package org.apache.pinot.segment.local.upsert;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.$internal.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.$internal.com.google.common.base.Preconditions;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.upsert.UpsertContext;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.class */
public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetadataManager {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) BaseTableUpsertMetadataManager.class);
    protected String _tableNameWithType;
    protected TableDataManager _tableDataManager;
    protected HelixManager _helixManager;
    protected ExecutorService _segmentPreloadExecutor;
    protected UpsertContext _context;
    private volatile boolean _isPreloading = false;

    @Override // org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager
    public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, HelixManager helixManager, @Nullable ExecutorService executorService) {
        this._tableNameWithType = tableConfig.getTableName();
        this._tableDataManager = tableDataManager;
        this._helixManager = helixManager;
        this._segmentPreloadExecutor = executorService;
        UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
        Preconditions.checkArgument((upsertConfig == null || upsertConfig.getMode() == UpsertConfig.Mode.NONE) ? false : true, "Upsert must be enabled for table: %s", this._tableNameWithType);
        List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
        Preconditions.checkArgument(!CollectionUtils.isEmpty(primaryKeyColumns), "Primary key columns must be configured for upsert enabled table: %s", this._tableNameWithType);
        List<String> comparisonColumns = upsertConfig.getComparisonColumns();
        if (comparisonColumns == null) {
            comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName());
        }
        PartialUpsertHandler partialUpsertHandler = null;
        if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
            Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
            Preconditions.checkArgument(partialUpsertStrategies != null, "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", this._tableNameWithType);
            partialUpsertHandler = new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), comparisonColumns);
        }
        String deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
        HashFunction hashFunction = upsertConfig.getHashFunction();
        boolean isEnableSnapshot = upsertConfig.isEnableSnapshot();
        boolean isEnablePreload = upsertConfig.isEnablePreload();
        double metadataTTL = upsertConfig.getMetadataTTL();
        double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
        File tableDataDir = tableDataManager.getTableDataDir();
        this._context = new UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema).setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns).setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction).setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(isEnableSnapshot).setEnablePreload(isEnablePreload).setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setTableIndexDir(tableDataDir).build();
        LOGGER.info("Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {}, hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {}, deleted Keys TTL: {}, table index dir: {}", getClass().getSimpleName(), this._tableNameWithType, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, upsertConfig.getMode(), Boolean.valueOf(isEnableSnapshot), Boolean.valueOf(isEnablePreload), Double.valueOf(metadataTTL), Double.valueOf(deletedKeysTTL), tableDataDir);
        initCustomVariables();
        if (isEnableSnapshot && isEnablePreload) {
            try {
                if (executorService != null) {
                    try {
                        this._isPreloading = true;
                        preloadSegments();
                        this._isPreloading = false;
                    } catch (Exception e) {
                        LOGGER.warn("Failed to preload segments from table: {}, skipping", this._tableNameWithType, e);
                        if (e instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        this._isPreloading = false;
                    }
                }
            } catch (Throwable th) {
                this._isPreloading = false;
                throw th;
            }
        }
    }

    protected void initCustomVariables() {
    }

    private void preloadSegments() throws Exception {
        LOGGER.info("Preload segments from table: {} for fast upsert metadata recovery", this._tableNameWithType);
        if (onPreloadStart()) {
            ZkHelixPropertyStore<ZNRecord> helixPropertyStore = this._helixManager.getHelixPropertyStore();
            String instanceId = getInstanceId();
            IndexLoadingConfig createIndexLoadingConfig = createIndexLoadingConfig();
            Map<String, Map<String, String>> segmentAssignment = getSegmentAssignment();
            ArrayList<Future> arrayList = new ArrayList();
            for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
                String key = entry.getKey();
                String str = entry.getValue().get(instanceId);
                if ("ONLINE".equals(str)) {
                    arrayList.add(this._segmentPreloadExecutor.submit(() -> {
                        try {
                            preloadSegment(key, createIndexLoadingConfig, helixPropertyStore);
                        } catch (Exception e) {
                            LOGGER.warn("Failed to preload segment: {} from table: {}, skipping", key, this._tableNameWithType, e);
                        }
                    }));
                } else {
                    LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", key, str);
                }
            }
            try {
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    ((Future) it2.next()).get();
                }
                onPreloadFinish();
                LOGGER.info("Preloaded segments from table: {} for fast upsert metadata recovery", this._tableNameWithType);
            } finally {
                for (Future future : arrayList) {
                    if (!future.isDone()) {
                        future.cancel(true);
                    }
                }
            }
        }
    }

    protected boolean onPreloadStart() {
        return true;
    }

    protected void onPreloadFinish() {
    }

    @VisibleForTesting
    String getInstanceId() {
        return this._tableDataManager.getInstanceDataManagerConfig().getInstanceId();
    }

    @VisibleForTesting
    IndexLoadingConfig createIndexLoadingConfig() {
        return new IndexLoadingConfig(this._tableDataManager.getInstanceDataManagerConfig(), this._context.getTableConfig(), this._context.getSchema());
    }

    @VisibleForTesting
    Map<String, Map<String, String>> getSegmentAssignment() {
        IdealState tableIdealState = HelixHelper.getTableIdealState(this._helixManager, this._tableNameWithType);
        Preconditions.checkState(tableIdealState != null, "Failed to find ideal state for table: %s", this._tableNameWithType);
        return tableIdealState.getRecord().getMapFields();
    }

    private void preloadSegment(String str, IndexLoadingConfig indexLoadingConfig, ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore) {
        LOGGER.info("Preload segment: {} from table: {}", str, this._tableNameWithType);
        SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(zkHelixPropertyStore, this._tableNameWithType, str);
        Preconditions.checkState(segmentZKMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", str, this._tableNameWithType);
        if (!hasValidDocIdsSnapshot(str, segmentZKMetadata.getTier())) {
            LOGGER.info("Skip segment: {} as no validDocIds snapshot exists", str);
        } else {
            preloadSegmentWithSnapshot(str, indexLoadingConfig, segmentZKMetadata);
            LOGGER.info("Preloaded segment: {} from table: {}", str, this._tableNameWithType);
        }
    }

    @VisibleForTesting
    void preloadSegmentWithSnapshot(String str, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata segmentZKMetadata) {
        this._tableDataManager.tryLoadExistingSegment(str, indexLoadingConfig, segmentZKMetadata);
    }

    private boolean hasValidDocIdsSnapshot(String str, String str2) {
        try {
            return new File(SegmentDirectoryPaths.findSegmentDirectory(this._tableDataManager.getSegmentDataDir(str, str2, this._context.getTableConfig())), V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists();
        } catch (Exception e) {
            return false;
        }
    }

    @Override // org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager
    public boolean isPreloading() {
        return this._isPreloading;
    }

    @Override // org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager
    public UpsertConfig.Mode getUpsertMode() {
        return this._context.getPartialUpsertHandler() == null ? UpsertConfig.Mode.FULL : UpsertConfig.Mode.PARTIAL;
    }
}
