/*
 * Decompiled with CFR 0.152.
 */
package XdepsXdatabricksX240X9088.org.apache.zookeeper.server;

import XcoreXdatabricksX240X9088.foe;
import XcoreXdatabricksX240X9088.goe;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.common.Time;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.Request;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.RequestProcessor;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerMetrics;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperCriticalThread;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperThread;
import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class SyncRequestProcessor
extends ZooKeeperCriticalThread
implements RequestProcessor {
    private static final foe LOG = goe.a(SyncRequestProcessor.class);
    private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
    private static int snapCount = ZooKeeperServer.getSnapCount();
    private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes();
    private int randRoll;
    private long randSize;
    private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
    private final Semaphore snapThreadMutex = new Semaphore(1);
    private final ZooKeeperServer zks;
    private final RequestProcessor nextProcessor;
    private final Queue<Request> toFlush;
    private long lastFlushTime;

    public SyncRequestProcessor(ZooKeeperServer zooKeeperServer, RequestProcessor requestProcessor) {
        super("SyncThread:" + zooKeeperServer.getServerId(), zooKeeperServer.getZooKeeperServerListener());
        this.zks = zooKeeperServer;
        this.nextProcessor = requestProcessor;
        this.toFlush = new ArrayDeque<Request>(zooKeeperServer.getMaxBatchSize());
    }

    public static void setSnapCount(int n2) {
        snapCount = n2;
    }

    public static int getSnapCount() {
        return snapCount;
    }

    private long getRemainingDelay() {
        long l2 = this.zks.getFlushDelay();
        long l3 = Time.currentElapsedTime() - this.lastFlushTime;
        if (l3 < l2) {
            return l2 - l3;
        }
        return 0L;
    }

    private boolean shouldFlush() {
        long l2 = this.zks.getFlushDelay();
        long l3 = this.zks.getMaxBatchSize();
        if (l2 > 0L && this.getRemainingDelay() == 0L) {
            return true;
        }
        return l3 > 0L && (long)this.toFlush.size() >= l3;
    }

    public static void setSnapSizeInBytes(long l2) {
        snapSizeInBytes = l2;
    }

    private boolean shouldSnapshot() {
        int n2 = this.zks.getZKDatabase().getTxnCount();
        long l2 = this.zks.getZKDatabase().getTxnSize();
        return n2 > snapCount / 2 + this.randRoll || snapSizeInBytes > 0L && l2 > snapSizeInBytes / 2L + this.randSize;
    }

    private void resetSnapshotStats() {
        this.randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
        this.randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2L));
    }

    @Override
    public void run() {
        try {
            this.resetSnapshotStats();
            this.lastFlushTime = Time.currentElapsedTime();
            while (true) {
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(this.queuedRequests.size());
                long l2 = Math.min(this.zks.getMaxWriteQueuePollTime(), this.getRemainingDelay());
                Request request = this.queuedRequests.poll(l2, TimeUnit.MILLISECONDS);
                if (request == null) {
                    this.flush();
                    request = this.queuedRequests.take();
                }
                if (request != REQUEST_OF_DEATH) {
                    long l3 = Time.currentElapsedTime();
                    ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(l3 - request.syncQueueStartTime);
                    if (!request.isThrottled() && this.zks.getZKDatabase().append(request)) {
                        if (this.shouldSnapshot()) {
                            this.resetSnapshotStats();
                            this.zks.getZKDatabase().rollLog();
                            if (!this.snapThreadMutex.tryAcquire()) {
                                LOG.d("Too busy to snap, skipping");
                            } else {
                                new ZooKeeperThread("Snapshot Thread"){

                                    @Override
                                    public void run() {
                                        try {
                                            SyncRequestProcessor.this.zks.takeSnapshot();
                                        }
                                        catch (Exception exception) {
                                            LOG.c("Unexpected exception", exception);
                                        }
                                        finally {
                                            SyncRequestProcessor.this.snapThreadMutex.release();
                                        }
                                    }
                                }.start();
                            }
                        }
                    } else if (this.toFlush.isEmpty()) {
                        if (this.nextProcessor == null) continue;
                        this.nextProcessor.processRequest(request);
                        if (!(this.nextProcessor instanceof Flushable)) continue;
                        ((Flushable)((Object)this.nextProcessor)).flush();
                        continue;
                    }
                    this.toFlush.add(request);
                    if (this.shouldFlush()) {
                        this.flush();
                    }
                    ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - l3);
                    continue;
                }
                break;
            }
        }
        catch (Throwable throwable) {
            this.handleException(this.getName(), throwable);
        }
        LOG.c("SyncRequestProcessor exited!");
    }

    private void flush() throws IOException, RequestProcessor.RequestProcessorException {
        if (this.toFlush.isEmpty()) {
            return;
        }
        ServerMetrics.getMetrics().BATCH_SIZE.add(this.toFlush.size());
        long l2 = Time.currentElapsedTime();
        this.zks.getZKDatabase().commit();
        ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - l2);
        if (this.nextProcessor == null) {
            this.toFlush.clear();
        } else {
            while (!this.toFlush.isEmpty()) {
                Request request = this.toFlush.remove();
                long l3 = Time.currentElapsedTime() - request.syncQueueStartTime;
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(l3);
                this.nextProcessor.processRequest(request);
            }
            if (this.nextProcessor instanceof Flushable) {
                ((Flushable)((Object)this.nextProcessor)).flush();
            }
        }
        this.lastFlushTime = Time.currentElapsedTime();
    }

    @Override
    public void shutdown() {
        LOG.c("Shutting down");
        this.queuedRequests.add(REQUEST_OF_DEATH);
        try {
            this.join();
            this.flush();
        }
        catch (InterruptedException interruptedException) {
            LOG.d("Interrupted while wating for {} to finish", (Object)this);
            Thread.currentThread().interrupt();
        }
        catch (IOException iOException) {
            LOG.d("Got IO exception during shutdown");
        }
        catch (RequestProcessor.RequestProcessorException requestProcessorException) {
            LOG.d("Got request processor exception during shutdown");
        }
        if (this.nextProcessor != null) {
            this.nextProcessor.shutdown();
        }
    }

    @Override
    public void processRequest(Request request) {
        Objects.requireNonNull(request, "Request cannot be null");
        request.syncQueueStartTime = Time.currentElapsedTime();
        this.queuedRequests.add(request);
        ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1L);
    }
}

