package org.apache.pinot.query.runtime.executor;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Monitor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/executor/OpChainSchedulerService.class */
public class OpChainSchedulerService extends AbstractExecutionThreadService {
    private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
    private final OpChainScheduler _scheduler;
    private final ExecutorService _workerPool;
    private final long _pollIntervalMs;
    private final Monitor _monitor;
    private final Monitor.Guard _hasNextOrClosing;

    public OpChainSchedulerService(OpChainScheduler opChainScheduler, ExecutorService executorService) {
        this(opChainScheduler, executorService, -1L);
    }

    public OpChainSchedulerService(OpChainScheduler opChainScheduler, ExecutorService executorService, long j) {
        this._monitor = new Monitor();
        this._hasNextOrClosing = new Monitor.Guard(this._monitor) { // from class: org.apache.pinot.query.runtime.executor.OpChainSchedulerService.1
            public boolean isSatisfied() {
                return OpChainSchedulerService.this._scheduler.hasNext() || !OpChainSchedulerService.this.isRunning();
            }
        };
        this._scheduler = opChainScheduler;
        this._workerPool = executorService;
        this._pollIntervalMs = j;
    }

    protected void triggerShutdown() {
        LOGGER.info("Triggered shutdown on OpChainScheduler...");
        this._monitor.enter();
        this._monitor.leave();
    }

    protected void run() throws Exception {
        while (isRunning()) {
            if (enterMonitor()) {
                try {
                    if (!isRunning()) {
                        return;
                    }
                    final OpChain next = this._scheduler.next();
                    LOGGER.trace("({}): Scheduling", next);
                    this._workerPool.submit((Runnable) new TraceRunnable() { // from class: org.apache.pinot.query.runtime.executor.OpChainSchedulerService.2
                        public void runJob() {
                            try {
                                OpChainSchedulerService.LOGGER.trace("({}): Executing", next);
                                next.getStats().executing();
                                TransferableBlock transferableBlock = (TransferableBlock) next.getRoot().nextBlock();
                                while (!transferableBlock.isNoOpBlock() && !transferableBlock.isEndOfStreamBlock()) {
                                    transferableBlock = (TransferableBlock) next.getRoot().nextBlock();
                                }
                                if (!transferableBlock.isEndOfStreamBlock()) {
                                    OpChainSchedulerService.this.register(next, false);
                                } else if (transferableBlock.isErrorBlock()) {
                                    OpChainSchedulerService.LOGGER.error("({}): Completed erroneously {} {}", new Object[]{next, next.getStats(), transferableBlock.getDataBlock().getExceptions()});
                                } else {
                                    OpChainSchedulerService.LOGGER.debug("({}): Completed {}", next, next.getStats());
                                }
                            } catch (Exception e) {
                                OpChainSchedulerService.LOGGER.error("({}): Failed to execute operator chain! {}", new Object[]{next, next.getStats(), e});
                            }
                        }
                    });
                } finally {
                    this._monitor.leave();
                }
            }
        }
    }

    private boolean enterMonitor() throws InterruptedException {
        if (this._pollIntervalMs >= 0) {
            return this._monitor.enterWhen(this._hasNextOrClosing, this._pollIntervalMs, TimeUnit.MILLISECONDS);
        }
        this._monitor.enterWhen(this._hasNextOrClosing);
        return true;
    }

    public final void register(OpChain opChain) {
        register(opChain, true);
        LOGGER.debug("({}): Scheduler is now handling operator chain listening to mailboxes {}. There are a total of {} chains awaiting execution.", new Object[]{opChain, opChain.getReceivingMailbox(), Integer.valueOf(this._scheduler.size())});
        opChain.getStats().startExecutionTimer();
    }

    public final void register(OpChain opChain, boolean z) {
        this._monitor.enter();
        try {
            LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}", new Object[]{opChain, Boolean.valueOf(z), Integer.valueOf(this._scheduler.size())});
            this._scheduler.register(opChain, z);
            opChain.getStats().queued();
        } finally {
            this._monitor.leave();
        }
    }

    public final void onDataAvailable(MailboxIdentifier mailboxIdentifier) {
        this._monitor.enter();
        try {
            LOGGER.trace("Notified onDataAvailable for mailbox {}", mailboxIdentifier);
            this._scheduler.onDataAvailable(mailboxIdentifier);
        } finally {
            this._monitor.leave();
        }
    }

    public ExecutorService getWorkerPool() {
        return this._workerPool;
    }
}
