package org.apache.pinot.core.data.manager.realtime;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.local.upsert.PartialUpsertHandler;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.CommonConstants;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.class */
public class RealtimeTableDataManager extends BaseTableDataManager {
    private SegmentBuildTimeLeaseExtender _leaseExtender;
    private RealtimeSegmentStatsHistory _statsHistory;
    private final Semaphore _segmentBuildSemaphore;
    private static final String STATS_FILE_NAME = "segment-stats.ser";
    private static final String CONSUMERS_DIR = "consumers";
    private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
    private UpsertConfig.Mode _upsertMode;
    private TableUpsertMetadataManager _tableUpsertMetadataManager;
    private List<String> _primaryKeyColumns;
    private String _upsertComparisonColumn;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final ExecutorService _segmentAsyncExecutorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("SegmentAsyncExecutorService"));
    private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap();

    public RealtimeTableDataManager(Semaphore semaphore) {
        this._segmentBuildSemaphore = semaphore;
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doInit() {
        this._leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(this._instanceId, this._serverMetrics, this._tableNameWithType);
        File file = new File(this._tableDataDir, STATS_FILE_NAME);
        try {
            this._statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(file);
        } catch (IOException | ClassNotFoundException e) {
            this._logger.error("Error reading history object for table {} from {}", this._tableNameWithType, file.getAbsolutePath(), e);
            File file2 = new File(this._tableDataDir, "segment-stats.ser." + UUID.randomUUID());
            try {
                FileUtils.moveFile(file, file2);
                this._logger.warn("Saved unreadable {} into {}. Creating a fresh instance", file.getAbsolutePath(), file2.getAbsolutePath());
                try {
                    this._statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(file);
                } catch (Exception e2) {
                    Utils.rethrowException(e2);
                }
            } catch (IOException e3) {
                this._logger.error("Could not move {} to {}", file.getAbsolutePath(), file2.getAbsolutePath(), e3);
                throw new RuntimeException(e);
            }
        }
        this._statsHistory.setMinIntervalBetweenUpdatesMillis(TimeUnit.MILLISECONDS.convert(30L, TimeUnit.MINUTES));
        File file3 = new File(getConsumerDir());
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, this._tableNameWithType);
        Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", this._tableNameWithType);
        this._upsertMode = tableConfig.getUpsertMode();
        if (isUpsertEnabled()) {
            UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
            if (!$assertionsDisabled && upsertConfig == null) {
                throw new AssertionError();
            }
            Schema tableSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType);
            Preconditions.checkState(tableSchema != null, "Failed to find schema for table: %s", this._tableNameWithType);
            PartialUpsertHandler partialUpsertHandler = null;
            if (isPartialUpsertEnabled()) {
                String comparisonColumn = upsertConfig.getComparisonColumn();
                if (comparisonColumn == null) {
                    comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName();
                }
                partialUpsertHandler = new PartialUpsertHandler(this._helixManager, this._tableNameWithType, tableSchema, upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(), comparisonColumn);
            }
            this._tableUpsertMetadataManager = new TableUpsertMetadataManager(this._tableNameWithType, this._serverMetrics, partialUpsertHandler, upsertConfig.getHashFunction());
            this._primaryKeyColumns = tableSchema.getPrimaryKeyColumns();
            Preconditions.checkState(!CollectionUtils.isEmpty(this._primaryKeyColumns), "Primary key columns must be configured for upsert");
            String comparisonColumn2 = upsertConfig.getComparisonColumn();
            this._upsertComparisonColumn = comparisonColumn2 != null ? comparisonColumn2 : tableConfig.getValidationConfig().getTimeColumnName();
        }
        if (file3.exists()) {
            for (File file4 : file3.listFiles(new FilenameFilter() { // from class: org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.1
                @Override // java.io.FilenameFilter
                public boolean accept(File file5, String str) {
                    return !str.equals(RealtimeTableDataManager.STATS_FILE_NAME);
                }
            })) {
                if (file4.delete()) {
                    this._logger.info("Deleted old file {}", file4.getAbsolutePath());
                } else {
                    this._logger.error("Cannot delete file {}", file4.getAbsolutePath());
                }
            }
        }
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doStart() {
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doShutdown() {
        this._segmentAsyncExecutorService.shutdown();
        Iterator<SegmentDataManager> it2 = this._segmentDataManagerMap.values().iterator();
        while (it2.hasNext()) {
            it2.next().destroy();
        }
        if (this._leaseExtender != null) {
            this._leaseExtender.shutDown();
        }
    }

    public RealtimeSegmentStatsHistory getStatsHistory() {
        return this._statsHistory;
    }

    public Semaphore getSegmentBuildSemaphore() {
        return this._segmentBuildSemaphore;
    }

    public String getConsumerDir() {
        String consumerDir = this._tableDataManagerConfig.getConsumerDir();
        File file = consumerDir != null ? new File(consumerDir, this._tableNameWithType) : new File(this._tableDataDir + File.separator + "consumers");
        if (!file.exists() && !file.mkdirs()) {
            this._logger.error("Failed to create consumer directory {}", file.getAbsolutePath());
        }
        return file.getAbsolutePath();
    }

    public boolean isUpsertEnabled() {
        return this._upsertMode != UpsertConfig.Mode.NONE;
    }

    public boolean isPartialUpsertEnabled() {
        return this._upsertMode == UpsertConfig.Mode.PARTIAL;
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager, org.apache.pinot.segment.local.data.manager.TableDataManager
    public void addSegment(String str, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig) throws Exception {
        RealtimeSegmentDataManager hLRealtimeSegmentDataManager;
        SegmentDataManager segmentDataManager = this._segmentDataManagerMap.get(str);
        if (segmentDataManager != null) {
            this._logger.warn("Skipping adding existing segment: {} for table: {} with data manager class: {}", str, this._tableNameWithType, segmentDataManager.getClass().getSimpleName());
            return;
        }
        SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, this._tableNameWithType, str);
        Preconditions.checkNotNull(segmentZKMetadata);
        Schema tableSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType);
        Preconditions.checkNotNull(tableSchema);
        File file = new File(this._indexDir, str);
        LoaderUtils.reloadFailureRecovery(file);
        boolean isHighLevelConsumerSegmentName = SegmentName.isHighLevelConsumerSegmentName(str);
        if (segmentZKMetadata.getStatus().isCompleted()) {
            if (file.exists()) {
                try {
                    addSegment(ImmutableSegmentLoader.load(file, indexLoadingConfig, tableSchema));
                    return;
                } catch (Exception e) {
                    if (isHighLevelConsumerSegmentName) {
                        throw new RuntimeException("Failed to load local HLC segment: " + str, e);
                    }
                    this._logger.error("Caught exception while loading segment: {}, downloading a new copy", str, e);
                    FileUtils.deleteQuietly(file);
                }
            } else if (isHighLevelConsumerSegmentName) {
                throw new RuntimeException("Failed to find local copy for committed HLC segment: " + str);
            }
            downloadAndReplaceSegment(str, segmentZKMetadata, indexLoadingConfig, tableConfig);
            return;
        }
        FileUtils.deleteQuietly(file);
        if (!isValid(tableSchema, tableConfig.getIndexingConfig())) {
            this._logger.error("Not adding segment {}", str);
            throw new RuntimeException("Mismatching schema/table config for " + this._tableNameWithType);
        }
        VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(tableSchema, str);
        if (isHighLevelConsumerSegmentName) {
            hLRealtimeSegmentDataManager = new HLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, ZKMetadataProvider.getInstanceZKMetadata(this._propertyStore, this._instanceId), this, this._indexDir.getAbsolutePath(), indexLoadingConfig, tableSchema, this._serverMetrics);
        } else {
            LLCSegmentName lLCSegmentName = new LLCSegmentName(str);
            int partitionGroupId = lLCSegmentName.getPartitionGroupId();
            hLRealtimeSegmentDataManager = new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, this._indexDir.getAbsolutePath(), indexLoadingConfig, tableSchema, lLCSegmentName, this._partitionGroupIdToSemaphoreMap.computeIfAbsent(Integer.valueOf(partitionGroupId), num -> {
                return new Semaphore(1);
            }), this._serverMetrics, this._tableUpsertMetadataManager != null ? this._tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null);
        }
        this._logger.info("Initialized RealtimeSegmentDataManager - " + str);
        this._segmentDataManagerMap.put(str, hLRealtimeSegmentDataManager);
        this._serverMetrics.addValueToTableGauge(this._tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager, org.apache.pinot.segment.local.data.manager.TableDataManager
    public void addSegment(ImmutableSegment immutableSegment) {
        if (isUpsertEnabled()) {
            handleUpsert((ImmutableSegmentImpl) immutableSegment);
        }
        super.addSegment(immutableSegment);
    }

    private void handleUpsert(ImmutableSegmentImpl immutableSegmentImpl) {
        String segmentName = immutableSegmentImpl.getSegmentName();
        Integer realtimeSegmentPartitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, this._tableNameWithType, this._helixManager, this._primaryKeyColumns.get(0));
        Preconditions.checkNotNull(realtimeSegmentPartitionId, String.format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName, this._tableNameWithType));
        PartitionUpsertMetadataManager orCreatePartitionManager = this._tableUpsertMetadataManager.getOrCreatePartitionManager(realtimeSegmentPartitionId.intValue());
        immutableSegmentImpl.enableUpsert(orCreatePartitionManager, new ThreadSafeMutableRoaringBitmap());
        final HashMap hashMap = new HashMap();
        for (String str : this._primaryKeyColumns) {
            hashMap.put(str, new PinotSegmentColumnReader(immutableSegmentImpl, str));
        }
        hashMap.put(this._upsertComparisonColumn, new PinotSegmentColumnReader(immutableSegmentImpl, this._upsertComparisonColumn));
        final int totalDocs = immutableSegmentImpl.getSegmentMetadata().getTotalDocs();
        final int size = this._primaryKeyColumns.size();
        orCreatePartitionManager.addSegment(immutableSegmentImpl, new Iterator<PartitionUpsertMetadataManager.RecordInfo>() { // from class: org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.2
            private int _docId = 0;

            @Override // java.util.Iterator
            public boolean hasNext() {
                return this._docId < totalDocs;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public PartitionUpsertMetadataManager.RecordInfo next() {
                Object[] objArr = new Object[size];
                for (int i = 0; i < size; i++) {
                    Object value = ((PinotSegmentColumnReader) hashMap.get(RealtimeTableDataManager.this._primaryKeyColumns.get(i))).getValue(this._docId);
                    if (value instanceof byte[]) {
                        value = new ByteArray((byte[]) value);
                    }
                    objArr[i] = value;
                }
                PrimaryKey primaryKey = new PrimaryKey(objArr);
                Object value2 = ((PinotSegmentColumnReader) hashMap.get(RealtimeTableDataManager.this._upsertComparisonColumn)).getValue(this._docId);
                Preconditions.checkState(value2 instanceof Comparable, "Upsert comparison column: %s must be comparable", RealtimeTableDataManager.this._upsertComparisonColumn);
                int i2 = this._docId;
                this._docId = i2 + 1;
                return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, i2, (Comparable) value2);
            }
        });
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected boolean allowDownload(String str, SegmentZKMetadata segmentZKMetadata) {
        return (SegmentName.isHighLevelConsumerSegmentName(str) || segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS || "".equals(segmentZKMetadata.getDownloadUrl())) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void downloadAndReplaceSegment(String str, SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
        String downloadUrl = segmentZKMetadata.getDownloadUrl();
        if ("".equals(downloadUrl)) {
            if (!isPeerSegmentDownloadEnabled(tableConfig)) {
                throw new RuntimeException("Peer segment download not enabled for segment " + str);
            }
            downloadSegmentFromPeer(str, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
            return;
        }
        try {
            downloadSegmentFromDeepStore(str, indexLoadingConfig, downloadUrl);
        } catch (Exception e) {
            this._logger.warn("Download segment {} from deepstore uri {} failed.", str, downloadUrl, e);
            if (!isPeerSegmentDownloadEnabled(tableConfig)) {
                throw e;
            }
            downloadSegmentFromPeer(str, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
        }
    }

    private void downloadSegmentFromDeepStore(String str, IndexLoadingConfig indexLoadingConfig, String str2) {
        File file = new File(this._indexDir, str + ".tar.gz");
        try {
            try {
                SegmentFetcherFactory.fetchSegmentToLocal(str2, file);
                this._logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", str2, file, Long.valueOf(file.length()));
                untarAndMoveSegment(str, indexLoadingConfig, file);
                FileUtils.deleteQuietly(file);
            } catch (Exception e) {
                this._logger.warn("Failed to download segment {} from deep store: ", str, e);
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            FileUtils.deleteQuietly(file);
            throw th;
        }
    }

    private void untarAndMoveSegment(String str, IndexLoadingConfig indexLoadingConfig, File file) throws IOException {
        File tmpSegmentDataDir = getTmpSegmentDataDir("tmp-" + str + "." + System.currentTimeMillis());
        try {
            File file2 = TarGzCompressionUtils.untar(file, tmpSegmentDataDir).get(0);
            this._logger.info("Uncompressed file {} into tmp dir {}", file, tmpSegmentDataDir);
            File file3 = new File(this._indexDir, str);
            FileUtils.deleteQuietly(file3);
            FileUtils.moveDirectory(file2, file3);
            this._logger.info("Replacing LLC Segment {}", str);
            replaceLLSegment(str, indexLoadingConfig);
            FileUtils.deleteQuietly(tmpSegmentDataDir);
        } catch (Throwable th) {
            FileUtils.deleteQuietly(tmpSegmentDataDir);
            throw th;
        }
    }

    private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
        return "http".equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()) || CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
    }

    private void downloadSegmentFromPeer(String str, String str2, IndexLoadingConfig indexLoadingConfig) {
        File file = new File(this._indexDir, str + ".tar.gz");
        try {
            try {
                List<URI> peerServerURIs = PeerServerSegmentFinder.getPeerServerURIs(str, str2, this._helixManager);
                SegmentFetcherFactory.getSegmentFetcher(str2).fetchSegmentToLocal(peerServerURIs, file);
                this._logger.info("Fetched segment {} from: {} to: {} of size: {}", str, peerServerURIs, file, Long.valueOf(file.length()));
                untarAndMoveSegment(str, indexLoadingConfig, file);
                FileUtils.deleteQuietly(file);
            } catch (Exception e) {
                this._logger.warn("Download and move segment {} from peer with scheme {} failed.", str, str2, e);
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            FileUtils.deleteQuietly(file);
            throw th;
        }
    }

    public void replaceHLSegment(SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig) throws Exception {
        ZKMetadataProvider.setSegmentZKMetadata(this._propertyStore, this._tableNameWithType, segmentZKMetadata);
        addSegment(ImmutableSegmentLoader.load(new File(this._indexDir, segmentZKMetadata.getSegmentName()), indexLoadingConfig, ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType)));
    }

    public void replaceLLSegment(String str, IndexLoadingConfig indexLoadingConfig) {
        try {
            addSegment(ImmutableSegmentLoader.load(new File(this._indexDir, str), indexLoadingConfig, ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType)));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public String getServerInstance() {
        return this._instanceId;
    }

    private boolean isValid(Schema schema, IndexingConfig indexingConfig) {
        List<String> sortedColumn = indexingConfig.getSortedColumn();
        boolean z = true;
        if (CollectionUtils.isNotEmpty(sortedColumn)) {
            String str = sortedColumn.get(0);
            if (sortedColumn.size() > 1) {
                this._logger.warn("More than one sorted column configured. Using {}", str);
            }
            if (!schema.getFieldSpecFor(str).isSingleValueField()) {
                this._logger.error("Cannot configure multi-valued column {} as sorted column", str);
                z = false;
            }
        }
        try {
            SchemaUtils.validate(schema);
        } catch (Exception e) {
            this._logger.error("Caught exception while validating schema: {}", schema.getSchemaName(), e);
            z = false;
        }
        return z;
    }

    static {
        $assertionsDisabled = !RealtimeTableDataManager.class.desiredAssertionStatus();
    }
}
