package org.apache.pinot.core.operator.combine;

import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.plan.CombinePlanNode;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.plan.maker.PlanMaker;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.class */
public class SelectionCombineOperatorTest {
    private static final String SEGMENT_NAME_PREFIX = "testSegment_";
    private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
    private static final int NUM_RECORDS_PER_SEGMENT = 100;
    private List<IndexSegment> _indexSegments;
    private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SelectionCombineOperatorTest");
    private static final int NUM_SEGMENTS = CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 2;
    private static final int NUM_CONSUMING_SEGMENTS = NUM_SEGMENTS / 2;
    private static final String RAW_TABLE_NAME = "testTable";
    private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
    private static final String INT_COLUMN = "intColumn";
    private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT).build();
    private static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2();
    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();

    @BeforeClass
    public void setUp() throws Exception {
        FileUtils.deleteDirectory(TEMP_DIR);
        this._indexSegments = new ArrayList(NUM_SEGMENTS);
        for (int i = 0; i < NUM_SEGMENTS / 2; i++) {
            this._indexSegments.add(createOfflineSegment(i));
        }
        for (int i2 = NUM_CONSUMING_SEGMENTS; i2 < NUM_SEGMENTS; i2++) {
            this._indexSegments.add(createRealtimeSegment(i2));
        }
    }

    private IndexSegment createRealtimeSegment(int i) throws Exception {
        RealtimeSegmentStatsHistory realtimeSegmentStatsHistory = (RealtimeSegmentStatsHistory) Mockito.mock(RealtimeSegmentStatsHistory.class);
        Mockito.when(Integer.valueOf(realtimeSegmentStatsHistory.getEstimatedCardinality(ArgumentMatchers.anyString()))).thenReturn(200);
        Mockito.when(Integer.valueOf(realtimeSegmentStatsHistory.getEstimatedAvgColSize(ArgumentMatchers.anyString()))).thenReturn(32);
        String str = "testSegment_" + i;
        MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(new RealtimeSegmentConfig.Builder().setTableNameWithType(REALTIME_TABLE_NAME).setSegmentName(str).setSchema(SCHEMA).setCapacity(100000).setAvgNumMultiValues(2).setNoDictionaryColumns(Collections.emptySet()).setJsonIndexColumns(Collections.emptySet()).setVarLengthDictionaryColumns(Collections.emptySet()).setInvertedIndexColumns(Collections.emptySet()).setSegmentZKMetadata(new SegmentZKMetadata(str)).setMemoryManager(new DirectMemoryManager(str)).setStatsHistory(realtimeSegmentStatsHistory).setAggregateMetrics(false).setNullHandlingEnabled(true).setIngestionAggregationConfigs(Collections.emptyList()).build(), (ServerMetrics) null);
        int i2 = (i * NUM_RECORDS_PER_SEGMENT) / 2;
        for (int i3 = 0; i3 < NUM_RECORDS_PER_SEGMENT; i3++) {
            GenericRow genericRow = new GenericRow();
            genericRow.putValue(INT_COLUMN, Integer.valueOf(i2 + i3));
            mutableSegmentImpl.index(genericRow, (RowMetadata) null);
        }
        return mutableSegmentImpl;
    }

    private IndexSegment createOfflineSegment(int i) throws Exception {
        int i2 = (i * NUM_RECORDS_PER_SEGMENT) / 2;
        ArrayList arrayList = new ArrayList(NUM_RECORDS_PER_SEGMENT);
        for (int i3 = 0; i3 < NUM_RECORDS_PER_SEGMENT; i3++) {
            GenericRow genericRow = new GenericRow();
            genericRow.putValue(INT_COLUMN, Integer.valueOf(i2 + i3));
            arrayList.add(genericRow);
        }
        SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
        segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
        String str = "testSegment_" + i;
        segmentGeneratorConfig.setSegmentName(str);
        segmentGeneratorConfig.setOutDir(TEMP_DIR.getPath());
        SegmentIndexCreationDriverImpl segmentIndexCreationDriverImpl = new SegmentIndexCreationDriverImpl();
        segmentIndexCreationDriverImpl.init(segmentGeneratorConfig, new GenericRowRecordReader(arrayList));
        segmentIndexCreationDriverImpl.build();
        return ImmutableSegmentLoader.load(new File(TEMP_DIR, str), ReadMode.mmap);
    }

    @Test
    public void testSelectionLimit0() {
        SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM testTable LIMIT 0");
        Assert.assertEquals(combineResult.getDataSchema(), new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
        Assert.assertTrue(combineResult.getRows().isEmpty());
        Assert.assertEquals(combineResult.getNumDocsScanned(), 0L);
        Assert.assertEquals(combineResult.getNumEntriesScannedInFilter(), 0L);
        Assert.assertEquals(combineResult.getNumEntriesScannedPostFilter(), 0L);
        Assert.assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
        Assert.assertEquals(combineResult.getNumSegmentsMatched(), 0);
        Assert.assertEquals(combineResult.getNumConsumingSegmentsProcessed(), 0);
        Assert.assertEquals(combineResult.getNumConsumingSegmentsMatched(), 0);
        Assert.assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
    }

    @Test
    public void testSelectionOnly() {
        SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM testTable");
        Assert.assertEquals(combineResult.getDataSchema(), new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
        Assert.assertEquals(combineResult.getRows().size(), 10);
        long numDocsScanned = combineResult.getNumDocsScanned();
        Assert.assertTrue(numDocsScanned >= 10 && numDocsScanned <= ((long) (CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 10)));
        Assert.assertEquals(combineResult.getNumEntriesScannedInFilter(), 0L);
        Assert.assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
        Assert.assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
        Assert.assertEquals(combineResult.getNumConsumingSegmentsProcessed(), NUM_CONSUMING_SEGMENTS);
        int numSegmentsMatched = combineResult.getNumSegmentsMatched();
        Assert.assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
        int numConsumingSegmentsMatched = combineResult.getNumConsumingSegmentsMatched();
        if (NUM_SEGMENTS <= 10) {
            Assert.assertEquals(numConsumingSegmentsMatched, 0, "numSegments: " + NUM_SEGMENTS);
        } else {
            Assert.assertTrue(numConsumingSegmentsMatched >= 0 && numConsumingSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY, String.format("numConsumingSegmentsMatched: %d, maxThreadsPerQuery: %d, numSegments: %d", Integer.valueOf(combineResult.getNumConsumingSegmentsMatched()), Integer.valueOf(CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY), Integer.valueOf(NUM_SEGMENTS)));
        }
        Assert.assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
        SelectionResultsBlock combineResult2 = getCombineResult("SELECT * FROM testTable LIMIT 10000");
        Assert.assertEquals(combineResult2.getDataSchema(), new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
        Assert.assertEquals(combineResult2.getRows().size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
        long numDocsScanned2 = combineResult2.getNumDocsScanned();
        Assert.assertEquals(numDocsScanned2, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
        Assert.assertEquals(combineResult2.getNumEntriesScannedInFilter(), 0L);
        Assert.assertEquals(combineResult2.getNumEntriesScannedPostFilter(), numDocsScanned2);
        Assert.assertEquals(combineResult2.getNumSegmentsProcessed(), NUM_SEGMENTS);
        Assert.assertEquals(combineResult2.getNumSegmentsMatched(), NUM_SEGMENTS);
        Assert.assertEquals(combineResult2.getNumConsumingSegmentsProcessed(), NUM_CONSUMING_SEGMENTS);
        Assert.assertEquals(combineResult2.getNumConsumingSegmentsMatched(), NUM_CONSUMING_SEGMENTS);
        Assert.assertEquals(combineResult2.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
    }

    @Test
    public void testSelectionOrderBy() {
        SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn");
        Assert.assertEquals(combineResult.getDataSchema(), new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
        PriorityQueue priorityQueue = (PriorityQueue) combineResult.getRows();
        Assert.assertNotNull(priorityQueue);
        Assert.assertEquals(priorityQueue.size(), 10);
        int i = 9;
        while (!priorityQueue.isEmpty()) {
            int i2 = i;
            i--;
            Assert.assertEquals(((Integer) ((Object[]) priorityQueue.poll())[0]).intValue(), i2);
        }
        long numDocsScanned = combineResult.getNumDocsScanned();
        Assert.assertTrue(numDocsScanned >= 10 && numDocsScanned <= ((long) (CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 10)));
        Assert.assertEquals(combineResult.getNumEntriesScannedInFilter(), 0L);
        Assert.assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
        Assert.assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
        Assert.assertEquals(combineResult.getNumConsumingSegmentsProcessed(), NUM_CONSUMING_SEGMENTS);
        Assert.assertEquals(combineResult.getNumConsumingSegmentsMatched(), 0);
        int numSegmentsMatched = combineResult.getNumSegmentsMatched();
        Assert.assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
        Assert.assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
        SelectionResultsBlock combineResult2 = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC");
        Assert.assertEquals(combineResult2.getDataSchema(), new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
        PriorityQueue priorityQueue2 = (PriorityQueue) combineResult2.getRows();
        Assert.assertNotNull(priorityQueue2);
        Assert.assertEquals(priorityQueue2.size(), 10);
        int i3 = ((NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT) / 2) + 40;
        while (!priorityQueue2.isEmpty()) {
            int i4 = i3;
            i3++;
            Assert.assertEquals(((Integer) ((Object[]) priorityQueue2.poll())[0]).intValue(), i4);
        }
        long numDocsScanned2 = combineResult2.getNumDocsScanned();
        Assert.assertTrue(numDocsScanned2 >= 100 && numDocsScanned2 <= ((long) (CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT)));
        Assert.assertEquals(combineResult2.getNumEntriesScannedInFilter(), 0L);
        Assert.assertEquals(combineResult2.getNumEntriesScannedPostFilter(), numDocsScanned2);
        Assert.assertEquals(combineResult2.getNumSegmentsProcessed(), NUM_SEGMENTS);
        Assert.assertEquals(combineResult2.getNumConsumingSegmentsProcessed(), NUM_CONSUMING_SEGMENTS);
        Assert.assertEquals(combineResult2.getNumConsumingSegmentsMatched(), NUM_CONSUMING_SEGMENTS);
        int numSegmentsMatched2 = combineResult2.getNumSegmentsMatched();
        Assert.assertTrue(numSegmentsMatched2 >= 1 && numSegmentsMatched2 <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
        Assert.assertEquals(combineResult2.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
        SelectionResultsBlock combineResult3 = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000");
        Assert.assertEquals(combineResult3.getDataSchema(), new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
        PriorityQueue priorityQueue3 = (PriorityQueue) combineResult3.getRows();
        Assert.assertNotNull(priorityQueue3);
        Assert.assertEquals(priorityQueue3.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
        long numDocsScanned3 = combineResult3.getNumDocsScanned();
        Assert.assertEquals(numDocsScanned3, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
        Assert.assertEquals(combineResult3.getNumEntriesScannedInFilter(), 0L);
        Assert.assertEquals(combineResult3.getNumEntriesScannedPostFilter(), numDocsScanned3);
        Assert.assertEquals(combineResult3.getNumSegmentsProcessed(), NUM_SEGMENTS);
        Assert.assertEquals(combineResult3.getNumSegmentsMatched(), NUM_SEGMENTS);
        Assert.assertEquals(combineResult3.getNumConsumingSegmentsProcessed(), NUM_CONSUMING_SEGMENTS);
        Assert.assertEquals(combineResult3.getNumConsumingSegmentsMatched(), NUM_CONSUMING_SEGMENTS);
        Assert.assertEquals(combineResult3.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
    }

    private SelectionResultsBlock getCombineResult(String str) {
        QueryContext queryContext = QueryContextConverterUtils.getQueryContext(str);
        ArrayList arrayList = new ArrayList(NUM_SEGMENTS);
        Iterator<IndexSegment> it = this._indexSegments.iterator();
        while (it.hasNext()) {
            arrayList.add(PLAN_MAKER.makeSegmentPlanNode(it.next(), queryContext));
        }
        queryContext.setEndTimeMs(System.currentTimeMillis() + 15000);
        return new CombinePlanNode(arrayList, queryContext, EXECUTOR, (StreamObserver) null).run().nextBlock();
    }

    @AfterClass
    public void tearDown() throws IOException {
        Iterator<IndexSegment> it = this._indexSegments.iterator();
        while (it.hasNext()) {
            it.next().destroy();
        }
        FileUtils.deleteDirectory(TEMP_DIR);
    }
}
