/*
 * Decompiled with CFR 0.152.
 */
package XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum;

import XcoreXdatabricksX240X9088.foe;
import XcoreXdatabricksX240X9088.goe;
import XcoreXdatabricksX240X9088.hme;
import XcoreXdatabricksX240X9088.ime;
import XcoreXdatabricksX240X9088.mme;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.common.Time;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.Request;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.RequestRecord;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerMetrics;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.TxnLogProposalIterator;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZKDatabase;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperThread;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooTrace;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.Leader;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.LearnerMaster;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.LearnerSyncRequest;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.LearnerSyncThrottler;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.ObserverMaster;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.QuorumPacket;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.QuorumPeer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.StateSummary;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.SyncThrottleException;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.util.ConfigUtils;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.util.MessageTracker;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.util.ZxidUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.security.sasl.SaslException;

public class LearnerHandler
extends ZooKeeperThread {
    private static final foe LOG = goe.a(LearnerHandler.class);
    // Name of the system property that is parsed into closeSocketAsync below.
    public static final String LEADER_CLOSE_SOCKET_ASYNC = "zookeeper.leader.closeSocketAsync";
    public static final boolean closeSocketAsync = Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay("zookeeper.leader.closeSocketAsync"));
    // Socket connected to the learner this handler serves.
    protected final Socket sock;
    AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
    final LearnerMaster learnerMaster;
    // Refreshed from the learner master in run(); synced() compares it with the current tick.
    volatile long tickOfNextAckDeadline;
    // Server id and protocol version of the learner; run() overwrites both from the first packet's payload.
    protected long sid = 0L;
    protected int version = 1;
    // Outbound packets, drained by sendPackets().
    final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue();
    // Running total of packetSize() for packets added via queuePacket() and not yet dequeued by sendPackets().
    private final AtomicLong queuedPacketsSize = new AtomicLong();
    protected final AtomicLong packetsReceived = new AtomicLong();
    protected final AtomicLong packetsSent = new AtomicLong();
    protected final AtomicLong requestsReceived = new AtomicLong();
    // Zxid of the last packet written by sendPackets() whose zxid was > 0; -1 until then.
    protected volatile long lastZxid = -1L;
    protected final Date established = new Date();
    // queuePacket() inserts a MarkerQuorumPacket once every 1000 queued packets (the literal is inlined there).
    private final int markerPacketInterval = 1000;
    private AtomicInteger packetCounter = new AtomicInteger();
    private SyncLimitCheck syncLimitCheck = new SyncLimitCheck();
    // Input/output archives; run() creates them over bufferedInput / bufferedOutput.
    private hme ia;
    private ime oa;
    private final BufferedInputStream bufferedInput;
    private BufferedOutputStream bufferedOutput;
    protected final MessageTracker messageTracker;
    // Set by startSendingPackets(); ping() is a no-op while this is false.
    private volatile boolean sendingThreadStarted = false;
    // System property that, when true, makes the constructor enable forceSnapSync.
    public static final String FORCE_SNAP_SYNC = "zookeeper.forceSnapshotSync";
    private boolean forceSnapSync = false;
    // True until syncFollower()/queueCommittedProposals() has queued a DIFF or TRUNC packet.
    private boolean needOpPacket = true;
    // Value returned by learnerMaster.startForwarding() in syncFollower().
    private long leaderLastZxid;
    // Non-null only while a sync is in progress; run() calls endSync() on it and then clears it.
    private LearnerSyncThrottler syncThrottler = null;
    // Sentinel queued by shutdown(); sendPackets() exits its loop when it dequeues this instance.
    final QuorumPacket proposalOfDeath = new QuorumPacket();
    // Switched to OBSERVER in run() when the first packet has type 16.
    private QuorumPeer.LearnerType learnerType = QuorumPeer.LearnerType.PARTICIPANT;

    /**
     * Returns the socket this handler was constructed with.
     *
     * @return the learner connection socket
     */
    public Socket getSocket() {
        return sock;
    }

    /**
     * Returns the learner's server id as currently recorded by this handler.
     *
     * @return the value of {@code sid}
     */
    long getSid() {
        return sid;
    }

    /**
     * Returns a printable form of the learner's remote address.
     *
     * @return the remote socket address as a string, or {@code "<null>"} when there is no
     *         socket or the socket has no remote address (it was never connected)
     */
    String getRemoteAddress() {
        if (this.sock == null) {
            return "<null>";
        }
        // getRemoteSocketAddress() yields null for a socket that was never connected;
        // treat that like the missing-socket case instead of throwing a NullPointerException.
        java.net.SocketAddress remote = this.sock.getRemoteSocketAddress();
        return remote == null ? "<null>" : remote.toString();
    }

    /**
     * Returns the protocol version recorded for the learner.
     *
     * @return the value of {@code version}
     */
    int getVersion() {
        return version;
    }

    /**
     * Returns the zxid last recorded by the sending loop, or {@code -1} if none was recorded.
     *
     * @return the value of {@code lastZxid}
     */
    public synchronized long getLastZxid() {
        return lastZxid;
    }

    /**
     * Returns a defensive copy of the handler's creation timestamp.
     *
     * @return a new {@link Date} equal to {@code established}
     */
    public Date getEstablished() {
        // established is always a plain java.util.Date, so copying by epoch millis equals clone().
        return new Date(established.getTime());
    }

    /**
     * Replaces the output archive used to serialize outgoing packets.
     *
     * @param ime2 the archive to write packets to
     */
    protected void setOutputArchive(ime ime2) {
        oa = ime2;
    }

    /**
     * Replaces the buffered output stream that the sending loop flushes.
     *
     * @param bufferedOutputStream the stream to use from now on
     */
    protected void setBufferedOutput(BufferedOutputStream bufferedOutputStream) {
        bufferedOutput = bufferedOutputStream;
    }

    /**
     * Creates a handler for one learner connection and, when the learner master provides a
     * quorum auth server, authenticates the peer before returning.
     *
     * @param socket the connected learner socket; its remote address names the thread
     * @param bufferedInputStream buffered input over the socket, used for authentication
     *        and later for reading packets
     * @param learnerMaster the master this handler reports to
     * @throws IOException specifically a {@link SaslException} (with the original failure as
     *         its cause) when authentication fails; the socket is closed before it is thrown
     */
    LearnerHandler(Socket socket, BufferedInputStream bufferedInputStream, LearnerMaster learnerMaster) throws IOException {
        super("LearnerHandler-" + socket.getRemoteSocketAddress());
        this.sock = socket;
        this.learnerMaster = learnerMaster;
        this.bufferedInput = bufferedInputStream;
        if (Boolean.getBoolean(FORCE_SNAP_SYNC)) {
            this.forceSnapSync = true;
            LOG.c("Forcing snapshot sync is enabled");
        }
        try {
            QuorumAuthServer quorumAuthServer = learnerMaster.getQuorumAuthServer();
            if (quorumAuthServer != null) {
                quorumAuthServer.authenticate(socket, new DataInputStream(bufferedInputStream));
            }
        }
        catch (IOException iOException) {
            LOG.e("Server failed to authenticate quorum learner, addr: {}, closing connection", (Object)socket.getRemoteSocketAddress(), (Object)iOException);
            this.closeSocket();
            // Keep the original exception as the cause so the stack trace of the real failure survives.
            throw new SaslException("Authentication failure: " + iOException.getMessage(), iOException);
        }
        this.messageTracker = new MessageTracker(MessageTracker.BUFFERED_MESSAGE_SIZE);
    }

    /**
     * Describes this handler: its socket, ack deadline tick, synced state and queue length.
     *
     * @return a single-line diagnostic string
     */
    @Override
    public String toString() {
        return "LearnerHandler " + sock
                + " tickOfNextAckDeadline:" + tickOfNextAckDeadline()
                + " synced?:" + synced()
                + " queuedPacketLength:" + queuedPackets.size();
    }

    /**
     * Returns the learner type recorded for this connection.
     *
     * @return the value of {@code learnerType}
     */
    public QuorumPeer.LearnerType getLearnerType() {
        return learnerType;
    }

    /**
     * Sending loop: dequeues packets from {@code queuedPackets} and writes them to the output
     * archive until the {@code proposalOfDeath} sentinel is dequeued. Marker packets are
     * never written; they only feed the queue-time metric. An {@link IOException} ends the
     * loop and closes the socket.
     *
     * @throws InterruptedException if interrupted while blocked waiting for a packet
     */
    private void sendPackets() throws InterruptedException {
        try {
            for (;;) {
                QuorumPacket next = queuedPackets.poll();
                if (next == null) {
                    // Queue is momentarily empty: push out buffered bytes, then block for more work.
                    bufferedOutput.flush();
                    next = queuedPackets.take();
                }
                ServerMetrics.getMetrics().LEARNER_HANDLER_QP_SIZE.add(Long.toString(sid), queuedPackets.size());

                if (next instanceof MarkerQuorumPacket) {
                    MarkerQuorumPacket marker = (MarkerQuorumPacket) next;
                    ServerMetrics.getMetrics().LEARNER_HANDLER_QP_TIME.add(Long.toString(sid), (System.nanoTime() - marker.time) / 1000000L);
                    continue;
                }

                queuedPacketsSize.addAndGet(-LearnerHandler.packetSize(next));
                if (next == proposalOfDeath) {
                    // Sentinel from shutdown(): stop sending.
                    break;
                }

                if (next.getType() == 2) {
                    syncLimitCheck.updateProposal(next.getZxid(), System.nanoTime());
                }
                if (LOG.b()) {
                    long traceMask = next.getType() == 5 ? 128L : 16L;
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'o', next);
                }
                if (next.getZxid() > 0L) {
                    lastZxid = next.getZxid();
                }
                oa.a(next, "packet");
                packetsSent.incrementAndGet();
                messageTracker.trackSent(next.getType());
            }
        } catch (IOException ioFailure) {
            LOG.d("Exception while sending packets in LearnerHandler", ioFailure);
            closeSocket();
        }
    }

    /**
     * Renders a quorum packet as {@code "<TYPE> <zxid-hex> <detail>"} for log output.
     *
     * <p>The detail part is {@code " sessionid = <id>"} for REVALIDATE packets whose payload
     * starts with a readable long, and the literal text {@code null} otherwise. Unrecognized
     * type codes are rendered as {@code "UNKNOWN"} followed by the numeric type.
     *
     * @param quorumPacket the packet to describe
     * @return the description; never {@code null}
     */
    public static String packetToString(QuorumPacket quorumPacket) {
        String typeName;
        String detail = null;
        switch (quorumPacket.getType()) {
            case 3: {
                typeName = "ACK";
                break;
            }
            case 4: {
                typeName = "COMMIT";
                break;
            }
            case 11: {
                typeName = "FOLLOWERINFO";
                break;
            }
            case 10: {
                typeName = "NEWLEADER";
                break;
            }
            case 5: {
                typeName = "PING";
                break;
            }
            case 2: {
                typeName = "PROPOSAL";
                break;
            }
            case 1: {
                typeName = "REQUEST";
                break;
            }
            case 6: {
                typeName = "REVALIDATE";
                byte[] payload = quorumPacket.getData();
                // A packet without a payload has no session id to show; this method is called
                // from log statements, so it must not throw on such a packet.
                if (payload != null) {
                    DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(payload));
                    try {
                        long sessionId = dataInputStream.readLong();
                        detail = " sessionid = " + sessionId;
                    }
                    catch (IOException iOException) {
                        LOG.c("Unexpected exception", iOException);
                    }
                }
                break;
            }
            case 12: {
                typeName = "UPTODATE";
                break;
            }
            case 13: {
                typeName = "DIFF";
                break;
            }
            case 14: {
                typeName = "TRUNC";
                break;
            }
            case 15: {
                typeName = "SNAP";
                break;
            }
            case 18: {
                typeName = "ACKEPOCH";
                break;
            }
            case 7: {
                typeName = "SYNC";
                break;
            }
            case 8: {
                typeName = "INFORM";
                break;
            }
            case 9: {
                typeName = "COMMITANDACTIVATE";
                break;
            }
            case 19: {
                typeName = "INFORMANDACTIVATE";
                break;
            }
            default: {
                typeName = "UNKNOWN" + quorumPacket.getType();
            }
        }
        // Every branch assigns a non-null type name, so the result is always built.
        return typeName + " " + Long.toHexString(quorumPacket.getZxid()) + " " + detail;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Thread body for one learner connection. In order, it:
     * <ol>
     *   <li>registers with the learner master and reads the learner's first packet
     *       (FOLLOWERINFO, type 11, or OBSERVERINFO, type 16);</li>
     *   <li>negotiates the epoch and obtains the learner's state summary;</li>
     *   <li>calls {@link #syncFollower} and either streams a snapshot or relies on the
     *       packets that call queued;</li>
     *   <li>sends NEWLEADER (type 10), starts the sender thread and waits for the ACK;</li>
     *   <li>queues UPTODATE (type 12) and then loops forever dispatching incoming packets.</li>
     * </ol>
     * Whatever way the method exits, the {@code finally} block ends any sync still registered
     * with the throttler, dumps the message tracker and calls {@link #shutdown()}.
     */
    @Override
    public void run() {
        try {
            Object object;
            boolean bl2;
            long l2;
            Object object2;
            this.learnerMaster.addLearnerHandler(this);
            this.tickOfNextAckDeadline = this.learnerMaster.getTickOfInitialAckDeadline();
            this.ia = hme.a(this.bufferedInput);
            this.bufferedOutput = new BufferedOutputStream(this.sock.getOutputStream());
            this.oa = ime.a(this.bufferedOutput);
            // First packet from the learner must identify it as follower (11) or observer (16).
            QuorumPacket quorumPacket = new QuorumPacket();
            this.ia.a(quorumPacket, "packet");
            this.messageTracker.trackReceived(quorumPacket.getType());
            if (quorumPacket.getType() != 11 && quorumPacket.getType() != 16) {
                LOG.e("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", (Object)quorumPacket.toString());
                return;
            }
            if (this.learnerMaster instanceof ObserverMaster && quorumPacket.getType() != 16) {
                throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + quorumPacket.getType());
            }
            // Payload layout, each part optional by length: sid (8 bytes), version (4 bytes),
            // then a long that is compared against the master's quorum verifier version.
            byte[] byArray = quorumPacket.getData();
            if (byArray != null) {
                object2 = ByteBuffer.wrap(byArray);
                if (byArray.length >= 8) {
                    this.sid = ((ByteBuffer)object2).getLong();
                }
                if (byArray.length >= 12) {
                    this.version = ((ByteBuffer)object2).getInt();
                }
                if (byArray.length >= 20 && (l2 = ((ByteBuffer)object2).getLong()) > this.learnerMaster.getQuorumVerifierVersion()) {
                    throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                }
            } else {
                // No payload: the master hands out a sid from its follower counter.
                this.sid = this.learnerMaster.getAndDecrementFollowerCounter();
            }
            if (((String)(object2 = this.learnerMaster.getPeerInfo(this.sid))).isEmpty()) {
                LOG.c("Follower sid: {} not in the current config {}", (Object)this.sid, (Object)Long.toHexString(this.learnerMaster.getQuorumVerifierVersion()));
            } else {
                LOG.c("Follower sid: {} : info : {}", (Object)this.sid, object2);
            }
            if (quorumPacket.getType() == 16) {
                this.learnerType = QuorumPeer.LearnerType.OBSERVER;
            }
            this.learnerMaster.registerLearnerHandlerBean(this, this.sock);
            // Epoch negotiation: l2 = learner's accepted epoch, l4 = epoch to propose,
            // l5 = first zxid of that epoch (counter 0).
            l2 = ZxidUtils.getEpochFromZxid(quorumPacket.getZxid());
            StateSummary stateSummary = null;
            long l3 = quorumPacket.getZxid();
            long l4 = this.learnerMaster.getEpochToPropose(this.getSid(), l2);
            long l5 = ZxidUtils.makeZxid(l4, 0L);
            if (this.getVersion() < 65536) {
                // Learner version below 0x10000: no epoch packet exchange; the state summary
                // is derived from the zxid of its first packet.
                long l6 = ZxidUtils.getEpochFromZxid(l3);
                stateSummary = new StateSummary(l6, l3);
                this.learnerMaster.waitForEpochAck(this.getSid(), stateSummary);
            } else {
                // Send a type-17 packet carrying the new epoch zxid and version 0x10000
                // (NOTE(review): presumably LEADERINFO - the name is not visible in this file),
                // then require an ACKEPOCH (18) in reply.
                byte[] byArray2 = new byte[4];
                ByteBuffer.wrap(byArray2).putInt(65536);
                QuorumPacket quorumPacket2 = new QuorumPacket(17, l5, byArray2, null);
                this.oa.a(quorumPacket2, "packet");
                this.messageTracker.trackSent(17);
                this.bufferedOutput.flush();
                QuorumPacket quorumPacket3 = new QuorumPacket();
                this.ia.a(quorumPacket3, "packet");
                this.messageTracker.trackReceived(quorumPacket3.getType());
                if (quorumPacket3.getType() != 18) {
                    LOG.e("{} is not ACKEPOCH", (Object)quorumPacket3.toString());
                    return;
                }
                ByteBuffer byteBuffer = ByteBuffer.wrap(quorumPacket3.getData());
                stateSummary = new StateSummary(byteBuffer.getInt(), quorumPacket3.getZxid());
                this.learnerMaster.waitForEpochAck(this.getSid(), stateSummary);
            }
            long l7 = stateSummary.getLastZxid();
            // bl3: true when the learner needs a full snapshot; otherwise syncFollower has
            // already queued the packets that bring it up to date.
            boolean bl3 = this.syncFollower(l7, this.learnerMaster);
            // bl2: true for non-observers; passed to beginSync and reported as "exempt" below.
            boolean bl4 = bl2 = this.getLearnerType() != QuorumPeer.LearnerType.OBSERVER;
            if (bl3) {
                this.syncThrottler = this.learnerMaster.getLearnerSnapSyncThrottler();
                this.syncThrottler.beginSync(bl2);
                ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(this.syncThrottler.getSyncInProgress());
                try {
                    // SNAP (15) header, then the serialized database, then a fixed signature string.
                    long l8 = this.learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                    this.oa.a(new QuorumPacket(15, l8, null, null), "packet");
                    this.messageTracker.trackSent(15);
                    this.bufferedOutput.flush();
                    LOG.c("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, send zxid of db as 0x{}, {} concurrent snapshot sync, snapshot sync was {} from throttle", Long.toHexString(l7), Long.toHexString(this.leaderLastZxid), Long.toHexString(l8), this.syncThrottler.getSyncInProgress(), bl2 ? "exempt" : "not exempt");
                    this.learnerMaster.getZKDatabase().serializeSnapshot(this.oa);
                    this.oa.a("BenWasHere", "signature");
                    this.bufferedOutput.flush();
                }
                finally {
                    // Counted even when sending the snapshot fails part-way.
                    ServerMetrics.getMetrics().SNAP_COUNT.add(1L);
                }
            } else {
                this.syncThrottler = this.learnerMaster.getLearnerDiffSyncThrottler();
                this.syncThrottler.beginSync(bl2);
                ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(this.syncThrottler.getSyncInProgress());
                ServerMetrics.getMetrics().DIFF_COUNT.add(1L);
            }
            LOG.b("Sending NEWLEADER message to {}", (Object)this.sid);
            if (this.getVersion() < 65536) {
                // Older learners: NEWLEADER (10) is written directly to the archive.
                object = new QuorumPacket(10, l5, null, null);
                this.oa.a((mme)object, "packet");
            } else {
                // Otherwise NEWLEADER carries the quorum verifier bytes and goes through the
                // queue, i.e. behind whatever syncFollower queued.
                object = new QuorumPacket(10, l5, this.learnerMaster.getQuorumVerifierBytes(), null);
                this.queuedPackets.add((QuorumPacket)object);
            }
            this.bufferedOutput.flush();
            // From here on the sender thread drains queuedPackets.
            this.startSendingPackets();
            quorumPacket = new QuorumPacket();
            this.ia.a(quorumPacket, "packet");
            this.messageTracker.trackReceived(quorumPacket.getType());
            if (quorumPacket.getType() != 3) {
                LOG.e("Next packet was supposed to be an ACK, but received packet: {}", (Object)LearnerHandler.packetToString(quorumPacket));
                return;
            }
            try {
                LOG.b("Received NEWLEADER-ACK message from {}", (Object)this.sid);
                this.learnerMaster.waitForNewLeaderAck(this.getSid(), quorumPacket.getZxid());
                this.syncLimitCheck.start();
                // Sync is over: release the throttler slot and clear the field so the outer
                // finally does not end the sync a second time.
                this.syncThrottler.endSync();
                if (bl3) {
                    ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(this.syncThrottler.getSyncInProgress());
                } else {
                    ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(this.syncThrottler.getSyncInProgress());
                }
                this.syncThrottler = null;
                this.sock.setSoTimeout(this.learnerMaster.syncTimeout());
                this.learnerMaster.waitForStartup();
                LOG.b("Sending UPTODATE message to {}", (Object)this.sid);
                this.queuedPackets.add(new QuorumPacket(12, -1L, null, null));
                // Steady state: read and dispatch packets until an exception ends the loop.
                block23: while (true) {
                    quorumPacket = new QuorumPacket();
                    this.ia.a(quorumPacket, "packet");
                    this.messageTracker.trackReceived(quorumPacket.getType());
                    if (LOG.b()) {
                        long l9 = 16L;
                        if (quorumPacket.getType() == 5) {
                            l9 = 128L;
                        }
                        ZooTrace.logQuorumPacket(LOG, l9, 'i', quorumPacket);
                    }
                    // Any received packet pushes the ack deadline forward.
                    this.tickOfNextAckDeadline = this.learnerMaster.getTickOfNextAckDeadline();
                    this.packetsReceived.incrementAndGet();
                    switch (quorumPacket.getType()) {
                        // ACK
                        case 3: {
                            if (this.learnerType == QuorumPeer.LearnerType.OBSERVER) {
                                LOG.b("Received ACK from Observer {}", (Object)this.sid);
                            }
                            this.syncLimitCheck.updateAck(quorumPacket.getZxid());
                            this.learnerMaster.processAck(this.sid, quorumPacket.getZxid(), this.sock.getLocalSocketAddress());
                            continue block23;
                        }
                        // PING: payload is a sequence of (long, int) pairs, each passed to touch().
                        case 5: {
                            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(quorumPacket.getData());
                            DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
                            while (true) {
                                if (dataInputStream.available() <= 0) continue block23;
                                long l10 = dataInputStream.readLong();
                                int n2 = dataInputStream.readInt();
                                this.learnerMaster.touch(l10, n2);
                            }
                        }
                        // REVALIDATE
                        case 6: {
                            ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1L);
                            this.learnerMaster.revalidateSession(quorumPacket, this);
                            continue block23;
                        }
                        // REQUEST: payload is a long, two ints, then the request record bytes.
                        case 1: {
                            ByteBuffer byteBuffer = ByteBuffer.wrap(quorumPacket.getData());
                            long l11 = byteBuffer.getLong();
                            int n3 = byteBuffer.getInt();
                            int n4 = byteBuffer.getInt();
                            byteBuffer = byteBuffer.slice();
                            // NOTE(review): n4 == 9 is presumably the sync op code - confirm against the op code table.
                            Request request = n4 == 9 ? new LearnerSyncRequest(this, l11, n3, n4, RequestRecord.fromBytes(byteBuffer), quorumPacket.getAuthinfo()) : new Request(null, l11, n3, n4, RequestRecord.fromBytes(byteBuffer), quorumPacket.getAuthinfo());
                            request.setOwner(this);
                            this.learnerMaster.submitLearnerRequest(request);
                            this.requestsReceived.incrementAndGet();
                            continue block23;
                        }
                    }
                    // Any other type is logged and otherwise ignored.
                    LOG.d("unexpected quorum packet, type: {}", (Object)LearnerHandler.packetToString(quorumPacket));
                }
            }
            catch (IOException iOException) {
                LOG.d("Unexpected exception in LearnerHandler: ", iOException);
                this.closeSocket();
            }
            catch (InterruptedException interruptedException) {
                LOG.d("Unexpected exception in LearnerHandler.", interruptedException);
            }
            catch (SyncThrottleException syncThrottleException) {
                LOG.d("too many concurrent sync.", syncThrottleException);
                this.syncThrottler = null;
            }
            catch (Exception exception) {
                LOG.d("Unexpected exception in LearnerHandler.", exception);
                throw exception;
            }
        }
        finally {
            // Release the throttler slot if the handler exits while a sync is still registered.
            if (this.syncThrottler != null) {
                this.syncThrottler.endSync();
                this.syncThrottler = null;
            }
            String string = this.getRemoteAddress();
            LOG.d("******* GOODBYE {} ********", (Object)string);
            this.messageTracker.dumpToLog(string);
            this.shutdown();
        }
    }

    /**
     * Starts the thread that runs the sending loop and records that it has been started.
     * A second call only logs and starts nothing.
     */
    protected void startSendingPackets() {
        if (sendingThreadStarted) {
            LOG.e("Attempting to start sending thread after it already started");
            return;
        }
        Thread sender = new Thread(){

            @Override
            public void run() {
                Thread.currentThread().setName("Sender-" + LearnerHandler.this.sock.getRemoteSocketAddress());
                try {
                    LearnerHandler.this.sendPackets();
                } catch (InterruptedException interrupted) {
                    LOG.c("Unexpected interruption", interrupted);
                }
            }
        };
        sender.start();
        sendingThreadStarted = true;
    }

    /**
     * Tells {@link #queuePacket} whether it may insert marker packets into the queue.
     * This implementation always allows it; being protected, subclasses may override.
     *
     * @return {@code true}
     */
    protected boolean shouldSendMarkerPacketForLogging() {
        final boolean markersEnabled = true;
        return markersEnabled;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Decides how to bring a learner up to date and queues the packets for it.
     *
     * <p>Under the database's log read lock, it compares the learner's last zxid with the
     * committed-log range and the last processed zxid, and picks one of: an empty DIFF,
     * a TRUNC, replay from the committed log, replay from the on-disk txn log followed by
     * the committed log, or nothing (snapshot). It then registers this handler for forwarding
     * from the resulting zxid and stores the master's returned zxid in {@code leaderLastZxid}.
     *
     * @param l2 the learner's last zxid
     * @param learnerMaster the master whose database and forwarding are used
     * @return {@code true} if the learner must be sent a snapshot, {@code false} if the
     *         queued packets are sufficient
     */
    boolean syncFollower(long l2, LearnerMaster learnerMaster) {
        // A zxid with a zero counter is the first zxid of an epoch.
        boolean peerAtEpochStart = (l2 & 0xFFFFFFFFL) == 0L;
        long currentZxid = l2;
        boolean needSnap = true;
        ZKDatabase zKDatabase = learnerMaster.getZKDatabase();
        boolean txnLogSyncEnabled = zKDatabase.isTxnLogSyncEnabled();
        ReentrantReadWriteLock.ReadLock readLock = zKDatabase.getLogLock().readLock();
        // Acquire before the try so the finally never unlocks a lock that was not taken.
        readLock.lock();
        try {
            long maxCommittedLog = zKDatabase.getmaxCommittedLog();
            long minCommittedLog = zKDatabase.getminCommittedLog();
            long lastProcessedZxid = zKDatabase.getDataTreeLastProcessedZxid();
            LOG.c("Synchronizing with Learner sid: {} maxCommittedLog=0x{} minCommittedLog=0x{} lastProcessedZxid=0x{} peerLastZxid=0x{}", this.getSid(), Long.toHexString(maxCommittedLog), Long.toHexString(minCommittedLog), Long.toHexString(lastProcessedZxid), Long.toHexString(l2));
            if (zKDatabase.getCommittedLog().isEmpty()) {
                // Empty committed log: collapse the range onto the last processed zxid.
                minCommittedLog = lastProcessedZxid;
                maxCommittedLog = lastProcessedZxid;
            }
            if (this.forceSnapSync) {
                LOG.d("Forcing snapshot sync - should not see this in production");
            } else if (lastProcessedZxid == l2) {
                // Learner is already at the leader's state: empty DIFF.
                LOG.c("Sending DIFF zxid=0x{} for peer sid: {}", (Object)Long.toHexString(l2), (Object)this.getSid());
                this.queueOpPacket(13, l2);
                this.needOpPacket = false;
                needSnap = false;
            } else if (l2 > maxCommittedLog && !peerAtEpochStart) {
                // Learner is ahead of the committed log: tell it to truncate.
                LOG.b("Sending TRUNC to follower zxidToSend=0x{} for peer sid:{}", (Object)Long.toHexString(maxCommittedLog), (Object)this.getSid());
                this.queueOpPacket(14, maxCommittedLog);
                currentZxid = maxCommittedLog;
                this.needOpPacket = false;
                needSnap = false;
            } else if (maxCommittedLog >= l2 && minCommittedLog <= l2) {
                // Learner's zxid lies inside the committed log range.
                LOG.c("Using committedLog for peer sid: {}", (Object)this.getSid());
                Iterator<Leader.Proposal> committedLogItr = zKDatabase.getCommittedLog().iterator();
                currentZxid = this.queueCommittedProposals(committedLogItr, l2, null, maxCommittedLog);
                needSnap = false;
            } else if (l2 < minCommittedLog && txnLogSyncEnabled) {
                // Learner is behind the committed log: replay from the txn log first.
                long sizeLimit = zKDatabase.calculateTxnLogSizeLimit();
                Iterator<Leader.Proposal> txnLogItr = zKDatabase.getProposalsFromTxnLog(l2, sizeLimit);
                try {
                    if (txnLogItr.hasNext()) {
                        LOG.c("Use txnlog and committedLog for peer sid: {}", (Object)this.getSid());
                        currentZxid = this.queueCommittedProposals(txnLogItr, l2, minCommittedLog, maxCommittedLog);
                        if (currentZxid < minCommittedLog) {
                            LOG.c("Detected gap between end of txnlog: 0x{} and start of committedLog: 0x{}", (Object)Long.toHexString(currentZxid), (Object)Long.toHexString(minCommittedLog));
                            // Discard what was queued and fall back to a snapshot.
                            // NOTE(review): queuedPacketsSize is not reset by clear() - confirm
                            // whether the size statistic should be corrected here.
                            currentZxid = l2;
                            this.queuedPackets.clear();
                            this.needOpPacket = true;
                        } else {
                            LOG.b("Queueing committedLog 0x{}", (Object)Long.toHexString(currentZxid));
                            Iterator<Leader.Proposal> committedLogItr = zKDatabase.getCommittedLog().iterator();
                            currentZxid = this.queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
                            needSnap = false;
                        }
                    }
                }
                finally {
                    // Release the txn log iterator even if queueing throws.
                    if (txnLogItr instanceof TxnLogProposalIterator) {
                        ((TxnLogProposalIterator)txnLogItr).close();
                    }
                }
            } else {
                LOG.d("Unhandled scenario for peer sid: {} maxCommittedLog=0x{} minCommittedLog=0x{} lastProcessedZxid=0x{} peerLastZxid=0x{} txnLogSyncEnabled={}", this.getSid(), Long.toHexString(maxCommittedLog), Long.toHexString(minCommittedLog), Long.toHexString(lastProcessedZxid), Long.toHexString(l2), txnLogSyncEnabled);
            }
            if (needSnap) {
                currentZxid = zKDatabase.getDataTreeLastProcessedZxid();
            }
            LOG.b("Start forwarding 0x{} for peer sid: {}", (Object)Long.toHexString(currentZxid), (Object)this.getSid());
            this.leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
        }
        finally {
            readLock.unlock();
        }
        if (this.needOpPacket && !needSnap) {
            // No DIFF/TRUNC was queued although a diff sync was chosen: use a snapshot instead.
            LOG.e("Unhandled scenario for peer sid: {} fall back to use snapshot", (Object)this.getSid());
            needSnap = true;
        }
        return needSnap;
    }

    /**
     * Walks a proposal iterator and queues, for the learner, every proposal newer than what
     * it already has, each followed by a COMMIT (type 4) packet. While {@code needOpPacket}
     * is still set, it first queues the DIFF (13) or TRUNC (14) packet that tells the learner
     * how to interpret what follows.
     *
     * @param iterator proposals to scan; zxids are compared in iteration order
     * @param l2 the learner's last zxid
     * @param l3 optional upper bound: scanning stops at the first proposal with a larger
     *        zxid; {@code null} means no bound
     * @param l4 zxid placed in the DIFF packet (callers in this file pass maxCommittedLog);
     *        it is unboxed, so it must not be {@code null}
     * @return the zxid of the last proposal queued, or {@code l2} if none was queued
     */
    protected long queueCommittedProposals(Iterator<Leader.Proposal> iterator, long l2, Long l3, Long l4) {
        // True when the learner's zxid has a zero counter, i.e. is the first zxid of an epoch.
        boolean bl2 = (l2 & 0xFFFFFFFFL) == 0L;
        // l5: zxid of the last packet queued so far; l6: last zxid seen that is older than the learner's.
        long l5 = l2;
        long l6 = -1L;
        while (iterator.hasNext()) {
            Leader.Proposal proposal = iterator.next();
            long l7 = proposal.getZxid();
            if (l3 != null && l7 > l3) break;
            if (l7 < l2) {
                // Learner already has this one; remember it as a possible TRUNC target.
                l6 = l7;
                continue;
            }
            if (this.needOpPacket) {
                if (l7 == l2) {
                    // Learner's last zxid is present in the log: everything after it is a plain DIFF.
                    LOG.c("Sending DIFF zxid=0x{}  for peer sid: {}", (Object)Long.toHexString(l4), (Object)this.getSid());
                    this.queueOpPacket(13, l4);
                    this.needOpPacket = false;
                    continue;
                }
                if (bl2) {
                    LOG.c("Sending DIFF zxid=0x{}  for peer sid: {}", (Object)Long.toHexString(l4), (Object)this.getSid());
                    this.queueOpPacket(13, l4);
                    this.needOpPacket = false;
                } else if (l7 > l2) {
                    // The learner's zxid was skipped over: it holds a zxid this log does not contain.
                    if (ZxidUtils.getEpochFromZxid(l7) != ZxidUtils.getEpochFromZxid(l2)) {
                        // needOpPacket stays true here, which makes syncFollower fall back to a snapshot.
                        LOG.d("Cannot send TRUNC to peer sid: " + this.getSid() + " peer zxid is from different epoch");
                        return l5;
                    }
                    // Truncate the learner back to the last zxid that preceded its own.
                    LOG.c("Sending TRUNC zxid=0x{}  for peer sid: {}", (Object)Long.toHexString(l6), (Object)this.getSid());
                    this.queueOpPacket(14, l6);
                    this.needOpPacket = false;
                }
            }
            // Skip anything at or below what has already been queued.
            if (l7 <= l5) continue;
            this.queuePacket(proposal.getQuorumPacket());
            this.queueOpPacket(4, l7);
            l5 = l7;
        }
        if (this.needOpPacket && bl2) {
            // Nothing in the scan triggered an op packet, but the learner sits at an epoch start: send DIFF.
            LOG.c("Sending DIFF zxid=0x{}  for peer sid: {}", (Object)Long.toHexString(l4), (Object)this.getSid());
            this.queueOpPacket(13, l4);
            this.needOpPacket = false;
        }
        return l5;
    }

    /**
     * Stops this handler: discards pending packets, queues the {@code proposalOfDeath}
     * sentinel so the sending loop exits, closes the socket, interrupts the handler thread
     * and deregisters from the learner master.
     *
     * <p>May be called more than once and from any thread; {@code run()} calls it from its
     * {@code finally} block, possibly after another thread has already called it.
     */
    public void shutdown() {
        this.queuedPackets.clear();
        // Use the non-blocking add() rather than put(): put() throws InterruptedException when
        // the calling thread's interrupt flag is set, and this method sets that flag on the
        // handler thread. A repeated shutdown() on that thread would then clear the queue and
        // fail to re-queue the sentinel, leaving the sender blocked in take(). The queue is
        // unbounded, so add() always succeeds.
        this.queuedPackets.add(this.proposalOfDeath);
        this.closeSocket();
        this.interrupt();
        this.learnerMaster.removeLearnerHandler(this);
        this.learnerMaster.unregisterLearnerHandlerBean(this);
    }

    /**
     * Returns the tick compared against the master's current tick in {@link #synced()}.
     *
     * @return the value of the {@code tickOfNextAckDeadline} field
     */
    public long tickOfNextAckDeadline() {
        return tickOfNextAckDeadline;
    }

    /**
     * Queues a PING (type 5) packet carrying the master's last proposed zxid, provided the
     * sender thread has been started. If the sync-limit check fails, the handler is shut
     * down instead.
     */
    public void ping() {
        if (!sendingThreadStarted) {
            return;
        }
        if (!syncLimitCheck.check(System.nanoTime())) {
            LOG.d("Closing connection to peer due to transaction timeout.");
            shutdown();
            return;
        }
        long lastProposed = learnerMaster.getLastProposed();
        queuePacket(new QuorumPacket(5, lastProposed, null, null));
    }

    /**
     * Builds a packet with no data and no auth info and queues it.
     *
     * @param n2 packet type
     * @param l2 zxid to place in the packet
     */
    private void queueOpPacket(int n2, long l2) {
        queuePacket(new QuorumPacket(n2, l2, null, null));
    }

    /**
     * Appends a packet to the outgoing queue and adds its size to
     * {@code queuedPacketsSize}.
     *
     * <p>When marker logging is enabled, a {@link MarkerQuorumPacket} stamped
     * with {@link System#nanoTime()} is also appended whenever the
     * pre-increment value of {@code packetCounter} is a multiple of 1000. The
     * marker's own size is not added to {@code queuedPacketsSize}.
     *
     * @param quorumPacket the packet to queue
     */
    void queuePacket(QuorumPacket quorumPacket) {
        this.queuedPackets.add(quorumPacket);
        if (this.shouldSendMarkerPacketForLogging()) {
            // The counter only advances while marker logging is enabled.
            if (this.packetCounter.getAndIncrement() % 1000 == 0) {
                this.queuedPackets.add(new MarkerQuorumPacket(System.nanoTime()));
            }
        }
        this.queuedPacketsSize.addAndGet(LearnerHandler.packetSize(quorumPacket));
    }

    /**
     * Computes the size accounted for a packet: a fixed base of 28 plus the
     * length of its data array, if any.
     *
     * @param quorumPacket the packet to measure
     * @return 28 when the packet has no data, otherwise 28 plus the data length
     */
    static long packetSize(QuorumPacket quorumPacket) {
        byte[] payload = quorumPacket.getData();
        long payloadLength = payload == null ? 0L : (long)payload.length;
        return 28L + payloadLength;
    }

    /**
     * Reports whether this handler is alive and the learner master's current
     * tick has not passed {@code tickOfNextAckDeadline}.
     *
     * @return {@code true} if alive and still within the ACK deadline
     */
    public boolean synced() {
        if (!this.isAlive()) {
            return false;
        }
        long currentTick = this.learnerMaster.getCurrentTick();
        return currentTick <= this.tickOfNextAckDeadline;
    }

    /**
     * Builds a snapshot of this handler's connection details and counters.
     *
     * <p>The returned map is a fresh {@link LinkedHashMap}, so entries iterate
     * in the order they are inserted below.
     *
     * @return a new map of the nine reported properties
     */
    public synchronized Map<String, Object> getLearnerHandlerInfo() {
        Map<String, Object> info = new LinkedHashMap<>(9);
        info.put("remote_socket_address", this.getRemoteAddress());
        info.put("sid", this.getSid());
        info.put("established", this.getEstablished());
        info.put("queued_packets", this.queuedPackets.size());
        info.put("queued_packets_size", this.queuedPacketsSize.get());
        info.put("packets_received", this.packetsReceived.longValue());
        info.put("packets_sent", this.packetsSent.longValue());
        info.put("requests", this.requestsReceived.longValue());
        info.put("last_zxid", this.getLastZxid());
        return info;
    }

    /**
     * Resets the received/sent/request counters to zero and sets
     * {@code lastZxid} to -1.
     */
    public synchronized void resetObserverConnectionStats() {
        // Counters first, then the zxid marker.
        packetsReceived.set(0L);
        packetsSent.set(0L);
        requestsReceived.set(0L);
        lastZxid = -1L;
    }

    /**
     * Returns the handler's outgoing packet queue itself, not a copy.
     *
     * @return the live queue of packets waiting to be sent
     */
    public Queue<QuorumPacket> getQueuedPackets() {
        return queuedPackets;
    }

    /**
     * Sets the {@code needOpPacket} flag.
     *
     * @param bl2 the new value of {@code needOpPacket}
     */
    public void setFirstPacket(boolean bl2) {
        needOpPacket = bl2;
    }

    /**
     * Closes the learner socket at most once, either on a separate thread or
     * inline depending on the static {@code closeSocketAsync} setting.
     *
     * <p>Nothing happens when there is no socket, when it is already closed, or
     * when {@code sockBeingClosed} has already been flipped to {@code true}.
     */
    void closeSocket() {
        // Same checks, in the same order, as the positive form of the condition.
        if (this.sock == null || this.sock.isClosed() || !this.sockBeingClosed.compareAndSet(false, true)) {
            return;
        }
        if (!closeSocketAsync) {
            LOG.c("Synchronously closing socket to learner {}.", (Object)this.getSid());
            this.closeSockSync();
            return;
        }
        LOG.c("Asynchronously closing socket to learner {}.", (Object)this.getSid());
        this.closeSockAsync();
    }

    /**
     * Starts a daemon thread that runs {@link #closeSockSync()}.
     */
    void closeSockAsync() {
        // NOTE(review): the name has no closing parenthesis; kept exactly as in the original.
        String closerName = "CloseSocketThread(sid:" + this.sid;
        Thread closer = new Thread(this::closeSockSync, closerName);
        closer.setDaemon(true);
        closer.start();
    }

    /**
     * Closes the socket on the calling thread and records how long the close
     * took in the {@code SOCKET_CLOSING_TIME} metric.
     *
     * <p>An {@link IOException} from the close is logged and otherwise ignored;
     * in that case no timing is recorded.
     */
    void closeSockSync() {
        try {
            if (this.sock == null) {
                return;
            }
            long closeStartedAt = Time.currentElapsedTime();
            this.sock.close();
            ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - closeStartedAt);
        } catch (IOException iOException) {
            LOG.d("Ignoring error closing connection to learner {}", (Object)this.getSid(), (Object)iOException);
        }
    }

    static {
        LOG.c("{} = {}", (Object)LEADER_CLOSE_SOCKET_ASYNC, (Object)closeSocketAsync);
    }

    /**
     * A {@link QuorumPacket} subclass that carries only a timestamp.
     *
     * <p>Equality and hashing are based solely on {@code time}; two markers of
     * exactly the same class with the same {@code time} are equal.
     */
    static class MarkerQuorumPacket
    extends QuorumPacket {
        /** Timestamp supplied at construction. */
        long time;

        /**
         * @param l2 the timestamp to store in {@code time}
         */
        MarkerQuorumPacket(long l2) {
            this.time = l2;
        }

        /**
         * @return a hash of {@code time}; same value as {@code Objects.hash(time)}
         */
        @Override
        public int hashCode() {
            // Objects.hash over a single element is 31 * 1 + elementHash.
            return 31 + Long.hashCode(this.time);
        }

        /**
         * @param object the object to compare with
         * @return {@code true} if {@code object} has exactly this class and the same {@code time}
         */
        @Override
        public boolean equals(Object object) {
            if (object == this) {
                return true;
            }
            if (object == null) {
                return false;
            }
            if (object.getClass() != this.getClass()) {
                return false;
            }
            return ((MarkerQuorumPacket)object).time == this.time;
        }
    }

    /**
     * Tracks up to two outstanding proposals (a "current" and a "next" one) by
     * zxid and timestamp, and reports whether the current one has been waiting
     * for less than the learner master's sync timeout.
     *
     * <p>A {@code currentTime} of 0 means no proposal is being tracked. All
     * methods are synchronized on this instance.
     */
    class SyncLimitCheck {
        private boolean started = false;
        private long currentZxid = 0L;
        private long currentTime = 0L;
        private long nextZxid = 0L;
        private long nextTime = 0L;

        private SyncLimitCheck() {
        }

        /** Enables tracking; proposals reported before this call are ignored. */
        public synchronized void start() {
            this.started = true;
        }

        /**
         * Records a proposal. It fills the "current" slot when that slot is
         * empty; otherwise it overwrites the "next" slot.
         *
         * @param l2 zxid of the proposal
         * @param l3 timestamp of the proposal (presumably from the same clock
         *           as the value later passed to {@link #check(long)}; verify
         *           against callers)
         */
        public synchronized void updateProposal(long l2, long l3) {
            if (!this.started) {
                return;
            }
            boolean currentSlotEmpty = this.currentTime == 0L;
            if (currentSlotEmpty) {
                this.currentTime = l3;
                this.currentZxid = l2;
                return;
            }
            this.nextTime = l3;
            this.nextZxid = l2;
        }

        /**
         * Records an ACK. An ACK for the current zxid promotes the "next" slot
         * to "current"; an ACK for the next zxid only clears the "next" slot
         * and logs that it arrived out of order. Any other zxid is ignored.
         *
         * @param l2 the acknowledged zxid
         */
        public synchronized void updateAck(long l2) {
            if (l2 == this.currentZxid) {
                this.currentTime = this.nextTime;
                this.currentZxid = this.nextZxid;
                this.clearNextSlot();
                return;
            }
            if (l2 == this.nextZxid) {
                LOG.d("ACK for 0x{} received before ACK for 0x{}", (Object)Long.toHexString(l2), (Object)Long.toHexString(this.currentZxid));
                this.clearNextSlot();
            }
        }

        /**
         * Checks whether the current proposal is still within the sync timeout.
         *
         * @param l2 the present time; the difference from {@code currentTime}
         *           is divided by 1,000,000 before being compared with
         *           {@code syncTimeout()}
         * @return {@code true} if nothing is tracked or the scaled elapsed time
         *         is strictly less than the sync timeout
         */
        public synchronized boolean check(long l2) {
            if (this.currentTime == 0L) {
                return true;
            }
            long scaledElapsed = (l2 - this.currentTime) / 1000000L;
            long timeout = LearnerHandler.this.learnerMaster.syncTimeout();
            return scaledElapsed < timeout;
        }

        /** Empties the "next" slot; callers already hold this instance's lock. */
        private void clearNextSlot() {
            this.nextTime = 0L;
            this.nextZxid = 0L;
        }
    }
}

