/*
 * Decompiled with CFR 0.152.
 */
package XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum;

import XcoreXdatabricksX240X9088.foe;
import XcoreXdatabricksX240X9088.goe;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.common.Time;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ExitCode;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.Request;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.RequestProcessor;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerMetrics;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.WorkerService;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperCriticalThread;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperServerListener;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.util.ServiceUtils;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class CommitProcessor
extends ZooKeeperCriticalThread
implements RequestProcessor {
    private static final foe LOG = goe.a(CommitProcessor.class);
    public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS = "zookeeper.commitProcessor.numWorkerThreads";
    public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT = "zookeeper.commitProcessor.shutdownTimeout";
    public static final String ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE = "zookeeper.commitProcessor.maxReadBatchSize";
    public static final String ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE = "zookeeper.commitProcessor.maxCommitBatchSize";
    protected LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue();
    protected final LinkedBlockingQueue<Request> queuedWriteRequests = new LinkedBlockingQueue();
    private AtomicInteger numReadQueuedRequests = new AtomicInteger(0);
    private AtomicInteger numWriteQueuedRequests = new AtomicInteger(0);
    protected final LinkedBlockingQueue<Request> committedRequests = new LinkedBlockingQueue();
    protected final Map<Long, Deque<Request>> pendingRequests = new HashMap<Long, Deque<Request>>(10000);
    protected final AtomicInteger numRequestsProcessing = new AtomicInteger(0);
    RequestProcessor nextProcessor;
    protected volatile boolean stoppedMainLoop = true;
    protected volatile boolean stopped = true;
    private long workerShutdownTimeoutMS;
    protected WorkerService workerPool;
    private Object emptyPoolSync = new Object();
    private static volatile int maxReadBatchSize;
    private static volatile int maxCommitBatchSize;
    boolean matchSyncs;

    public CommitProcessor(RequestProcessor requestProcessor, String string, boolean bl2, ZooKeeperServerListener zooKeeperServerListener) {
        super("CommitProcessor:" + string, zooKeeperServerListener);
        this.nextProcessor = requestProcessor;
        this.matchSyncs = bl2;
    }

    private boolean isProcessingRequest() {
        return this.numRequestsProcessing.get() != 0;
    }

    protected boolean needCommit(Request request) {
        if (request.isThrottled()) {
            return false;
        }
        switch (request.type) {
            case 1: 
            case 2: 
            case 5: 
            case 7: 
            case 13: 
            case 14: 
            case 15: 
            case 16: 
            case 19: 
            case 20: 
            case 21: {
                return true;
            }
            case 9: {
                return this.matchSyncs;
            }
            case -11: 
            case -10: {
                return !request.isLocalSession();
            }
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            int n2 = 0;
            boolean bl2 = false;
            do {
                Request object;
                CommitProcessor commitProcessor = this;
                synchronized (commitProcessor) {
                    bl2 = !this.committedRequests.isEmpty();
                    n2 = this.queuedRequests.size();
                    if (n2 == 0 && !bl2) {
                        while (!this.stopped && n2 == 0 && !bl2) {
                            this.wait();
                            bl2 = !this.committedRequests.isEmpty();
                            n2 = this.queuedRequests.size();
                        }
                    }
                }
                ServerMetrics.getMetrics().READS_QUEUED_IN_COMMIT_PROCESSOR.add(this.numReadQueuedRequests.get());
                ServerMetrics.getMetrics().WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(this.numWriteQueuedRequests.get());
                ServerMetrics.getMetrics().COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(this.committedRequests.size());
                long l3 = Time.currentElapsedTime();
                int n3 = 0;
                while (!(this.stopped || n2 <= 0 || maxReadBatchSize >= 0 && n3 > maxReadBatchSize || (object = this.queuedRequests.poll()) == null)) {
                    --n2;
                    if (this.needCommit(object) || this.pendingRequests.containsKey(object.sessionId)) {
                        Deque deque = this.pendingRequests.computeIfAbsent(object.sessionId, l2 -> new ArrayDeque());
                        deque.addLast(object);
                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(deque.size());
                    } else {
                        ++n3;
                        this.numReadQueuedRequests.decrementAndGet();
                        this.sendToNextProcessor(object);
                    }
                    if (maxReadBatchSize >= 0 || this.pendingRequests.isEmpty() || this.committedRequests.isEmpty()) continue;
                    bl2 = true;
                    break;
                }
                ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(n3);
                if (!bl2) {
                    boolean bl3 = bl2 = !this.committedRequests.isEmpty();
                }
                if (bl2 && !this.stopped) {
                    this.waitForEmptyPool();
                    if (this.stopped) {
                        return;
                    }
                    int n4 = maxCommitBatchSize;
                    HashSet<Long> hashSet = new HashSet<Long>();
                    long l4 = Time.currentElapsedTime();
                    int n5 = 0;
                    while (bl2 && !this.stopped && n4 > 0) {
                        object = this.committedRequests.peek();
                        if (object.isThrottled()) {
                            LOG.e("Throttled request in committed pool: {}. Exiting.", (Object)object);
                            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
                        }
                        if (!this.queuedWriteRequests.isEmpty() && this.queuedWriteRequests.peek().sessionId == object.sessionId && this.queuedWriteRequests.peek().cxid == object.cxid) {
                            Deque<Request> deque = this.pendingRequests.get(object.sessionId);
                            ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(this.pendingRequests.size());
                            if (deque == null || deque.isEmpty() || !this.needCommit((Request)deque.peek())) break;
                            ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(deque.size());
                            Request request = (Request)deque.poll();
                            request.setHdr(object.getHdr());
                            request.setTxn(object.getTxn());
                            request.setTxnDigest(object.getTxnDigest());
                            request.zxid = object.zxid;
                            request.commitRecvTime = object.commitRecvTime;
                            object = request;
                            if (object.isThrottled()) {
                                LOG.e("Throttled request in committed & pending pool: {}. Exiting.", (Object)object);
                                ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
                            }
                            this.numWriteQueuedRequests.decrementAndGet();
                            this.queuedWriteRequests.poll();
                            hashSet.add(object.sessionId);
                        }
                        this.committedRequests.remove();
                        --n4;
                        ++n5;
                        this.processWrite(object);
                        bl2 = !this.committedRequests.isEmpty();
                    }
                    ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR.add(Time.currentElapsedTime() - l4);
                    ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(n5);
                    n3 = 0;
                    for (Long l5 : hashSet) {
                        Deque<Request> deque = this.pendingRequests.get(l5);
                        int n6 = 0;
                        while (!(this.stopped || deque.isEmpty() || this.needCommit(deque.peek()))) {
                            this.numReadQueuedRequests.decrementAndGet();
                            this.sendToNextProcessor(deque.poll());
                            ++n6;
                        }
                        ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(n6);
                        n3 += n6;
                        if (!deque.isEmpty()) continue;
                        this.pendingRequests.remove(l5);
                    }
                    ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(hashSet.size());
                    ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(n3);
                }
                ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - l3);
                this.endOfIteration();
            } while (!this.stoppedMainLoop);
        }
        catch (Throwable throwable) {
            this.handleException(this.getName(), throwable);
        }
        LOG.c("CommitProcessor exited loop!");
    }

    protected void endOfIteration() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void waitForEmptyPool() throws InterruptedException {
        int n2 = this.numRequestsProcessing.get();
        if (n2 != 0) {
            ServerMetrics.getMetrics().CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR.add(n2);
        }
        long l2 = Time.currentElapsedTime();
        Object object = this.emptyPoolSync;
        synchronized (object) {
            while (!this.stopped && this.isProcessingRequest()) {
                this.emptyPoolSync.wait();
            }
        }
        ServerMetrics.getMetrics().TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ.add(Time.currentElapsedTime() - l2);
    }

    @Override
    public void start() {
        int n2 = Runtime.getRuntime().availableProcessors();
        int n3 = Integer.getInteger(ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, n2);
        this.workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000L);
        CommitProcessor.initBatchSizes();
        LOG.c("Configuring CommitProcessor with {} worker threads.", n3 > 0 ? Integer.valueOf(n3) : "no");
        if (this.workerPool == null) {
            this.workerPool = new WorkerService("CommitProcWork", n3, true);
        }
        this.stopped = false;
        this.stoppedMainLoop = false;
        super.start();
    }

    private void sendToNextProcessor(Request request) {
        this.numRequestsProcessing.incrementAndGet();
        CommitWorkRequest commitWorkRequest = new CommitWorkRequest(request);
        this.workerPool.schedule(commitWorkRequest, request.sessionId);
    }

    private void processWrite(Request request) throws RequestProcessor.RequestProcessorException {
        CommitProcessor.processCommitMetrics(request, true);
        long l2 = Time.currentElapsedTime();
        this.nextProcessor.processRequest(request);
        ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(Time.currentElapsedTime() - l2);
    }

    private static void initBatchSizes() {
        maxReadBatchSize = Integer.getInteger(ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE, -1);
        maxCommitBatchSize = Integer.getInteger(ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE, 1);
        if (maxCommitBatchSize <= 0) {
            String string = "maxCommitBatchSize must be positive, was " + maxCommitBatchSize;
            throw new IllegalArgumentException(string);
        }
        LOG.c("Configuring CommitProcessor with readBatchSize {} commitBatchSize {}", (Object)maxReadBatchSize, (Object)maxCommitBatchSize);
    }

    private static void processCommitMetrics(Request request, boolean bl2) {
        if (bl2) {
            if (request.commitProcQueueStartTime != -1L && request.commitRecvTime != -1L) {
                long l2 = Time.currentElapsedTime();
                ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(l2 - request.commitProcQueueStartTime);
                ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(l2 - request.commitRecvTime);
            } else if (request.commitRecvTime != -1L) {
                ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(Time.currentElapsedTime() - request.commitRecvTime);
            }
        } else if (request.commitProcQueueStartTime != -1L) {
            ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(Time.currentElapsedTime() - request.commitProcQueueStartTime);
        }
    }

    public static int getMaxReadBatchSize() {
        return maxReadBatchSize;
    }

    public static int getMaxCommitBatchSize() {
        return maxCommitBatchSize;
    }

    public static void setMaxReadBatchSize(int n2) {
        maxReadBatchSize = n2;
        LOG.c("Configuring CommitProcessor with readBatchSize {}", (Object)maxReadBatchSize);
    }

    public static void setMaxCommitBatchSize(int n2) {
        if (n2 > 0) {
            maxCommitBatchSize = n2;
            LOG.c("Configuring CommitProcessor with commitBatchSize {}", (Object)maxCommitBatchSize);
        }
    }

    private synchronized void wakeup() {
        this.notifyAll();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void wakeupOnEmpty() {
        Object object = this.emptyPoolSync;
        synchronized (object) {
            this.emptyPoolSync.notifyAll();
        }
    }

    public void commit(Request request) {
        if (this.stopped || request == null) {
            return;
        }
        LOG.b("Committing request:: {}", (Object)request);
        request.commitRecvTime = Time.currentElapsedTime();
        ServerMetrics.getMetrics().COMMITS_QUEUED.add(1L);
        this.committedRequests.add(request);
        this.wakeup();
    }

    @Override
    public void processRequest(Request request) {
        if (this.stopped) {
            return;
        }
        LOG.b("Processing request:: {}", (Object)request);
        request.commitProcQueueStartTime = Time.currentElapsedTime();
        this.queuedRequests.add(request);
        if (this.needCommit(request)) {
            this.queuedWriteRequests.add(request);
            this.numWriteQueuedRequests.incrementAndGet();
        } else {
            this.numReadQueuedRequests.incrementAndGet();
        }
        this.wakeup();
    }

    private void halt() {
        this.stoppedMainLoop = true;
        this.stopped = true;
        this.wakeupOnEmpty();
        this.wakeup();
        this.queuedRequests.clear();
        if (this.workerPool != null) {
            this.workerPool.stop();
        }
    }

    @Override
    public void shutdown() {
        LOG.c("Shutting down");
        this.halt();
        if (this.workerPool != null) {
            this.workerPool.join(this.workerShutdownTimeoutMS);
        }
        if (this.nextProcessor != null) {
            this.nextProcessor.shutdown();
        }
    }

    class CommitWorkRequest
    extends WorkerService.WorkRequest {
        private final Request request;

        CommitWorkRequest(Request request) {
            this.request = request;
        }

        @Override
        public void cleanup() {
            if (!CommitProcessor.this.stopped) {
                LOG.e("Exception thrown by downstream processor, unable to continue.");
                CommitProcessor.this.halt();
            }
        }

        @Override
        public void doWork() throws RequestProcessor.RequestProcessorException {
            try {
                CommitProcessor.processCommitMetrics(this.request, CommitProcessor.this.needCommit(this.request));
                long l2 = Time.currentElapsedTime();
                CommitProcessor.this.nextProcessor.processRequest(this.request);
                if (CommitProcessor.this.needCommit(this.request)) {
                    ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(Time.currentElapsedTime() - l2);
                } else {
                    ServerMetrics.getMetrics().READ_FINAL_PROC_TIME.add(Time.currentElapsedTime() - l2);
                }
            }
            finally {
                if (CommitProcessor.this.numRequestsProcessing.decrementAndGet() == 0) {
                    CommitProcessor.this.wakeupOnEmpty();
                }
            }
        }
    }
}

