/*
 * Decompiled with CFR 0.152.
 */
package XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum;

import XcoreXdatabricksX240X9088.foe;
import XcoreXdatabricksX240X9088.goe;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.common.NetUtils;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.common.X509Exception;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ExitCode;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperThread;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.MultipleAddresses;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.QuorumPeer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.UnifiedServerSocket;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.util.ConfigUtils;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.util.CircularBlockingQueue;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.util.ServiceUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.net.ssl.SSLSocket;

public class QuorumCnxManager {
    // Logger for this class, obtained from the (obfuscated) logging facade.
    private static final foe LOG = goe.a(QuorumCnxManager.class);
    // Receive-queue capacity; the constructor sizes recvQueue with the same value (100).
    static final int RECV_CAPACITY = 100;
    // Per-peer send-queue capacity; the per-sid queues are created with the same value (1).
    static final int SEND_CAPACITY = 1;
    // Largest packet length RecvWorker accepts; larger (or non-positive) lengths abort the connection.
    static final int PACKETMAXSIZE = 524288;
    // Source of substitute ids (-1, -2, ...) handed to peers that announce Long.MAX_VALUE as their sid.
    private AtomicLong observerCounter = new AtomicLong(-1L);
    // Protocol marker written first by startConnection when multi-address mode is disabled.
    public static final long PROTOCOL_VERSION_V1 = -65536L;
    // Protocol marker written first by startConnection when multi-address mode is enabled.
    public static final long PROTOCOL_VERSION_V2 = -65535L;
    // NOTE(review): not referenced in the visible part of this file - presumably a bound used when parsing the initial message; confirm.
    public static final int maxBuffer = 2048;
    // Timeout passed to Socket.connect(); overridable through the "zookeeper.cnxTimeout" system property.
    private int cnxTO = 5000;
    // The owning quorum peer; queried for ids, views, SSL and multi-address settings.
    final QuorumPeer self;
    // Id of this server; messages addressed to it are looped back into recvQueue.
    final long mySid;
    // Value applied with Socket.setSoTimeout() on outgoing sockets.
    final int socketTimeout;
    // Server view supplied to the constructor.
    final Map<Long, QuorumPeer.QuorumServer> view;
    // Flag supplied to the constructor; NOTE(review): Listener.run() consults self.getQuorumListenOnAllIPs() instead.
    final boolean listenOnAllIPs;
    // Pool that runs asynchronous connection initiation and reception tasks.
    private ThreadPoolExecutor connectionExecutor;
    // Sids with an outgoing connection attempt currently queued or running (guards against duplicates).
    private final Set<Long> inprogressConnections = Collections.synchronizedSet(new HashSet());
    // Authenticates incoming connections in handleConnection.
    private QuorumAuthServer authServer;
    // Authenticates outgoing connections in startConnection.
    private QuorumAuthLearner authLearner;
    // Stored from the constructor; NOTE(review): not read in the visible part of this file.
    private boolean quorumSaslAuthEnabled;
    // Counts tasks submitted to connectionExecutor; reset to 0 by halt().
    private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
    // Active SendWorker per peer sid.
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    // Outgoing message queue per peer sid.
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
    // Last message handed to send() per peer sid; re-sent when a new SendWorker starts with an empty queue.
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
    // Messages received from peers (and looped back from this server).
    public final BlockingQueue<Message> recvQueue;
    // Set by halt(); worker loops and the listener check it to stop.
    volatile boolean shutdown = false;
    // Listener thread created in the constructor.
    public final Listener listener;
    // Incremented when a SendWorker/RecvWorker starts running, decremented in their finish().
    private AtomicInteger threadCnt = new AtomicInteger(0);
    // Value of the "zookeeper.tcpKeepAlive" system property, applied to outgoing sockets.
    private final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive");
    // Default factory producing unconnected plain sockets.
    static final Supplier<Socket> DEFAULT_SOCKET_FACTORY = () -> new Socket();
    // Factory actually used for non-SSL outgoing sockets; replaceable via setSocketFactory.
    private static Supplier<Socket> SOCKET_FACTORY = DEFAULT_SOCKET_FACTORY;

    /**
     * Replaces the factory used to create plain (non-SSL) outgoing sockets.
     *
     * @param supplier factory consulted by {@code initiateConnection} when SSL quorum is off
     */
    static void setSocketFactory(Supplier<Socket> supplier) {
        QuorumCnxManager.SOCKET_FACTORY = supplier;
    }

    /**
     * Creates the connection manager, its queues, its connection executor and its listener thread.
     * The listener is created and named here but not started.
     *
     * @param quorumPeer        the owning peer
     * @param l2                id of this server
     * @param map               server view
     * @param quorumAuthServer  authenticator for incoming connections
     * @param quorumAuthLearner authenticator for outgoing connections
     * @param n2                socket read timeout applied to outgoing sockets
     * @param bl2               listen-on-all-IPs flag
     * @param n3                maximum size of the connection executor pool
     * @param bl3               whether quorum SASL authentication is enabled
     * @throws NumberFormatException if the "zookeeper.cnxTimeout" system property is set but not an integer
     */
    public QuorumCnxManager(QuorumPeer quorumPeer, long l2, Map<Long, QuorumPeer.QuorumServer> map, QuorumAuthServer quorumAuthServer, QuorumAuthLearner quorumAuthLearner, int n2, boolean bl2, int n3, boolean bl3) {
        // Message plumbing.
        this.recvQueue = new CircularBlockingQueue<Message>(RECV_CAPACITY);
        this.queueSendMap = new ConcurrentHashMap<>();
        this.senderWorkerMap = new ConcurrentHashMap<>();
        this.lastMessageSent = new ConcurrentHashMap<>();

        // Optional override of the connect timeout.
        final String timeoutOverride = System.getProperty("zookeeper.cnxTimeout");
        if (timeoutOverride != null) {
            this.cnxTO = Integer.parseInt(timeoutOverride);
        }

        // Identity and configuration.
        this.self = quorumPeer;
        this.mySid = l2;
        this.socketTimeout = n2;
        this.view = map;
        this.listenOnAllIPs = bl2;

        // Authentication.
        this.authServer = quorumAuthServer;
        this.authLearner = quorumAuthLearner;
        this.quorumSaslAuthEnabled = bl3;

        this.initializeConnectionExecutor(l2, n3);

        final Listener quorumListener = new Listener();
        quorumListener.setName("QuorumPeerListener");
        this.listener = quorumListener;
    }

