/*
 * Decompiled with CFR 0.152.
 */
package XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum;

import XcoreXdatabricksX240X9088.foe;
import XcoreXdatabricksX240X9088.goe;
import XcoreXdatabricksX240X9088.hme;
import XcoreXdatabricksX240X9088.ime;
import XcoreXdatabricksX240X9088.kme;
import XcoreXdatabricksX240X9088.lme;
import XcoreXdatabricksX240X9088.mme;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.common.Time;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.common.X509Exception;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ExitCode;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.Request;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerCnxn;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerMetrics;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.TxnLogEntry;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooTrace;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.LearnerHandler;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.LearnerInfo;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.LearnerSender;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.LearnerZooKeeperServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.MultipleAddresses;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.ObserverZooKeeperServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.QuorumPacket;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.QuorumPeer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.Vote;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.util.ConfigUtils;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.util.MessageTracker;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.util.SerializeUtils;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.util.ZxidUtils;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.txn.SetDataTxn;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.txn.TxnDigest;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.txn.TxnHeader;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.util.ServiceUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLSocket;

public class Learner {
    QuorumPeer self;
    LearnerZooKeeperServer zk;
    protected BufferedOutputStream bufferedOutput;
    protected Socket sock;
    protected MultipleAddresses leaderAddr;
    protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
    LearnerSender sender = null;
    protected kme leaderIs;
    protected lme leaderOs;
    protected int leaderProtocolVersion = 1;
    private static final int BUFFERED_MESSAGE_SIZE = 10;
    protected final MessageTracker messageTracker = new MessageTracker(10);
    protected static final foe LOG = goe.a(Learner.class);
    private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100);
    private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
    public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
    private static boolean asyncSending = Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay("zookeeper.learner.asyncSending"));
    public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync";
    public static final boolean closeSocketAsync = Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay("zookeeper.learner.closeSocketAsync"));
    final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap();

    public Socket getSocket() {
        return this.sock;
    }

    public int getPendingRevalidationsCount() {
        return this.pendingRevalidations.size();
    }

    protected static void setAsyncSending(boolean bl2) {
        asyncSending = bl2;
        LOG.c("{} = {}", (Object)LEARNER_ASYNC_SENDING, (Object)asyncSending);
    }

    protected static boolean getAsyncSending() {
        return asyncSending;
    }

    void validateSession(ServerCnxn serverCnxn, long l2, int n2) throws IOException {
        LOG.c("Revalidating client: 0x{}", (Object)Long.toHexString(l2));
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        dataOutputStream.writeLong(l2);
        dataOutputStream.writeInt(n2);
        dataOutputStream.close();
        QuorumPacket quorumPacket = new QuorumPacket(6, -1L, byteArrayOutputStream.toByteArray(), null);
        this.pendingRevalidations.put(l2, serverCnxn);
        if (LOG.b()) {
            ZooTrace.logTraceMessage(LOG, 32L, "To validate session 0x" + Long.toHexString(l2));
        }
        this.writePacket(quorumPacket, true);
    }

    void writePacket(QuorumPacket quorumPacket, boolean bl2) throws IOException {
        if (asyncSending) {
            this.sender.queuePacket(quorumPacket);
        } else {
            this.writePacketNow(quorumPacket, bl2);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void writePacketNow(QuorumPacket quorumPacket, boolean bl2) throws IOException {
        lme lme2 = this.leaderOs;
        synchronized (lme2) {
            if (quorumPacket != null) {
                this.messageTracker.trackSent(quorumPacket.getType());
                this.leaderOs.a(quorumPacket, "packet");
            }
            if (bl2) {
                this.bufferedOutput.flush();
            }
        }
    }

    protected void startSendingThread() {
        this.sender = new LearnerSender(this);
        this.sender.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void readPacket(QuorumPacket quorumPacket) throws IOException {
        kme kme2 = this.leaderIs;
        synchronized (kme2) {
            this.leaderIs.a(quorumPacket, "packet");
            this.messageTracker.trackReceived(quorumPacket.getType());
        }
        if (LOG.b()) {
            long l2 = quorumPacket.getType() == 5 ? 128L : 16L;
            ZooTrace.logQuorumPacket(LOG, l2, 'i', quorumPacket);
        }
    }

    void request(Request request) throws IOException {
        if (request.isThrottled()) {
            LOG.e("Throttled request sent to leader: {}. Exiting", (Object)request);
            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        dataOutputStream.writeLong(request.sessionId);
        dataOutputStream.writeInt(request.cxid);
        dataOutputStream.writeInt(request.type);
        byte[] byArray = request.readRequestBytes();
        if (byArray != null) {
            dataOutputStream.write(byArray);
        }
        dataOutputStream.close();
        QuorumPacket quorumPacket = new QuorumPacket(1, -1L, byteArrayOutputStream.toByteArray(), request.authInfo);
        this.writePacket(quorumPacket, true);
    }

    protected QuorumPeer.QuorumServer findLeader() {
        QuorumPeer.QuorumServer quorumServer = null;
        Vote vote = this.self.getCurrentVote();
        for (QuorumPeer.QuorumServer quorumServer2 : this.self.getView().values()) {
            if (quorumServer2.id != vote.getId()) continue;
            quorumServer2.recreateSocketAddresses();
            quorumServer = quorumServer2;
            break;
        }
        if (quorumServer == null) {
            LOG.d("Couldn't find the leader with id = {}", (Object)vote.getId());
        }
        return quorumServer;
    }

    protected long nanoTime() {
        return System.nanoTime();
    }

    protected void sockConnect(Socket socket, InetSocketAddress inetSocketAddress, int n2) throws IOException {
        socket.connect(inetSocketAddress, n2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void connectToLeader(MultipleAddresses multipleAddresses, String string) throws IOException {
        this.leaderAddr = multipleAddresses;
        Set<InetSocketAddress> set = this.self.isMultiAddressReachabilityCheckEnabled() ? multipleAddresses.getAllReachableAddressesOrAll() : multipleAddresses.getAllAddresses();
        ExecutorService executorService = Executors.newFixedThreadPool(set.size());
        CountDownLatch countDownLatch = new CountDownLatch(set.size());
        AtomicReference<Object> atomicReference = new AtomicReference<Object>(null);
        set.stream().map(inetSocketAddress -> new LeaderConnector((InetSocketAddress)inetSocketAddress, (AtomicReference<Socket>)atomicReference, countDownLatch)).forEach(executorService::submit);
        try {
            countDownLatch.await();
        }
        catch (InterruptedException interruptedException) {
            LOG.c("Interrupted while trying to connect to Leader", interruptedException);
        }
        finally {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(1L, TimeUnit.SECONDS)) {
                    LOG.e("not all the LeaderConnector terminated properly");
                }
            }
            catch (InterruptedException interruptedException) {
                LOG.d("Interrupted while terminating LeaderConnector executor.", interruptedException);
            }
        }
        if (atomicReference.get() == null) {
            throw new IOException("Failed connect to " + multipleAddresses);
        }
        this.sock = atomicReference.get();
        this.sockBeingClosed.set(false);
        this.self.authLearner.authenticate(this.sock, string);
        this.leaderIs = hme.a(new BufferedInputStream(this.sock.getInputStream()));
        this.bufferedOutput = new BufferedOutputStream(this.sock.getOutputStream());
        this.leaderOs = ime.a(this.bufferedOutput);
        if (asyncSending) {
            this.startSendingThread();
        }
    }

    protected Socket createSocket() throws X509Exception, IOException {
        Socket socket = this.self.isSslQuorum() ? this.self.getX509Util().createSSLSocket() : new Socket();
        socket.setSoTimeout(this.self.tickTime * this.self.initLimit);
        return socket;
    }

    protected long registerWithLeader(int n2) throws IOException {
        long l2 = this.self.getLastLoggedZxid();
        QuorumPacket quorumPacket = new QuorumPacket();
        quorumPacket.setType(n2);
        quorumPacket.setZxid(ZxidUtils.makeZxid(this.self.getAcceptedEpoch(), 0L));
        LearnerInfo learnerInfo = new LearnerInfo(this.self.getMyId(), 65536, this.self.getQuorumVerifier().getVersion());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ime ime2 = ime.a(byteArrayOutputStream);
        ime2.a(learnerInfo, "LearnerInfo");
        quorumPacket.setData(byteArrayOutputStream.toByteArray());
        this.writePacket(quorumPacket, true);
        this.readPacket(quorumPacket);
        long l3 = ZxidUtils.getEpochFromZxid(quorumPacket.getZxid());
        if (quorumPacket.getType() == 17) {
            this.leaderProtocolVersion = ByteBuffer.wrap(quorumPacket.getData()).getInt();
            byte[] byArray = new byte[4];
            ByteBuffer byteBuffer = ByteBuffer.wrap(byArray);
            if (l3 > this.self.getAcceptedEpoch()) {
                byteBuffer.putInt((int)this.self.getCurrentEpoch());
                this.self.setAcceptedEpoch(l3);
            } else if (l3 == this.self.getAcceptedEpoch()) {
                byteBuffer.putInt(-1);
            } else {
                throw new IOException("Leaders epoch, " + l3 + " is less than accepted epoch, " + this.self.getAcceptedEpoch());
            }
            QuorumPacket quorumPacket2 = new QuorumPacket(18, l2, byArray, null);
            this.writePacket(quorumPacket2, true);
            return ZxidUtils.makeZxid(l3, 0L);
        }
        if (l3 > this.self.getAcceptedEpoch()) {
            this.self.setAcceptedEpoch(l3);
        }
        if (quorumPacket.getType() != 10) {
            LOG.e("First packet should have been NEWLEADER");
            throw new IOException("First packet should have been NEWLEADER");
        }
        return quorumPacket.getZxid();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected void syncWithLeader(long l2) throws Exception {
        QuorumPacket quorumPacket = new QuorumPacket(3, 0L, null, null);
        QuorumPacket quorumPacket2 = new QuorumPacket();
        long l3 = ZxidUtils.getEpochFromZxid(l2);
        QuorumVerifier quorumVerifier = null;
        boolean bl2 = true;
        boolean bl3 = false;
        this.readPacket(quorumPacket2);
        ArrayDeque<Long> arrayDeque = new ArrayDeque<Long>();
        ArrayDeque<Object> arrayDeque2 = new ArrayDeque<Object>();
        ArrayDeque<Request> arrayDeque3 = new ArrayDeque<Request>();
        Object object = this.zk;
        synchronized (object) {
            boolean bl4;
            if (quorumPacket2.getType() == 13) {
                LOG.c("Getting a diff from the leader 0x{}", (Object)Long.toHexString(quorumPacket2.getZxid()));
                this.self.setSyncMode(QuorumPeer.SyncMode.DIFF);
                if (this.zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
                    LOG.c("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
                    bl2 = true;
                    bl3 = true;
                } else {
                    bl2 = false;
                }
            } else if (quorumPacket2.getType() == 15) {
                String string;
                this.self.setSyncMode(QuorumPeer.SyncMode.SNAP);
                LOG.c("Getting a snapshot from leader 0x{}", (Object)Long.toHexString(quorumPacket2.getZxid()));
                this.zk.getZKDatabase().deserializeSnapshot(this.leaderIs);
                if (!this.self.isReconfigEnabled()) {
                    LOG.b("Reset config node content from local config after deserialization of snapshot.");
                    this.zk.getZKDatabase().initConfigInZKDatabase(this.self.getQuorumVerifier());
                }
                if (!(string = this.leaderIs.e("signature")).equals("BenWasHere")) {
                    LOG.e("Missing signature. Got {}", (Object)string);
                    throw new IOException("Missing signature");
                }
                this.zk.getZKDatabase().setlastProcessedZxid(quorumPacket2.getZxid());
                bl3 = true;
            } else if (quorumPacket2.getType() == 14) {
                this.self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
                LOG.d("Truncating log to get in sync with the leader 0x{}", (Object)Long.toHexString(quorumPacket2.getZxid()));
                boolean bl5 = this.zk.getZKDatabase().truncateLog(quorumPacket2.getZxid());
                if (!bl5) {
                    LOG.e("Not able to truncate the log 0x{}", (Object)Long.toHexString(quorumPacket2.getZxid()));
                    ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
                }
                this.zk.getZKDatabase().setlastProcessedZxid(quorumPacket2.getZxid());
            } else {
                LOG.e("Got unexpected packet from leader: {}, exiting ... ", (Object)LearnerHandler.packetToString(quorumPacket2));
                ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
            }
            this.zk.getZKDatabase().initConfigInZKDatabase(this.self.getQuorumVerifier());
            this.zk.createSessionTracker();
            long l4 = 0L;
            boolean bl6 = true;
            boolean bl7 = bl4 = !bl2;
            block11: while (this.self.isRunning()) {
                this.readPacket(quorumPacket2);
                switch (quorumPacket2.getType()) {
                    case 2: {
                        Object object2;
                        PacketInFlight packetInFlight = new PacketInFlight();
                        TxnLogEntry txnLogEntry = SerializeUtils.deserializeTxn(quorumPacket2.getData());
                        packetInFlight.hdr = txnLogEntry.getHeader();
                        packetInFlight.rec = txnLogEntry.getTxn();
                        packetInFlight.digest = txnLogEntry.getDigest();
                        if (packetInFlight.hdr.getZxid() != l4 + 1L) {
                            LOG.d("Got zxid 0x{} expected 0x{}", (Object)Long.toHexString(packetInFlight.hdr.getZxid()), (Object)Long.toHexString(l4 + 1L));
                        }
                        l4 = packetInFlight.hdr.getZxid();
                        if (packetInFlight.hdr.getType() == 16) {
                            object2 = (SetDataTxn)packetInFlight.rec;
                            QuorumVerifier quorumVerifier2 = this.self.configFromString(new String(((SetDataTxn)object2).getData(), StandardCharsets.UTF_8));
                            this.self.setLastSeenQuorumVerifier(quorumVerifier2, true);
                        }
                        arrayDeque2.add(packetInFlight);
                        break;
                    }
                    case 4: 
                    case 9: {
                        boolean bl8;
                        Object object2;
                        PacketInFlight packetInFlight = (PacketInFlight)arrayDeque2.peekFirst();
                        if (packetInFlight.hdr.getZxid() == quorumPacket2.getZxid() && quorumPacket2.getType() == 9 && (bl8 = this.self.processReconfig((QuorumVerifier)(object2 = this.self.configFromString(new String(((SetDataTxn)packetInFlight.rec).getData(), StandardCharsets.UTF_8))), ByteBuffer.wrap(quorumPacket2.getData()).getLong(), quorumPacket2.getZxid(), true))) {
                            throw new Exception("changes proposed in reconfig");
                        }
                        if (!bl4) {
                            if (packetInFlight.hdr.getZxid() != quorumPacket2.getZxid()) {
                                LOG.d("Committing 0x{}, but next proposal is 0x{}", (Object)Long.toHexString(quorumPacket2.getZxid()), (Object)Long.toHexString(packetInFlight.hdr.getZxid()));
                                break;
                            }
                            this.zk.processTxn(packetInFlight.hdr, packetInFlight.rec);
                            arrayDeque2.remove();
                            break;
                        }
                        arrayDeque.add(quorumPacket2.getZxid());
                        break;
                    }
                    case 8: 
                    case 19: {
                        Object object3;
                        Object object4;
                        TxnLogEntry txnLogEntry;
                        Object object2 = new PacketInFlight();
                        if (quorumPacket2.getType() == 19) {
                            ByteBuffer byteBuffer = ByteBuffer.wrap(quorumPacket2.getData());
                            long l5 = byteBuffer.getLong();
                            object4 = new byte[byteBuffer.remaining()];
                            byteBuffer.get((byte[])object4);
                            txnLogEntry = SerializeUtils.deserializeTxn((byte[])object4);
                            ((PacketInFlight)object2).hdr = txnLogEntry.getHeader();
                            ((PacketInFlight)object2).rec = txnLogEntry.getTxn();
                            ((PacketInFlight)object2).digest = txnLogEntry.getDigest();
                            object3 = this.self.configFromString(new String(((SetDataTxn)((PacketInFlight)object2).rec).getData(), StandardCharsets.UTF_8));
                            boolean bl9 = this.self.processReconfig((QuorumVerifier)object3, l5, quorumPacket2.getZxid(), true);
                            if (bl9) {
                                throw new Exception("changes proposed in reconfig");
                            }
                        } else {
                            txnLogEntry = SerializeUtils.deserializeTxn(quorumPacket2.getData());
                            ((PacketInFlight)object2).rec = txnLogEntry.getTxn();
                            ((PacketInFlight)object2).hdr = txnLogEntry.getHeader();
                            ((PacketInFlight)object2).digest = txnLogEntry.getDigest();
                            if (((PacketInFlight)object2).hdr.getZxid() != l4 + 1L) {
                                LOG.d("Got zxid 0x{} expected 0x{}", (Object)Long.toHexString(((PacketInFlight)object2).hdr.getZxid()), (Object)Long.toHexString(l4 + 1L));
                            }
                            l4 = ((PacketInFlight)object2).hdr.getZxid();
                        }
                        if (!bl4) {
                            this.zk.processTxn(((PacketInFlight)object2).hdr, ((PacketInFlight)object2).rec);
                            break;
                        }
                        arrayDeque2.add(object2);
                        arrayDeque.add(quorumPacket2.getZxid());
                        break;
                    }
                    case 12: {
                        boolean bl10;
                        LOG.c("Learner received UPTODATE message");
                        if (quorumVerifier != null && (bl10 = this.self.processReconfig(quorumVerifier, null, null, true))) {
                            throw new Exception("changes proposed in reconfig");
                        }
                        if (bl6) {
                            this.zk.takeSnapshot(bl3);
                            this.self.setCurrentEpoch(l3);
                        }
                        this.self.setZooKeeperServer(this.zk);
                        this.self.adminServer.setZooKeeperServer(this.zk);
                        break block11;
                    }
                    case 10: {
                        Object object3;
                        Object object4;
                        LOG.c("Learner received NEWLEADER message");
                        if (quorumPacket2.getData() != null && quorumPacket2.getData().length > 1) {
                            try {
                                QuorumVerifier quorumVerifier3 = this.self.configFromString(new String(quorumPacket2.getData(), StandardCharsets.UTF_8));
                                this.self.setLastSeenQuorumVerifier(quorumVerifier3, true);
                                quorumVerifier = quorumVerifier3;
                            }
                            catch (Exception exception) {
                                exception.printStackTrace();
                            }
                        }
                        if (bl2) {
                            this.zk.takeSnapshot(bl3);
                        }
                        bl4 = true;
                        bl6 = false;
                        this.sock.setSoTimeout(this.self.tickTime * this.self.syncLimit);
                        this.self.setSyncMode(QuorumPeer.SyncMode.NONE);
                        this.zk.startupWithoutServing();
                        if (this.zk instanceof FollowerZooKeeperServer) {
                            long l6 = Time.currentElapsedTime();
                            FollowerZooKeeperServer followerZooKeeperServer = (FollowerZooKeeperServer)this.zk;
                            object4 = arrayDeque2.iterator();
                            while (object4.hasNext()) {
                                object3 = (PacketInFlight)object4.next();
                                Request request = followerZooKeeperServer.appendRequest(((PacketInFlight)object3).hdr, ((PacketInFlight)object3).rec, ((PacketInFlight)object3).digest);
                                arrayDeque3.add(request);
                            }
                            followerZooKeeperServer.getZKDatabase().commit();
                            LOG.c("{} txns have been persisted and it took {}ms", (Object)arrayDeque2.size(), (Object)(Time.currentElapsedTime() - l6));
                            arrayDeque2.clear();
                        }
                        this.self.setCurrentEpoch(l3);
                        LOG.c("Set the current epoch to {}", (Object)l3);
                        this.writePacket(new QuorumPacket(3, l2, null, null), true);
                        LOG.c("Sent NEWLEADER ack to leader with zxid {}", (Object)Long.toHexString(l2));
                    }
                }
            }
        }
        quorumPacket.setZxid(ZxidUtils.makeZxid(l3, 0L));
        this.writePacket(quorumPacket, true);
        this.zk.startServing();
        this.self.updateElectionVote(l3);
        if (this.zk instanceof FollowerZooKeeperServer) {
            for (Request request : arrayDeque3) {
                QuorumPacket quorumPacket3 = new QuorumPacket(3, request.getHdr().getZxid(), null, null);
                this.writePacket(quorumPacket3, false);
            }
            this.writePacket(null, true);
            arrayDeque3.clear();
            object = (FollowerZooKeeperServer)this.zk;
            for (PacketInFlight packetInFlight : arrayDeque2) {
                ((FollowerZooKeeperServer)object).logRequest(packetInFlight.hdr, packetInFlight.rec, packetInFlight.digest);
            }
            LOG.c("{} txns have been logged asynchronously", (Object)arrayDeque2.size());
            Iterator iterator = arrayDeque.iterator();
            while (true) {
                if (!iterator.hasNext()) {
                    LOG.c("{} txns have been committed", (Object)arrayDeque.size());
                    return;
                }
                Long l7 = (Long)iterator.next();
                ((FollowerZooKeeperServer)object).commit(l7);
            }
        }
        if (!(this.zk instanceof ObserverZooKeeperServer)) throw new UnsupportedOperationException("Unknown server type");
        object = (ObserverZooKeeperServer)this.zk;
        Iterator iterator = arrayDeque2.iterator();
        while (iterator.hasNext()) {
            PacketInFlight packetInFlight = (PacketInFlight)iterator.next();
            Long l8 = (Long)arrayDeque.peekFirst();
            if (packetInFlight.hdr.getZxid() != l8.longValue()) {
                LOG.d("Committing 0x{}, but next proposal is 0x{}", (Object)Long.toHexString(l8), (Object)Long.toHexString(packetInFlight.hdr.getZxid()));
                continue;
            }
            arrayDeque.remove();
            Request request = new Request(packetInFlight.hdr.getClientId(), packetInFlight.hdr.getCxid(), packetInFlight.hdr.getType(), packetInFlight.hdr, packetInFlight.rec, -1L);
            request.setTxnDigest(packetInFlight.digest);
            ((ObserverZooKeeperServer)object).commitRequest(request);
        }
    }

    protected void revalidate(QuorumPacket quorumPacket) throws IOException {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(quorumPacket.getData());
        DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
        long l2 = dataInputStream.readLong();
        boolean bl2 = dataInputStream.readBoolean();
        ServerCnxn serverCnxn = this.pendingRevalidations.remove(l2);
        if (serverCnxn == null) {
            LOG.d("Missing session 0x{} for validation", (Object)Long.toHexString(l2));
        } else {
            this.zk.finishSessionInit(serverCnxn, bl2);
        }
        if (LOG.b()) {
            ZooTrace.logTraceMessage(LOG, 32L, "Session 0x" + Long.toHexString(l2) + " is valid: " + bl2);
        }
    }

    protected void ping(QuorumPacket quorumPacket) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        Map<Long, Integer> map = this.zk.getTouchSnapshot();
        for (Map.Entry<Long, Integer> entry : map.entrySet()) {
            dataOutputStream.writeLong(entry.getKey());
            dataOutputStream.writeInt(entry.getValue());
        }
        QuorumPacket quorumPacket2 = new QuorumPacket(quorumPacket.getType(), quorumPacket.getZxid(), byteArrayOutputStream.toByteArray(), quorumPacket.getAuthinfo());
        this.writePacket(quorumPacket2, true);
    }

    public void shutdown() {
        this.self.setZooKeeperServer(null);
        this.self.closeAllConnections();
        this.self.adminServer.setZooKeeperServer(null);
        if (this.sender != null) {
            this.sender.shutdown();
        }
        this.closeSocket();
        if (this.zk != null) {
            this.zk.shutdown(this.self.getSyncMode().equals((Object)QuorumPeer.SyncMode.SNAP));
        }
    }

    boolean isRunning() {
        return this.self.isRunning() && this.zk.isRunning();
    }

    void closeSocket() {
        if (this.sock != null && this.sockBeingClosed.compareAndSet(false, true)) {
            if (closeSocketAsync) {
                Thread thread = new Thread(() -> this.closeSockSync(), "CloseSocketThread(sid:" + this.zk.getServerId());
                thread.setDaemon(true);
                thread.start();
            } else {
                this.closeSockSync();
            }
        }
    }

    void closeSockSync() {
        try {
            long l2 = Time.currentElapsedTime();
            if (this.sock != null) {
                this.sock.close();
                this.sock = null;
            }
            ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - l2);
        }
        catch (IOException iOException) {
            LOG.c("Ignoring error closing connection to leader", iOException);
        }
    }

    static {
        LOG.c("leaderConnectDelayDuringRetryMs: {}", (Object)leaderConnectDelayDuringRetryMs);
        LOG.c("TCP NoDelay set to: {}", (Object)nodelay);
        LOG.c("{} = {}", (Object)LEARNER_ASYNC_SENDING, (Object)asyncSending);
        LOG.c("{} = {}", (Object)LEARNER_CLOSE_SOCKET_ASYNC, (Object)closeSocketAsync);
    }

    class LeaderConnector
    implements Runnable {
        private AtomicReference<Socket> socket;
        private InetSocketAddress address;
        private CountDownLatch latch;

        LeaderConnector(InetSocketAddress inetSocketAddress, AtomicReference<Socket> atomicReference, CountDownLatch countDownLatch) {
            this.address = inetSocketAddress;
            this.socket = atomicReference;
            this.latch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                Thread.currentThread().setName("LeaderConnector-" + this.address);
                Socket socket = this.connectToLeader();
                if (socket != null && socket.isConnected()) {
                    if (this.socket.compareAndSet(null, socket)) {
                        LOG.c("Successfully connected to leader, using address: {}", (Object)this.address);
                    } else {
                        LOG.c("Connection to the leader is already established, close the redundant connection");
                        socket.close();
                    }
                }
            }
            catch (Exception exception) {
                LOG.e("Failed connect to {}", (Object)this.address, (Object)exception);
            }
            finally {
                this.latch.countDown();
            }
        }

        private Socket connectToLeader() throws IOException, X509Exception, InterruptedException {
            Socket socket = Learner.this.createSocket();
            int n2 = Learner.this.self.tickTime * Learner.this.self.initLimit;
            if (Learner.this.self.connectToLearnerMasterLimit > 0) {
                n2 = Learner.this.self.tickTime * Learner.this.self.connectToLearnerMasterLimit;
            }
            long l2 = Learner.this.nanoTime();
            for (int i2 = 0; i2 < 5 && this.socket.get() == null; ++i2) {
                int n3;
                try {
                    n3 = n2 - (int)((Learner.this.nanoTime() - l2) / 1000000L);
                    if (n3 <= 0) {
                        LOG.e("connectToLeader exceeded on retries.");
                        throw new IOException("connectToLeader exceeded on retries.");
                    }
                    Learner.this.sockConnect(socket, this.address, Math.min(n2, n3));
                    if (Learner.this.self.isSslQuorum()) {
                        ((SSLSocket)socket).startHandshake();
                    }
                    socket.setTcpNoDelay(nodelay);
                    break;
                }
                catch (IOException iOException) {
                    n3 = n2 - (int)((Learner.this.nanoTime() - l2) / 1000000L);
                    if (n3 <= leaderConnectDelayDuringRetryMs) {
                        LOG.e("Unexpected exception, connectToLeader exceeded. tries={}, remaining init limit={}, connecting to {}", i2, n3, this.address, iOException);
                        throw iOException;
                    }
                    if (i2 >= 4) {
                        LOG.e("Unexpected exception, retries exceeded. tries={}, remaining init limit={}, connecting to {}", i2, n3, this.address, iOException);
                        throw iOException;
                    }
                    LOG.d("Unexpected exception, tries={}, remaining init limit={}, connecting to {}", i2, n3, this.address, iOException);
                    socket = Learner.this.createSocket();
                    Thread.sleep(leaderConnectDelayDuringRetryMs);
                    continue;
                }
            }
            return socket;
        }
    }

    static class PacketInFlight {
        TxnHeader hdr;
        mme rec;
        TxnDigest digest;

        PacketInFlight() {
        }
    }
}

