package org.apache.pinot.integration.tests;

import com.google.common.base.Preconditions;
import groovyjarjarantlr.Version;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import org.apache.avro.reflect.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.ExternalView;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.CompletionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.BasePinotFS;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.class */
/**
 * Realtime cluster integration test that runs with {@link #NUM_SERVERS} servers, the "DOWNLOAD"
 * completion mode and the "http" peer segment download scheme.
 *
 * <p>The controller data dir and the server segment store both point at a {@code mockfs://} URI
 * served by {@link MockPinotFS}. That file system rejects the upload of every segment whose LLC
 * sequence number is a multiple of {@link #UPLOAD_FAILURE_MOD}; the tests then check the download
 * URLs recorded in segment ZK metadata and the segment states in the external view.
 */
public class PeerDownloadLLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(PeerDownloadLLCRealtimeClusterIntegrationTest.class);

    /** Directory handed to the servers as their realtime consumer directory. */
    private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";

    /** Number of servers started; also used as the replicas-per-partition of the table. */
    private static final int NUM_SERVERS = 2;

    /** Uploads through {@link MockPinotFS} fail when {@code sequenceNumber % UPLOAD_FAILURE_MOD == 0}. */
    public static final int UPLOAD_FAILURE_MOD = 5;

    // The seed is printed in setUp(). RANDOM_SEED must stay declared before RANDOM.
    private static final long RANDOM_SEED = System.currentTimeMillis();
    private static final Random RANDOM = new Random(RANDOM_SEED);

    /** Local directory under which {@link MockPinotFS} stores everything; created in {@link #setUp()}. */
    private static File _pinotFsRootDir;

    private final boolean _isDirectAlloc = true;
    private final boolean _isConsumerDirConfigured = true;
    private final boolean _enableSplitCommit = true;
    private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();

    /**
     * File system registered for the {@code mockfs} scheme. Every operation is delegated to a
     * {@link LocalPinotFS} after re-rooting the path of the given URI under the test's root
     * directory. {@link #copyFromLocalFile(File, URI)} additionally fails on purpose for selected
     * segments.
     */
    public static class MockPinotFS extends BasePinotFS {
        LocalPinotFS _localPinotFS = new LocalPinotFS();
        File _basePath;

        @Override
        public void init(PinotConfiguration pinotConfiguration) {
            _localPinotFS.init(pinotConfiguration);
            _basePath = _pinotFsRootDir;
        }

        /**
         * Builds the URI handed to the local file system: the base path followed by the path
         * component of {@code uri} (scheme and authority of {@code uri} are not used).
         */
        private URI rebase(URI uri) throws URISyntaxException {
            return new URI(_basePath + uri.getPath());
        }

        /**
         * Same as {@link #rebase(URI)}, but reports a malformed result as an
         * {@link IllegalArgumentException} that keeps the original exception as its cause.
         */
        private URI toLocalUri(URI uri) {
            try {
                return rebase(uri);
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException(e.getMessage(), e);
            }
        }

        @Override
        public boolean mkdir(URI uri) throws IOException {
            return _localPinotFS.mkdir(toLocalUri(uri));
        }

        @Override
        public boolean delete(URI uri, boolean z) throws IOException {
            return _localPinotFS.delete(toLocalUri(uri), z);
        }

        @Override
        public boolean doMove(URI uri, URI uri2) throws IOException {
            LOGGER.warn("Moving from {} to {}", uri, uri2);
            return _localPinotFS.doMove(toLocalUri(uri), toLocalUri(uri2));
        }

        @Override
        public boolean copyDir(URI uri, URI uri2) throws IOException {
            return _localPinotFS.copyDir(toLocalUri(uri), toLocalUri(uri2));
        }

        @Override
        public boolean exists(URI uri) throws IOException {
            return _localPinotFS.exists(toLocalUri(uri));
        }

        @Override
        public long length(URI uri) throws IOException {
            return _localPinotFS.length(toLocalUri(uri));
        }

        @Override
        public String[] listFiles(URI uri, boolean z) throws IOException {
            return _localPinotFS.listFiles(toLocalUri(uri), z);
        }

        /** Unlike the other operations, a malformed URI surfaces here as the raw {@link URISyntaxException}. */
        @Override
        public void copyToLocalFile(URI uri, File file) throws Exception {
            _localPinotFS.copyToLocalFile(rebase(uri), file);
        }

        /**
         * Copies {@code file} into the mock store, except that the copy is refused when the file
         * name parses to an LLC segment name whose sequence number is a multiple of
         * {@link #UPLOAD_FAILURE_MOD}.
         *
         * @throws IllegalArgumentException for the deliberately failed uploads (message is the
         *     absolute path of {@code file}) and for a malformed target URI
         */
        @Override
        public void copyFromLocalFile(File file, URI uri) throws Exception {
            if (new LLCSegmentName(file.getName()).getSequenceNumber() % UPLOAD_FAILURE_MOD == 0) {
                throw new IllegalArgumentException(file.getAbsolutePath());
            }
            try {
                _localPinotFS.copyFromLocalFile(file, rebase(uri));
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException(e.getMessage(), e);
            }
        }

        @Override
        public boolean isDirectory(URI uri) throws IOException {
            return _localPinotFS.isDirectory(toLocalUri(uri));
        }

        @Override
        public long lastModified(URI uri) throws IOException {
            return _localPinotFS.lastModified(toLocalUri(uri));
        }

        @Override
        public boolean touch(URI uri) throws IOException {
            return _localPinotFS.touch(toLocalUri(uri));
        }

        @Override
        public InputStream open(URI uri) throws IOException {
            return _localPinotFS.open(toLocalUri(uri));
        }
    }

    /**
     * Prints the randomized settings, creates the root directory used by {@link MockPinotFS},
     * removes any leftover consumer directory and then runs the base class set-up.
     */
    @Override
    @BeforeClass
    public void setUp() throws Exception {
        // Printed so that a failing run can be correlated with the random seed that produced it.
        System.out.println(String.format("Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, enableLeadControllerResource: %s", RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableSplitCommit, _enableLeadControllerResource));

        _pinotFsRootDir = new File(FileUtils.getTempDirectoryPath() + File.separator + System.currentTimeMillis() + "/");
        Preconditions.checkState(_pinotFsRootDir.mkdir(), "Failed to make a dir for " + _pinotFsRootDir.getPath());

        File consumerDir = new File(CONSUMER_DIRECTORY);
        if (consumerDir.exists()) {
            FileUtils.deleteDirectory(consumerDir);
        }
        super.setUp();
    }

    /**
     * Deletes the consumer directory, runs the base class tear-down and finally removes the
     * {@link MockPinotFS} root directory. The base tear-down runs even if deleting the consumer
     * directory fails, and the root directory is removed on a best-effort basis.
     */
    @Override
    @AfterClass
    public void tearDown() throws Exception {
        try {
            FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY));
        } finally {
            try {
                super.tearDown();
            } finally {
                // Null-safe and never throws; only removed once the base tear-down has run.
                FileUtils.deleteQuietly(_pinotFsRootDir);
            }
        }
    }

    @Override
    public void startServer() throws Exception {
        startServers(NUM_SERVERS);
    }

    /**
     * Replaces the validation config of {@code tableConfig} with one that uses the "DOWNLOAD"
     * completion mode, {@link #NUM_SERVERS} replicas per partition and the "http" peer download
     * scheme, then posts the table config to the controller.
     */
    @Override
    public void addTableConfig(TableConfig tableConfig) throws IOException {
        SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
        validationConfig.setCompletionConfig(new CompletionConfig("DOWNLOAD"));
        validationConfig.setReplicasPerPartition(String.valueOf(NUM_SERVERS));
        validationConfig.setPeerSegmentDownloadScheme("http");
        tableConfig.setValidationConfig(validationConfig);
        // The fresh validation config has no time column yet; set it on the installed config.
        tableConfig.getValidationConfig().setTimeColumnName(getTimeColumnName());
        sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
    }

    /**
     * Starts the controller with HLC tables disallowed, split commit enabled and its data dir on
     * the {@code mockfs} file system, then applies the randomized lead-controller-resource flag.
     */
    @Override
    public void startController() throws Exception {
        Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
        controllerConfig.put(ControllerConf.ALLOW_HLC_TABLES, false);
        controllerConfig.put(ControllerConf.ENABLE_SPLIT_COMMIT, _enableSplitCommit);
        controllerConfig.put(ControllerConf.DATA_DIR, "mockfs://" + getHelixClusterName());
        controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR, FileUtils.getTempDirectory().getAbsolutePath());
        controllerConfig.put("pinot.controller.storage.factory.class.mockfs", MockPinotFS.class.getName());
        startController(controllerConfig);
        enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
    }

    @Override
    @Nullable
    public String getLoadMode() {
        return "MMAP";
    }

    /**
     * Configures the servers for off-heap allocation, the {@code mockfs} segment store, the
     * "file,http" segment fetcher protocols, the consumer directory and split commit.
     */
    @Override
    protected void overrideServerConf(PinotConfiguration pinotConfiguration) {
        pinotConfiguration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true);
        pinotConfiguration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc);
        pinotConfiguration.setProperty("pinot.server.storage.factory.class.mockfs", MockPinotFS.class.getName());
        pinotConfiguration.setProperty("pinot.server.instance.segment.store.uri", "mockfs://" + getHelixClusterName());
        pinotConfiguration.setProperty("pinot.server.segment.fetcher.protocols", "file,http");
        pinotConfiguration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY);
        pinotConfiguration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, _enableSplitCommit);
        pinotConfiguration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
    }

    @Test
    public void testConsumerDirectoryExists() {
        File tableConsumerDir = new File(CONSUMER_DIRECTORY, "mytable_REALTIME");
        Assert.assertTrue(tableConsumerDir.exists(), "The off heap consumer directory does not exist");
    }

    /** Every segment's flush threshold must equal the table-level flush size divided by the partition count. */
    @Test
    public void testSegmentFlushSize() {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        for (SegmentZKMetadata segmentZKMetadata : ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, realtimeTableName)) {
            Assert.assertEquals(segmentZKMetadata.getSizeThresholdToFlushSegment(), getRealtimeSegmentFlushSize() / getNumKafkaPartitions());
        }
    }

    /**
     * Checks the download URL stored for each segment: none when the total doc count is negative,
     * empty when {@link MockPinotFS} refused the upload, and a {@code mockfs://} URL otherwise.
     */
    @Test
    public void testSegmentDownloadURLs() {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        for (SegmentZKMetadata segmentZKMetadata : ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, realtimeTableName)) {
            String segmentName = segmentZKMetadata.getSegmentName();
            String downloadUrl = segmentZKMetadata.getDownloadUrl();
            if (segmentZKMetadata.getTotalDocs() < 0) {
                // NOTE(review): a negative doc count presumably marks a segment that is not committed yet - confirm.
                Assert.assertNull(downloadUrl);
            } else if (new LLCSegmentName(segmentName).getSequenceNumber() % UPLOAD_FAILURE_MOD == 0) {
                // assertEquals rather than downloadUrl.isEmpty(): a null URL fails the assertion instead of throwing an NPE.
                Assert.assertEquals(downloadUrl, "", "Unexpected download URL for segment " + segmentName);
            } else {
                Assert.assertNotNull(downloadUrl, "Missing download URL for segment " + segmentName);
                Assert.assertTrue(downloadUrl.startsWith("mockfs://"), "Unexpected download URL for segment " + segmentName + ": " + downloadUrl);
            }
        }
    }

    /**
     * Verifies that the external view reports {@link #NUM_SERVERS} replicas and that every segment
     * is ONLINE or CONSUMING on exactly {@link #NUM_SERVERS} instances.
     */
    @Test
    public void testAllSegmentsAreOnlineOrConsuming() {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        ExternalView externalView = HelixHelper.getExternalViewForResource(_helixAdmin, getHelixClusterName(), realtimeTableName);
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(externalView.getReplicas(), String.valueOf(NUM_SERVERS));
        for (String segmentName : externalView.getPartitionSet()) {
            Map<String, String> instanceToState = externalView.getStateMap(segmentName);
            Assert.assertEquals(instanceToState.size(), NUM_SERVERS);
            for (String state : instanceToState.values()) {
                Assert.assertTrue("ONLINE".equalsIgnoreCase(state) || "CONSUMING".equalsIgnoreCase(state), "Unexpected state " + state + " for segment " + segmentName);
            }
        }
    }

    /** Creating a table whose stream uses the HIGHLEVEL consumer type must be rejected by the controller. */
    @Test(expectedExceptions = {IOException.class})
    public void testAddHLCTableShouldFail() throws IOException {
        Map<String, String> streamConfigs = Collections.singletonMap("stream.kafka.consumer.type", "HIGHLEVEL");
        TableConfig hlcTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setStreamConfigs(streamConfigs).build();
        sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), hlcTableConfig.toJsonString());
    }
}