    /**
     * Builds the executor that runs asynchronous connection tasks: 3 core threads, at most
     * {@code n2} threads, direct hand-off (no task queue), idle threads (core included)
     * retire after 60 seconds.
     *
     * @param l2 id of this server, embedded in the worker thread names
     * @param n2 maximum pool size
     */
    private void initializeConnectionExecutor(long l2, int n2) {
        final SecurityManager manager = System.getSecurityManager();
        final ThreadGroup group;
        if (manager == null) {
            group = Thread.currentThread().getThreadGroup();
        } else {
            group = manager.getThreadGroup();
        }
        // Numbers the threads created by the factory, starting at 1.
        final AtomicInteger nextThreadIndex = new AtomicInteger(1);
        final ThreadFactory namedThreads = task -> {
            final String threadName = String.format("QuorumConnectionThread-[myid=%d]-%d", l2, nextThreadIndex.getAndIncrement());
            return new Thread(group, task, threadName);
        };
        this.connectionExecutor = new ThreadPoolExecutor(3, n2, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), namedThreads);
        this.connectionExecutor.allowCoreThreadTimeOut(true);
    }

    /**
     * Synchronously opens a connection to the given server using the election address
     * recorded for it in the voting view.
     *
     * @param l2 id of the server to connect to; must be present in the voting view
     */
    public void testInitiateConnection(long l2) {
        LOG.b("Opening channel to server {}", (Object)l2);
        final QuorumPeer.QuorumServer target = this.self.getVotingView().get(l2);
        this.initiateConnection(target.electionAddr, l2);
    }

    /**
     * Opens a socket (SSL or plain, depending on the peer configuration) to the given
     * election address and, once connected, runs the connection handshake on it.
     * Any failure is logged and the socket is closed; nothing is thrown to the caller.
     *
     * @param multipleAddresses election address(es) of the remote server
     * @param l2                id of the remote server
     */
    public void initiateConnection(MultipleAddresses multipleAddresses, Long l2) {
        Socket channel = null;
        try {
            LOG.b("Opening channel to server {}", (Object)l2);
            if (this.self.isSslQuorum()) {
                channel = this.self.getX509Util().createSSLSocket();
            } else {
                channel = SOCKET_FACTORY.get();
            }
            this.setSockOpts(channel);
            channel.connect(multipleAddresses.getReachableOrOne(), this.cnxTO);
            if (channel instanceof SSLSocket) {
                final SSLSocket secureChannel = (SSLSocket)channel;
                secureChannel.startHandshake();
                LOG.c("SSL handshake complete with {} - {} - {}", secureChannel.getRemoteSocketAddress(), secureChannel.getSession().getProtocol(), secureChannel.getSession().getCipherSuite());
            }
            LOG.b("Connected to server {} using election address: {}:{}", l2, channel.getInetAddress(), channel.getPort());
        }
        catch (X509Exception sslSetupFailure) {
            LOG.d("Cannot open secure channel to {} at election address {}", l2, multipleAddresses, sslSetupFailure);
            this.closeSocket(channel);
            return;
        }
        catch (IOException | UnresolvedAddressException connectFailure) {
            LOG.d("Cannot open channel to {} at election address {}", l2, multipleAddresses, connectFailure);
            this.closeSocket(channel);
            return;
        }

        // The socket is connected; exchange the initial message and start the workers.
        try {
            this.startConnection(channel, l2);
        }
        catch (IOException handshakeFailure) {
            LOG.e("Exception while connecting, id: {}, addr: {}, closing learner connection", l2, channel.getRemoteSocketAddress(), handshakeFailure);
            this.closeSocket(channel);
        }
    }

    /**
     * Schedules a connection attempt to the given server on the connection executor,
     * unless an attempt for the same server is already registered as in progress.
     *
     * @param multipleAddresses election address(es) of the remote server
     * @param l2                id of the remote server
     * @return {@code true} if the request was submitted or one is already in progress;
     *         {@code false} if submitting the task failed
     */
    public boolean initiateConnectionAsync(MultipleAddresses multipleAddresses, Long l2) {
        final boolean newlyRegistered = this.inprogressConnections.add(l2);
        if (!newlyRegistered) {
            LOG.b("Connection request to server id: {} is already in progress, so skipping this request", (Object)l2);
            return true;
        }

        boolean submitted;
        try {
            this.connectionExecutor.execute(new QuorumConnectionReqThread(multipleAddresses, l2));
            this.connectionThreadCnt.incrementAndGet();
            submitted = true;
        }
        catch (Throwable submitFailure) {
            // Undo the registration so that a later request for this sid is not skipped.
            this.inprogressConnections.remove(l2);
            LOG.d("Exception while submitting quorum connection request", submitFailure);
            submitted = false;
        }
        return submitted;
    }

    /**
     * Runs the outgoing side of the connection handshake on an already connected socket.
     *
     * <p>Writes the protocol version, this server's id and its election address(es)
     * (length-prefixed, {@code |}-separated), authenticates against the remote server when
     * it is part of the voting view, and then applies the tie-break on server ids: the
     * connection is kept only when the remote id is not larger than this server's id.
     *
     * <p>The address string is encoded explicitly as UTF-8 so the bytes on the wire do not
     * depend on the platform default charset.
     *
     * @param socket connected socket to the remote server
     * @param l2     id of the remote server
     * @return {@code true} if the connection was kept and its workers started; {@code false}
     *         if the initial message could not be written or the connection was dropped
     * @throws IOException if a step outside the initial-message exchange fails
     *                     (e.g. authentication); the caller closes the socket
     */
    private boolean startConnection(Socket socket, Long l2) throws IOException {
        final DataInputStream dataInputStream;
        LOG.b("startConnection (myId:{} --> sid:{})", (Object)this.self.getMyId(), (Object)l2);
        try {
            final DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
            final long protocolVersion = this.self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
            dataOutputStream.writeLong(protocolVersion);
            dataOutputStream.writeLong(this.self.getMyId());

            // V2 advertises every election address, V1 exactly one.
            final String advertisedAddresses;
            if (protocolVersion == PROTOCOL_VERSION_V2) {
                advertisedAddresses = this.self.getElectionAddress().getAllAddresses().stream().map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
            } else {
                advertisedAddresses = Arrays.asList(this.self.getElectionAddress().getOne()).stream().map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
            }
            final byte[] addressBytes = advertisedAddresses.getBytes(StandardCharsets.UTF_8);
            dataOutputStream.writeInt(addressBytes.length);
            dataOutputStream.write(addressBytes);
            dataOutputStream.flush();

            dataInputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
        }
        catch (IOException iOException) {
            LOG.c("Ignoring exception reading or writing challenge: ", iOException);
            this.closeSocket(socket);
            return false;
        }

        // Authenticate only against servers known to the voting view.
        final QuorumPeer.QuorumServer remoteServer = this.self.getVotingView().get(l2);
        if (remoteServer != null) {
            this.authLearner.authenticate(socket, remoteServer.hostname);
        }

        if (l2 <= this.self.getMyId()) {
            LOG.b("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", (Object)this.self.getMyId(), (Object)l2);
            final SendWorker sendWorker = new SendWorker(socket, l2);
            final RecvWorker recvWorker = new RecvWorker(socket, dataInputStream, l2, sendWorker);
            sendWorker.setRecv(recvWorker);

            // Retire the worker of any previous connection to the same server.
            final SendWorker previousWorker = this.senderWorkerMap.get(l2);
            if (previousWorker != null) {
                previousWorker.finish();
            }
            this.senderWorkerMap.put(l2, sendWorker);
            this.queueSendMap.putIfAbsent(l2, new CircularBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            sendWorker.start();
            recvWorker.start();
            return true;
        }

        LOG.c("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", (Object)this.self.getMyId(), (Object)l2);
        this.closeSocket(socket);
        return false;
    }

    /**
     * Handles an accepted connection on the calling thread. On an I/O failure the error
     * is logged and the socket is closed.
     *
     * @param socket the accepted socket
     */
    public void receiveConnection(Socket socket) {
        try {
            final DataInputStream peerInput = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
            LOG.b("Sync handling of connection request received from: {}", (Object)socket.getRemoteSocketAddress());
            this.handleConnection(socket, peerInput);
        }
        catch (IOException handlingFailure) {
            LOG.e("Exception handling connection, addr: {}, closing server connection", (Object)socket.getRemoteSocketAddress());
            LOG.a("Exception details: ", handlingFailure);
            this.closeSocket(socket);
        }
    }

    /**
     * Hands an accepted connection to the connection executor for handling. If the task
     * cannot be submitted the error is logged and the socket is closed.
     *
     * @param socket the accepted socket
     */
    public void receiveConnectionAsync(Socket socket) {
        try {
            LOG.b("Async handling of connection request received from: {}", (Object)socket.getRemoteSocketAddress());
            final Runnable receiverTask = new QuorumConnectionReceiverThread(socket);
            this.connectionExecutor.execute(receiverTask);
            this.connectionThreadCnt.incrementAndGet();
        }
        catch (Throwable submitFailure) {
            LOG.e("Exception handling connection, addr: {}, closing server connection", (Object)socket.getRemoteSocketAddress());
            LOG.a("Exception details: ", submitFailure);
            this.closeSocket(socket);
        }
    }

    /**
     * Runs the incoming side of the connection handshake.
     *
     * <p>The first long read is either the remote sid (when non-negative) or a protocol
     * version that introduces an initial message carrying the sid and election addresses.
     * A sid of {@code Long.MAX_VALUE} is replaced by a locally generated id. After
     * authentication the server ids are compared:
     * <ul>
     *   <li>remote id smaller than ours: the socket is closed and we connect back;</li>
     *   <li>remote id equal to ours: a warning is logged and nothing else is done;</li>
     *   <li>remote id larger than ours: the connection is kept and its workers started.</li>
     * </ul>
     *
     * @param socket          the accepted socket
     * @param dataInputStream input stream wrapping the socket
     * @throws IOException if a step after the initial read fails (e.g. authentication)
     */
    private void handleConnection(Socket socket, DataInputStream dataInputStream) throws IOException {
        Long remoteSid = null;
        MultipleAddresses advertisedAddresses = null;
        try {
            final Long protocolOrSid = dataInputStream.readLong();
            if (protocolOrSid < 0L) {
                // Negative value: a protocol version followed by an initial message.
                try {
                    final InitialMessage initialMessage = InitialMessage.parse(protocolOrSid, dataInputStream);
                    remoteSid = initialMessage.sid;
                    if (!initialMessage.electionAddr.isEmpty()) {
                        advertisedAddresses = new MultipleAddresses(initialMessage.electionAddr, Duration.ofMillis(this.self.getMultiAddressReachabilityCheckTimeoutMs()));
                    }
                    LOG.b("Initial message parsed by {}: {}", (Object)this.self.getMyId(), (Object)initialMessage.toString());
                }
                catch (InitialMessage.InitialMessageException parseFailure) {
                    LOG.d("Initial message parsing error!", parseFailure);
                    this.closeSocket(socket);
                    return;
                }
            } else {
                remoteSid = protocolOrSid;
            }
            if (remoteSid == Long.MAX_VALUE) {
                remoteSid = this.observerCounter.getAndDecrement();
                LOG.c("Setting arbitrary identifier to observer: {}", (Object)remoteSid);
            }
        }
        catch (IOException readFailure) {
            LOG.c("Exception reading or writing challenge", readFailure);
            this.closeSocket(socket);
            return;
        }

        this.authServer.authenticate(socket, dataInputStream);

        if (remoteSid < this.self.getMyId()) {
            // We win the tie-break: drop this connection and connect back ourselves.
            final SendWorker staleWorker = this.senderWorkerMap.get(remoteSid);
            if (staleWorker != null) {
                staleWorker.finish();
            }
            LOG.b("Create new connection to server: {}", (Object)remoteSid);
            this.closeSocket(socket);
            if (advertisedAddresses == null) {
                this.connectOne(remoteSid);
            } else {
                this.connectOne(remoteSid, advertisedAddresses);
            }
        } else if (remoteSid.longValue() == this.self.getMyId()) {
            LOG.d("We got a connection request from a server with our own ID. This should be either a configuration error, or a bug.");
        } else {
            final SendWorker freshSender = new SendWorker(socket, remoteSid);
            final RecvWorker freshReceiver = new RecvWorker(socket, dataInputStream, remoteSid, freshSender);
            freshSender.setRecv(freshReceiver);
            final SendWorker previousSender = this.senderWorkerMap.get(remoteSid);
            if (previousSender != null) {
                previousSender.finish();
            }
            this.senderWorkerMap.put(remoteSid, freshSender);
            this.queueSendMap.putIfAbsent(remoteSid, new CircularBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            freshSender.start();
            freshReceiver.start();
        }
    }

    /**
     * Queues a message for the given server. A message addressed to this server is
     * rewound and placed directly on the receive queue; otherwise it is put on the
     * destination's send queue and a connection to that server is requested.
     *
     * @param l3         id of the destination server
     * @param byteBuffer message to deliver
     */
    public void toSend(Long l3, ByteBuffer byteBuffer) {
        if (this.mySid != l3) {
            final BlockingQueue<ByteBuffer> outbound = this.queueSendMap.computeIfAbsent(l3, sid -> new CircularBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            this.addToSendQueue(outbound, byteBuffer);
            this.connectOne(l3);
            return;
        }
        // Loop-back: no network round trip for messages to ourselves.
        byteBuffer.position(0);
        this.addToRecvQueue(new Message(byteBuffer.duplicate(), l3));
    }

    /**
     * Ensures there is a connection to the given server at the given election address.
     *
     * <p>If a send worker already exists, it is optionally asked to re-validate that its
     * socket is still reachable (multi-address mode with more than one address and the
     * reachability check enabled). Otherwise a connection attempt is scheduled.
     *
     * <p>The worker is looked up once: {@code SendWorker.finish()} can remove the map entry
     * from another thread, so a second lookup after the null check could return
     * {@code null}.
     *
     * @param l2                id of the remote server
     * @param multipleAddresses election address(es) of the remote server
     * @return {@code true} if a connection exists or an attempt was scheduled (or is already
     *         in progress); {@code false} if scheduling failed
     */
    synchronized boolean connectOne(long l2, MultipleAddresses multipleAddresses) {
        final SendWorker existingWorker = this.senderWorkerMap.get(l2);
        if (existingWorker == null) {
            return this.initiateConnectionAsync(multipleAddresses, l2);
        }
        LOG.b("There is a connection already for server {}", (Object)l2);
        if (this.self.isMultiAddressEnabled() && multipleAddresses.size() > 1 && this.self.isMultiAddressReachabilityCheckEnabled()) {
            existingWorker.asyncValidateIfSocketIsStillReachable();
        }
        return true;
    }

    /**
     * Ensures there is a connection to the given server, resolving its election address
     * from the peer's current view or, failing that, from the last seen quorum verifier.
     *
     * <p>If a send worker already exists, it is optionally asked to re-validate that its
     * socket is still reachable. Otherwise, while holding the peer's {@code QV_LOCK}, the
     * address from the current view is tried first; the address from the last seen
     * verifier is tried when the server is listed there and its address differs from the
     * one already tried. An unknown server id is logged.
     *
     * <p>The existing worker is looked up once (its map entry can be removed concurrently
     * by {@code SendWorker.finish()}), and a missing last-seen verifier is tolerated
     * instead of being dereferenced before its null check.
     *
     * <p>Note: this code was recovered by a decompiler, which reported restructuring the
     * original synchronized/try region.
     *
     * @param l2 id of the remote server
     */
    synchronized void connectOne(long l2) {
        final SendWorker existingWorker = this.senderWorkerMap.get(l2);
        if (existingWorker != null) {
            LOG.b("There is a connection already for server {}", (Object)l2);
            if (this.self.isMultiAddressEnabled() && this.self.isMultiAddressReachabilityCheckEnabled()) {
                existingWorker.asyncValidateIfSocketIsStillReachable();
            }
            return;
        }
        synchronized (this.self.QV_LOCK) {
            boolean knownServer = false;
            this.self.recreateSocketAddresses(l2);
            final Map<Long, QuorumPeer.QuorumServer> committedView = this.self.getView();
            final QuorumVerifier lastSeenVerifier = this.self.getLastSeenQuorumVerifier();
            final Map<Long, QuorumPeer.QuorumServer> proposedView = lastSeenVerifier != null ? lastSeenVerifier.getAllMembers() : null;

            if (committedView.containsKey(l2)) {
                knownServer = true;
                LOG.b("Server {} knows {} already, it is in the lastCommittedView", (Object)this.self.getMyId(), (Object)l2);
                if (this.connectOne(l2, committedView.get(l2).electionAddr)) {
                    return;
                }
            }

            // Try the proposed address only if it is not the one that was just attempted.
            if (proposedView != null && proposedView.containsKey(l2)
                    && !(knownServer && proposedView.get(l2).electionAddr.equals(committedView.get(l2).electionAddr))) {
                knownServer = true;
                LOG.b("Server {} knows {} already, it is in the lastProposedView", (Object)this.self.getMyId(), (Object)l2);
                if (this.connectOne(l2, proposedView.get(l2).electionAddr)) {
                    return;
                }
            }

            if (!knownServer) {
                LOG.d("Invalid server id: {} ", (Object)l2);
            }
        }
    }

    /**
     * Requests a connection to every server that currently has a send queue.
     */
    public void connectAll() {
        for (Enumeration<Long> sids = this.queueSendMap.keys(); sids.hasMoreElements(); ) {
            final long sid = sids.nextElement();
            this.connectOne(sid);
        }
    }

    /**
     * Reports whether at least one send queue is empty. Each inspected queue's size is
     * logged; inspection stops at the first empty queue.
     *
     * @return {@code true} if some send queue is empty; {@code false} if every queue holds
     *         a message or there are no queues at all
     */
    boolean haveDelivered() {
        for (BlockingQueue<ByteBuffer> sendQueue : this.queueSendMap.values()) {
            final int pending = sendQueue.size();
            LOG.b("Queue size: {}", (Object)pending);
            if (pending == 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Shuts the connection manager down: flags shutdown, halts the listener and waits for
     * it, finishes all send workers, shuts the connection executor down and clears the
     * in-progress bookkeeping.
     */
    public void halt() {
        this.shutdown = true;

        LOG.b("Halting listener");
        this.listener.halt();
        try {
            this.listener.join();
        }
        catch (InterruptedException interrupted) {
            LOG.c("Got interrupted before joining the listener", interrupted);
        }

        this.softHalt();

        final ThreadPoolExecutor executor = this.connectionExecutor;
        if (executor != null) {
            executor.shutdown();
        }
        this.inprogressConnections.clear();
        // Same effect as resetConnectionThreadCount().
        this.connectionThreadCnt.set(0);
    }

    /**
     * Finishes every registered send worker without touching the listener or the
     * connection executor.
     */
    public void softHalt() {
        this.senderWorkerMap.values().forEach(worker -> {
            LOG.b("Server {} is soft-halting sender towards: {}", (Object)this.self.getMyId(), (Object)worker);
            worker.finish();
        });
    }

    /**
     * Applies the standard options to an outgoing socket: Nagle's algorithm off,
     * keep-alive as configured, read timeout as configured.
     *
     * @param socket socket to configure
     * @throws SocketException if an option cannot be set
     */
    private void setSockOpts(Socket socket) throws SocketException {
        final boolean keepAlive = this.tcpKeepAlive;
        final int readTimeout = this.socketTimeout;
        socket.setTcpNoDelay(true);
        socket.setKeepAlive(keepAlive);
        socket.setSoTimeout(readTimeout);
    }

    /**
     * Closes the socket if there is one; a failure to close is logged, never thrown.
     *
     * @param socket socket to close, may be {@code null}
     */
    private void closeSocket(Socket socket) {
        if (socket != null) {
            try {
                socket.close();
            }
            catch (IOException closeFailure) {
                LOG.d("Exception while closing", closeFailure);
            }
        }
    }

    /**
     * @return the current value of the send/receive worker thread counter
     */
    public long getThreadCount() {
        final long workerThreads = this.threadCnt.get();
        return workerThreads;
    }

    /**
     * @return the number of connection tasks submitted to the connection executor since
     *         the counter was last reset
     */
    public long getConnectionThreadCount() {
        final long submittedTasks = this.connectionThreadCnt.get();
        return submittedTasks;
    }

    /**
     * Resets the connection task counter to zero.
     */
    private void resetConnectionThreadCount() {
        final int cleared = 0;
        this.connectionThreadCnt.set(cleared);
    }

    /**
     * Offers a message to a peer's send queue.
     *
     * @param blockingQueue send queue of the destination server
     * @param byteBuffer    message to enqueue
     * @throws RuntimeException if the queue rejects the message; the message names the
     *                          send queue (it previously named the receive queue by mistake)
     */
    private void addToSendQueue(BlockingQueue<ByteBuffer> blockingQueue, ByteBuffer byteBuffer) {
        final boolean accepted = blockingQueue.offer(byteBuffer);
        if (!accepted) {
            throw new RuntimeException("Could not insert into send queue");
        }
    }

    /**
     * @param blockingQueue a peer's send queue
     * @return {@code true} if the queue currently holds no message
     */
    private boolean isSendQueueEmpty(BlockingQueue<ByteBuffer> blockingQueue) {
        final boolean nothingPending = blockingQueue.isEmpty();
        return nothingPending;
    }

    /**
     * Takes the next message from a send queue, waiting up to the given time.
     *
     * @param blockingQueue a peer's send queue
     * @param l2            maximum time to wait
     * @param timeUnit      unit of {@code l2}
     * @return the next message, or {@code null} if none arrived in time
     * @throws InterruptedException if interrupted while waiting
     */
    private ByteBuffer pollSendQueue(BlockingQueue<ByteBuffer> blockingQueue, long l2, TimeUnit timeUnit) throws InterruptedException {
        final ByteBuffer next = blockingQueue.poll(l2, timeUnit);
        return next;
    }

    /**
     * Offers a message to the receive queue.
     *
     * @param message message to enqueue
     * @throws RuntimeException if the queue rejects the message
     */
    public void addToRecvQueue(Message message) {
        if (this.recvQueue.offer(message)) {
            return;
        }
        throw new RuntimeException("Could not insert into receive queue");
    }

    /**
     * Takes the next received message, waiting up to the given time.
     *
     * @param l2       maximum time to wait
     * @param timeUnit unit of {@code l2}
     * @return the next message, or {@code null} if none arrived in time
     * @throws InterruptedException if interrupted while waiting
     */
    public Message pollRecvQueue(long l2, TimeUnit timeUnit) throws InterruptedException {
        final Message next = this.recvQueue.poll(l2, timeUnit);
        return next;
    }

    /**
     * @param l2 id of a remote server
     * @return {@code true} if a send worker is currently registered for that server
     */
    public boolean connectedToPeer(long l2) {
        final SendWorker worker = this.senderWorkerMap.get(l2);
        return worker != null;
    }

    /**
     * @return the owning peer's reconfig-enabled setting
     */
    public boolean isReconfigEnabled() {
        final boolean reconfigEnabled = this.self.isReconfigEnabled();
        return reconfigEnabled;
    }

    class RecvWorker
    extends ZooKeeperThread {
        Long sid;
        Socket sock;
        volatile boolean running;
        final DataInputStream din;
        final SendWorker sw;

        RecvWorker(Socket socket, DataInputStream dataInputStream, Long l2, SendWorker sendWorker) {
            super("RecvWorker:" + l2);
            this.running = true;
            this.sid = l2;
            this.sock = socket;
            this.sw = sendWorker;
            this.din = dataInputStream;
            try {
                socket.setSoTimeout(0);
            }
            catch (IOException iOException) {
                LOG.e("Error while accessing socket for {}", (Object)l2, (Object)iOException);
                QuorumCnxManager.this.closeSocket(socket);
                this.running = false;
            }
        }

        synchronized boolean finish() {
            LOG.b("RecvWorker.finish called. sid: {}. myId: {}", (Object)this.sid, (Object)QuorumCnxManager.this.mySid);
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            this.interrupt();
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        @Override
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                LOG.b("RecvWorker thread towards {} started. myId: {}", (Object)this.sid, (Object)QuorumCnxManager.this.mySid);
                while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                    int n2 = this.din.readInt();
                    if (n2 <= 0 || n2 > 524288) {
                        throw new IOException("Received packet with invalid packet: " + n2);
                    }
                    byte[] byArray = new byte[n2];
                    this.din.readFully(byArray, 0, n2);
                    QuorumCnxManager.this.addToRecvQueue(new Message(ByteBuffer.wrap(byArray), this.sid));
                }
            }
            catch (Exception exception) {
                LOG.d("Connection broken for id {}, my id = {}", this.sid, QuorumCnxManager.this.mySid, exception);
            }
            finally {
                LOG.d("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", (Object)this.sid, (Object)QuorumCnxManager.this.mySid);
                this.sw.finish();
                QuorumCnxManager.this.closeSocket(this.sock);
            }
        }
    }

    /**
     * Thread that takes messages from one peer's send queue and writes them to that
     * peer's socket as length-prefixed packets. Finishing it closes the socket, finishes
     * the paired {@link RecvWorker} and removes it from the manager's worker map.
     */
    class SendWorker
    extends ZooKeeperThread {
        Long sid;
        Socket sock;
        RecvWorker recvWorker;
        volatile boolean running;
        DataOutputStream dout;
        // True while a reachability check started by asyncValidateIfSocketIsStillReachable is pending.
        AtomicBoolean ongoingAsyncValidation;

        /**
         * @param socket connected socket to the peer
         * @param l2     id of the peer
         */
        SendWorker(Socket socket, Long l2) {
            super("SendWorker:" + l2);
            this.running = true;
            this.ongoingAsyncValidation = new AtomicBoolean(false);
            this.sid = l2;
            this.sock = socket;
            this.recvWorker = null;
            try {
                this.dout = new DataOutputStream(socket.getOutputStream());
            }
            catch (IOException iOException) {
                LOG.d("Unable to access socket output stream", iOException);
                QuorumCnxManager.this.closeSocket(socket);
                this.running = false;
            }
            LOG.b("Address of remote peer: {}", (Object)this.sid);
        }

        /** Pairs this sender with its receiver so that finishing one finishes the other. */
        synchronized void setRecv(RecvWorker recvWorker) {
            this.recvWorker = recvWorker;
        }

        /** @return the paired receiver, or {@code null} if none was set */
        synchronized RecvWorker getRecvWorker() {
            return this.recvWorker;
        }

        /**
         * Stops this worker if it is still running: closes the socket, interrupts the
         * thread, finishes the paired receiver, removes this worker from the manager's
         * map (only if it is still the registered one) and decrements the worker counter.
         *
         * @return the value of the running flag after the call
         */
        synchronized boolean finish() {
            LOG.b("Calling SendWorker.finish for {}", (Object)this.sid);
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            QuorumCnxManager.this.closeSocket(this.sock);
            this.interrupt();
            if (this.recvWorker != null) {
                this.recvWorker.finish();
            }
            LOG.b("Removing entry from senderWorkerMap sid={}", (Object)this.sid);
            QuorumCnxManager.this.senderWorkerMap.remove(this.sid, this);
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        /**
         * Writes one message as an int length followed by the message bytes.
         *
         * <p>The bytes written are the ones copied out of the buffer (positions
         * {@code 0..capacity-1}), so the payload always matches the length prefix. Writing
         * the backing array instead would send the wrong bytes for a buffer that is a
         * slice of a larger array and would fail for a buffer without an accessible array.
         *
         * @param byteBuffer message to send; its position is moved to its capacity
         * @throws IOException if writing to the socket fails
         */
        synchronized void send(ByteBuffer byteBuffer) throws IOException {
            final byte[] payload = new byte[byteBuffer.capacity()];
            try {
                byteBuffer.position(0);
                byteBuffer.get(payload);
            }
            catch (BufferUnderflowException bufferUnderflowException) {
                // The buffer's limit is below its capacity; nothing is sent.
                LOG.d("BufferUnderflowException ", bufferUnderflowException);
                return;
            }
            this.dout.writeInt(payload.length);
            this.dout.write(payload);
            this.dout.flush();
        }

        /**
         * Send loop. On start, if nothing is queued for the peer, the last message sent to
         * it (if any) is re-sent. Then messages are polled from the peer's queue (1 second
         * at a time) and written until the worker is stopped, the manager shuts down, the
         * queue disappears or an exception occurs. The worker always finishes itself on exit.
         */
        @Override
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                final BlockingQueue<ByteBuffer> pendingAtStart = QuorumCnxManager.this.queueSendMap.get(this.sid);
                if (pendingAtStart == null || QuorumCnxManager.this.isSendQueueEmpty(pendingAtStart)) {
                    final ByteBuffer lastMessage = QuorumCnxManager.this.lastMessageSent.get(this.sid);
                    if (lastMessage != null) {
                        LOG.b("Attempting to send lastMessage to sid={}", (Object)this.sid);
                        this.send(lastMessage);
                    }
                }
            }
            catch (IOException iOException) {
                LOG.d("Failed to send last message. Shutting down thread.", iOException);
                this.finish();
            }
            LOG.b("SendWorker thread started towards {}. myId: {}", (Object)this.sid, (Object)QuorumCnxManager.this.mySid);
            try {
                while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                    try {
                        final BlockingQueue<ByteBuffer> sendQueue = QuorumCnxManager.this.queueSendMap.get(this.sid);
                        if (sendQueue == null) {
                            LOG.e("No queue of incoming messages for server {}", (Object)this.sid);
                            break;
                        }
                        final ByteBuffer next = QuorumCnxManager.this.pollSendQueue(sendQueue, 1000L, TimeUnit.MILLISECONDS);
                        if (next != null) {
                            QuorumCnxManager.this.lastMessageSent.put(this.sid, next);
                            this.send(next);
                        }
                    }
                    catch (InterruptedException interruptedException) {
                        // Re-check the loop condition; finish() interrupts after clearing the running flag.
                        LOG.c("Interrupted while waiting for message on queue", interruptedException);
                    }
                }
            }
            catch (Exception exception) {
                LOG.d("Exception when using channel: for id {} my id = {}", this.sid, QuorumCnxManager.this.mySid, exception);
            }
            this.finish();
            LOG.d("Send worker leaving thread id {} my id = {}", (Object)this.sid, (Object)QuorumCnxManager.this.self.getMyId());
        }

        /**
         * Starts a background check (at most one at a time) of whether the socket's remote
         * address is still reachable within 500 ms. If it is not, or the check fails, this
         * worker is finished.
         */
        public void asyncValidateIfSocketIsStillReachable() {
            if (this.ongoingAsyncValidation.compareAndSet(false, true)) {
                new Thread(() -> {
                    LOG.b("validate if destination address is reachable for sid {}", (Object)this.sid);
                    if (this.sock != null) {
                        InetAddress inetAddress = this.sock.getInetAddress();
                        try {
                            if (inetAddress.isReachable(500)) {
                                LOG.b("destination address {} is reachable for sid {}", (Object)inetAddress.toString(), (Object)this.sid);
                                this.ongoingAsyncValidation.set(false);
                                return;
                            }
                        }
                        catch (IOException | NullPointerException exception) {
                            // A failed or impossible check is treated like an unreachable address.
                        }
                        // String.valueOf tolerates the null address the catch above allows for,
                        // so finish() below is always reached.
                        LOG.d("destination address {} not reachable anymore, shutting down the SendWorker for sid {}", (Object)String.valueOf(inetAddress), (Object)this.sid);
                        this.finish();
                    }
                }).start();
            } else {
                LOG.b("validation of destination address for sid {} is skipped (it is already running)", (Object)this.sid);
            }
        }
    }

    public class Listener
    extends ZooKeeperThread {
        // System property that overrides the election port bind retry limit.
        private static final String ELECTION_PORT_BIND_RETRY = "zookeeper.electionPortBindRetry";
        // Retry limit used when the property is unset or holds a negative value.
        private static final int DEFAULT_PORT_BIND_MAX_RETRY = 3;
        // Effective retry limit chosen in the constructor; the constructor logs a value of 0 as "infinite".
        private final int portBindMaxRetry;
        // Replaceable callback (see setSocketBindErrorHandler); the default requests a system exit
        // with ExitCode.UNABLE_TO_BIND_QUORUM_PORT. NOTE(review): its call site is outside the visible code.
        private Runnable socketBindErrorHandler;
        // One handler per listen address, built at the start of run().
        private List<ListenerHandler> listenerHandlers;
        // Starts as false; NOTE(review): not read or set in the visible code - presumably flags a socket error in a handler.
        private final AtomicBoolean socketException;

        /**
         * Creates the listener thread and resolves the election port bind retry limit from
         * the {@code zookeeper.electionPortBindRetry} system property. A negative value is
         * rejected with a log message and replaced by the default.
         */
        public Listener() {
            super("ListenerThread");
            this.socketException = new AtomicBoolean(false);
            this.socketBindErrorHandler = () -> ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_BIND_QUORUM_PORT.getValue());

            final Integer configuredRetries = Integer.getInteger(ELECTION_PORT_BIND_RETRY, DEFAULT_PORT_BIND_MAX_RETRY);
            if (configuredRetries < 0) {
                LOG.c("'{}' contains invalid value: {}(must be >= 0). Use default value of {} instead.", ELECTION_PORT_BIND_RETRY, configuredRetries, DEFAULT_PORT_BIND_MAX_RETRY);
                this.portBindMaxRetry = DEFAULT_PORT_BIND_MAX_RETRY;
            } else {
                LOG.c("Election port bind maximum retries is {}", configuredRetries == 0 ? "infinite" : configuredRetries);
                this.portBindMaxRetry = configuredRetries;
            }
        }

        /**
         * Replaces the callback stored for socket bind errors.
         *
         * @param runnable the new callback
         */
        void setSocketBindErrorHandler(Runnable runnable) {
            Listener.this.socketBindErrorHandler = runnable;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            if (!QuorumCnxManager.this.shutdown) {
                LOG.b("Listener thread started, myId: {}", (Object)QuorumCnxManager.this.self.getMyId());
                Set<InetSocketAddress> set = QuorumCnxManager.this.self.getQuorumListenOnAllIPs() ? QuorumCnxManager.this.self.getElectionAddress().getWildcardAddresses() : QuorumCnxManager.this.self.getElectionAddress().getAllAddresses();
                CountDownLatch countDownLatch = new CountDownLatch(set.size());
                this.listenerHandlers = set.stream().map(inetSocketAddress -> new ListenerHandler((InetSocketAddress)inetSocketAddress, QuorumCnxManager.this.self.shouldUsePortUnification(), QuorumCnxManager.this.self.isSslQuorum(), countDownLatch)).collect(Collectors.toList());
                ExecutorService executorService = Executors.newFixedThreadPool(set.size());
                try {
                    this.listenerHandlers.forEach(executorService::submit);
                }
                finally {
                    executorService.shutdown();
                }
                try {
                    countDownLatch.await();
                }
                catch (InterruptedException interruptedException) {
                    LOG.d("Interrupted while sleeping. Ignoring exception", interruptedException);
                }
                finally {
                    for (ListenerHandler listenerHandler : this.listenerHandlers) {
                        try {
                            listenerHandler.close();
                        }
                        catch (IOException iOException) {
                            LOG.a("Error closing server socket", iOException);
                        }
                    }
                }
            }
            LOG.c("Leaving listener");
            if (!QuorumCnxManager.this.shutdown) {
                LOG.e("As I'm leaving the listener thread, I won't be able to participate in leader election any longer: {}", (Object)QuorumCnxManager.this.self.getElectionAddress().getAllAddresses().stream().map(NetUtils::formatInetAddr).collect(Collectors.joining("|")));
                if (this.socketException.get()) {
                    this.socketBindErrorHandler.run();
                }
            }
        }

        void halt() {
            LOG.b("Halt called: Trying to close listeners");
            if (this.listenerHandlers != null) {
                LOG.b("Closing listener: {}", (Object)QuorumCnxManager.this.mySid);
                for (ListenerHandler listenerHandler : this.listenerHandlers) {
                    try {
                        listenerHandler.close();
                    }
                    catch (IOException iOException) {
                        LOG.c("Exception when shutting down listener: ", iOException);
                    }
                }
            }
        }

        class ListenerHandler
        implements Closeable,
        Runnable {
            private ServerSocket serverSocket;
            private InetSocketAddress address;
            private boolean portUnification;
            private boolean sslQuorum;
            private CountDownLatch latch;

            ListenerHandler(InetSocketAddress inetSocketAddress, boolean bl2, boolean bl3, CountDownLatch countDownLatch) {
                this.address = inetSocketAddress;
                this.portUnification = bl2;
                this.sslQuorum = bl3;
                this.latch = countDownLatch;
            }

            @Override
            public void run() {
                try {
                    Thread.currentThread().setName("ListenerHandler-" + this.address);
                    this.acceptConnections();
                    try {
                        this.close();
                    }
                    catch (IOException iOException) {
                        LOG.c("Exception when shutting down listener: ", iOException);
                    }
                }
                catch (Exception exception) {
                    LOG.d("Unexpected error ", exception);
                }
                finally {
                    this.latch.countDown();
                }
            }

            @Override
            public synchronized void close() throws IOException {
                if (this.serverSocket != null && !this.serverSocket.isClosed()) {
                    LOG.b("Trying to close listeners: {}", (Object)this.serverSocket);
                    this.serverSocket.close();
                }
            }

            private void acceptConnections() {
                int n2 = 0;
                Socket socket = null;
                while (!(QuorumCnxManager.this.shutdown || Listener.this.portBindMaxRetry != 0 && n2 >= Listener.this.portBindMaxRetry)) {
                    try {
                        this.serverSocket = this.createNewServerSocket();
                        LOG.c("{} is accepting connections now, my election bind port: {}", (Object)QuorumCnxManager.this.mySid, (Object)this.address.toString());
                        while (!QuorumCnxManager.this.shutdown) {
                            try {
                                socket = this.serverSocket.accept();
                                QuorumCnxManager.this.setSockOpts(socket);
                                LOG.c("Received connection request from {}", (Object)socket.getRemoteSocketAddress());
                                if (QuorumCnxManager.this.quorumSaslAuthEnabled) {
                                    QuorumCnxManager.this.receiveConnectionAsync(socket);
                                } else {
                                    QuorumCnxManager.this.receiveConnection(socket);
                                }
                                n2 = 0;
                            }
                            catch (SocketTimeoutException socketTimeoutException) {
                                LOG.d("The socket is listening for the election accepted and it timed out unexpectedly, but will retry.see ZOOKEEPER-2836");
                            }
                        }
                    }
                    catch (IOException iOException) {
                        if (QuorumCnxManager.this.shutdown) break;
                        LOG.e("Exception while listening to address {}", (Object)this.address, (Object)iOException);
                        if (iOException instanceof SocketException) {
                            Listener.this.socketException.set(true);
                        }
                        ++n2;
                        try {
                            this.close();
                            Thread.sleep(1000L);
                        }
                        catch (IOException iOException2) {
                            LOG.d("Error closing server socket", iOException2);
                        }
                        catch (InterruptedException interruptedException) {
                            LOG.d("Interrupted while sleeping. Ignoring exception", interruptedException);
                        }
                        QuorumCnxManager.this.closeSocket(socket);
                    }
                }
                if (!QuorumCnxManager.this.shutdown) {
                    LOG.e("Leaving listener thread for address {} after {} errors. Use {} property to increase retry count.", NetUtils.formatInetAddr(this.address), n2, Listener.ELECTION_PORT_BIND_RETRY);
                }
            }

            private ServerSocket createNewServerSocket() throws IOException {
                ServerSocket serverSocket;
                if (this.portUnification) {
                    LOG.c("Creating TLS-enabled quorum server socket");
                    serverSocket = new UnifiedServerSocket(QuorumCnxManager.this.self.getX509Util(), true);
                } else if (this.sslQuorum) {
                    LOG.c("Creating TLS-only quorum server socket");
                    serverSocket = new UnifiedServerSocket(QuorumCnxManager.this.self.getX509Util(), false);
                } else {
                    serverSocket = new ServerSocket();
                }
                serverSocket.setReuseAddress(true);
                this.address = new InetSocketAddress(this.address.getHostString(), this.address.getPort());
                serverSocket.bind(this.address);
                return serverSocket;
            }
        }
    }

    /**
     * Thread that passes one accepted socket to
     * {@code QuorumCnxManager.receiveConnection}. The thread name carries the
     * socket's remote address.
     */
    class QuorumConnectionReceiverThread
    extends ZooKeeperThread {
        private final Socket sock;

        /**
         * @param acceptedSocket the socket this thread will hand over when run
         */
        QuorumConnectionReceiverThread(Socket acceptedSocket) {
            super("QuorumConnectionReceiverThread-" + acceptedSocket.getRemoteSocketAddress());
            sock = acceptedSocket;
        }

        /** Delegates the socket to the enclosing manager. */
        @Override
        public void run() {
            QuorumCnxManager.this.receiveConnection(sock);
        }
    }

    /**
     * Thread that calls {@code QuorumCnxManager.initiateConnection} for one
     * server id and, whatever the outcome, removes that id from
     * {@code inprogressConnections} afterwards.
     */
    class QuorumConnectionReqThread
    extends ZooKeeperThread {
        final MultipleAddresses electionAddr;
        final Long sid;

        /**
         * @param targetAddresses election addresses passed to {@code initiateConnection}
         * @param targetSid server id passed to {@code initiateConnection}; also used in the thread name
         */
        QuorumConnectionReqThread(MultipleAddresses targetAddresses, Long targetSid) {
            super("QuorumConnectionReqThread-" + targetSid);
            electionAddr = targetAddresses;
            sid = targetSid;
        }

        /** Runs the connection attempt and always clears the in-progress marker. */
        @Override
        public void run() {
            try {
                QuorumCnxManager.this.initiateConnection(electionAddr, sid);
            }
            finally {
                QuorumCnxManager.this.inprogressConnections.remove(sid);
            }
        }
    }

    /**
     * Parsed form of the initial message read from a connection: the sending
     * server's id and the election addresses listed in the message.
     */
    public static class InitialMessage {
        public Long sid;
        public List<InetSocketAddress> electionAddr;

        /**
         * @param sid id of the sending server
         * @param addresses election addresses carried by the message
         */
        InitialMessage(Long sid, List<InetSocketAddress> addresses) {
            this.sid = sid;
            this.electionAddr = addresses;
        }

        /**
         * Reads the remainder of an initial message from {@code dataInputStream}.
         *
         * <p>Layout read here: a {@code long} server id, an {@code int} byte
         * length (must be in 1..2048), then that many UTF-8 bytes holding
         * {@code |}-separated {@code host:port} entries. Entries whose host is
         * a wildcard address are dropped from the result.
         *
         * @param protocolVersion protocol version already read by the caller;
         *        only {@code -65536} and {@code -65535} are accepted
         * @param dataInputStream stream positioned right after the protocol version
         * @return the parsed message
         * @throws InitialMessageException if the version is not recognised, the
         *         length is out of bounds, the stream ends before the announced
         *         number of bytes was read, or an address entry is malformed
         * @throws IOException if reading from the stream fails
         */
        public static InitialMessage parse(Long protocolVersion, DataInputStream dataInputStream) throws InitialMessageException, IOException {
            if (protocolVersion != -65536L && protocolVersion != -65535L) {
                throw new InitialMessageException("Got unrecognized protocol version %s", protocolVersion);
            }
            Long senderSid = dataInputStream.readLong();
            int length = dataInputStream.readInt();
            if (length <= 0 || length > 2048) {
                throw new InitialMessageException("Unreasonable buffer length: %s", length);
            }
            byte[] payload = new byte[length];
            // A single read() may legitimately return fewer bytes than requested
            // (e.g. when the payload arrives in several chunks), so keep reading
            // until the buffer is full or the stream ends.
            int totalRead = 0;
            while (totalRead < length) {
                int count = dataInputStream.read(payload, totalRead, length - totalRead);
                if (count < 0) {
                    break;
                }
                totalRead += count;
            }
            if (totalRead != length) {
                throw new InitialMessageException("Read only %s bytes out of %s sent by server %s", totalRead, length, senderSid);
            }
            String[] entries = new String(payload, StandardCharsets.UTF_8).split("\\|");
            List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>(entries.length);
            for (String entry : entries) {
                String[] hostAndPort;
                try {
                    hostAndPort = ConfigUtils.getHostAndPort(entry);
                }
                catch (QuorumPeerConfig.ConfigException configException) {
                    throw malformed(configException, "Badly formed address: %s", entry);
                }
                if (hostAndPort.length != 2) {
                    throw new InitialMessageException("Badly formed address: %s", entry);
                }
                int port;
                try {
                    port = Integer.parseInt(hostAndPort[1]);
                }
                catch (NumberFormatException numberFormatException) {
                    throw malformed(numberFormatException, "Bad port number: %s", hostAndPort[1]);
                }
                // InetSocketAddress rejects out-of-range ports with an unchecked
                // IllegalArgumentException; report them as a malformed message instead.
                if (port < 0 || port > 65535) {
                    throw new InitialMessageException("Bad port number: %s", hostAndPort[1]);
                }
                if (InitialMessage.isWildcardAddress(hostAndPort[0])) {
                    continue;
                }
                addresses.add(new InetSocketAddress(hostAndPort[0], port));
            }
            return new InitialMessage(senderSid, addresses);
        }

        /**
         * Tells whether {@code string} denotes a wildcard ("any local") address.
         * Uses {@link InetAddress#getByName(String)}, so a host name may trigger
         * a name lookup; an unknown host is reported as not being a wildcard.
         *
         * @param string host name or literal IP address
         * @return {@code true} if the address resolves to a wildcard address
         */
        static boolean isWildcardAddress(String string) {
            try {
                return InetAddress.getByName(string).isAnyLocalAddress();
            }
            catch (UnknownHostException unknownHostException) {
                return false;
            }
        }

        @Override
        public String toString() {
            return "InitialMessage{sid=" + this.sid + ", electionAddr=" + this.electionAddr + '}';
        }

        /** Builds an {@link InitialMessageException} that keeps the original failure as its cause. */
        private static InitialMessageException malformed(Throwable cause, String format, Object ... args) {
            InitialMessageException exception = new InitialMessageException(format, args);
            exception.initCause(cause);
            return exception;
        }

        /**
         * Signals that an initial message could not be parsed. The message is
         * built with {@link String#format(String, Object...)}.
         */
        public static class InitialMessageException
        extends Exception {
            InitialMessageException(String string, Object ... objectArray) {
                super(String.format(string, objectArray));
            }
        }
    }

    /**
     * Simple holder pairing a byte buffer with a server id.
     */
    public static class Message {
        /** Payload buffer; stored by reference, not copied. */
        ByteBuffer buffer;
        /** Server id associated with the buffer. */
        long sid;

        /**
         * @param payload buffer to hold (kept by reference)
         * @param serverId server id to associate with the buffer
         */
        Message(ByteBuffer payload, long serverId) {
            buffer = payload;
            sid = serverId;
        }
    }
}

