/*
 * Decompiled with CFR 0.152.
 */
package XdepsXdatabricksX240X9088.org.apache.zookeeper;

import XcoreXdatabricksX240X9088.foe;
import XcoreXdatabricksX240X9088.goe;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.ClientCnxn;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.ClientCnxnSocket;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.client.ZKClientConfig;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.nio.channels.UnsupportedAddressTypeException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;

public class ClientCnxnSocketNIO
extends ClientCnxnSocket {
    private static final foe LOG = goe.a(ClientCnxnSocketNIO.class);
    private final Selector selector = Selector.open();
    private SelectionKey sockKey;
    private SocketAddress localSocketAddress;
    private SocketAddress remoteSocketAddress;

    ClientCnxnSocketNIO(ZKClientConfig zKClientConfig) throws IOException {
        this.clientConfig = zKClientConfig;
        this.initProperties();
    }

    @Override
    boolean isConnected() {
        return this.sockKey != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void doIO(Queue<ClientCnxn.Packet> queue, ClientCnxn clientCnxn) throws InterruptedException, IOException {
        SocketChannel socketChannel = (SocketChannel)this.sockKey.channel();
        if (socketChannel == null) {
            throw new IOException("Socket is null!");
        }
        if (this.sockKey.isReadable()) {
            int n2 = socketChannel.read(this.incomingBuffer);
            if (n2 < 0) {
                throw new ClientCnxn.EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(this.sessionId) + ", likely server has closed socket");
            }
            if (!this.incomingBuffer.hasRemaining()) {
                this.incomingBuffer.flip();
                if (this.incomingBuffer == this.lenBuffer) {
                    this.recvCount.getAndIncrement();
                    this.readLength();
                } else if (!this.initialized) {
                    this.readConnectResult();
                    this.enableRead();
                    if (this.findSendablePacket(this.outgoingQueue, this.sendThread.tunnelAuthInProgress()) != null) {
                        this.enableWrite();
                    }
                    this.lenBuffer.clear();
                    this.incomingBuffer = this.lenBuffer;
                    this.updateLastHeard();
                    this.initialized = true;
                } else {
                    this.sendThread.readResponse(this.incomingBuffer);
                    this.lenBuffer.clear();
                    this.incomingBuffer = this.lenBuffer;
                    this.updateLastHeard();
                }
            }
        }
        if (this.sockKey.isWritable()) {
            ClientCnxn.Packet packet = this.findSendablePacket(this.outgoingQueue, this.sendThread.tunnelAuthInProgress());
            if (packet != null) {
                this.updateLastSend();
                if (packet.bb == null) {
                    if (packet.requestHeader != null && packet.requestHeader.getType() != 11 && packet.requestHeader.getType() != 100) {
                        packet.requestHeader.setXid(clientCnxn.getXid());
                    }
                    packet.createBB();
                }
                socketChannel.write(packet.bb);
                if (!packet.bb.hasRemaining()) {
                    this.sentCount.getAndIncrement();
                    this.outgoingQueue.removeFirstOccurrence(packet);
                    if (packet.requestHeader != null && packet.requestHeader.getType() != 11 && packet.requestHeader.getType() != 100) {
                        Queue<ClientCnxn.Packet> queue2 = queue;
                        synchronized (queue2) {
                            queue.add(packet);
                        }
                    }
                }
            }
            if (this.outgoingQueue.isEmpty()) {
                this.disableWrite();
            } else if (!this.initialized && packet != null && !packet.bb.hasRemaining()) {
                this.disableWrite();
            } else {
                this.enableWrite();
            }
        }
    }

    private ClientCnxn.Packet findSendablePacket(LinkedBlockingDeque<ClientCnxn.Packet> linkedBlockingDeque, boolean bl2) {
        if (linkedBlockingDeque.isEmpty()) {
            return null;
        }
        if (linkedBlockingDeque.getFirst().bb != null || !bl2) {
            return linkedBlockingDeque.getFirst();
        }
        Iterator<ClientCnxn.Packet> iterator = linkedBlockingDeque.iterator();
        while (iterator.hasNext()) {
            ClientCnxn.Packet packet = iterator.next();
            if (packet.requestHeader == null) {
                iterator.remove();
                linkedBlockingDeque.addFirst(packet);
                return packet;
            }
            LOG.b("Deferring non-priming packet {} until SASL authentication completes.", (Object)packet);
        }
        return null;
    }

    @Override
    void cleanup() {
        if (this.sockKey != null) {
            SocketChannel socketChannel = (SocketChannel)this.sockKey.channel();
            this.sockKey.cancel();
            try {
                socketChannel.socket().shutdownInput();
            }
            catch (IOException iOException) {
                LOG.a("Ignoring exception during shutdown input", iOException);
            }
            try {
                socketChannel.socket().shutdownOutput();
            }
            catch (IOException iOException) {
                LOG.a("Ignoring exception during shutdown output", iOException);
            }
            try {
                socketChannel.socket().close();
            }
            catch (IOException iOException) {
                LOG.a("Ignoring exception during socket close", iOException);
            }
            try {
                socketChannel.close();
            }
            catch (IOException iOException) {
                LOG.a("Ignoring exception during channel close", iOException);
            }
        }
        try {
            Thread.sleep(100L);
        }
        catch (InterruptedException interruptedException) {
            LOG.b("SendThread interrupted during sleep, ignoring");
        }
        this.sockKey = null;
    }

    @Override
    void close() {
        try {
            if (LOG.b()) {
                LOG.a("Doing client selector close");
            }
            this.selector.close();
            if (LOG.b()) {
                LOG.a("Closed client selector");
            }
        }
        catch (IOException iOException) {
            LOG.c("Ignoring exception during selector close", iOException);
        }
    }

    SocketChannel createSock() throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.socket().setSoLinger(false, -1);
        socketChannel.socket().setTcpNoDelay(true);
        return socketChannel;
    }

    void registerAndConnect(SocketChannel socketChannel, InetSocketAddress inetSocketAddress) throws IOException {
        this.sockKey = socketChannel.register(this.selector, 8);
        boolean bl2 = socketChannel.connect(inetSocketAddress);
        if (bl2) {
            this.sendThread.primeConnection();
        }
    }

    @Override
    void connect(InetSocketAddress inetSocketAddress) throws IOException {
        SocketChannel socketChannel = this.createSock();
        try {
            this.registerAndConnect(socketChannel, inetSocketAddress);
        }
        catch (IOException | SecurityException | UnresolvedAddressException | UnsupportedAddressTypeException exception) {
            LOG.e("Unable to open socket to {}", (Object)inetSocketAddress);
            socketChannel.close();
            throw exception;
        }
        this.initialized = false;
        this.lenBuffer.clear();
        this.incomingBuffer = this.lenBuffer;
    }

    @Override
    SocketAddress getRemoteSocketAddress() {
        return this.remoteSocketAddress;
    }

    @Override
    SocketAddress getLocalSocketAddress() {
        return this.localSocketAddress;
    }

    private void updateSocketAddresses() {
        Socket socket = ((SocketChannel)this.sockKey.channel()).socket();
        this.localSocketAddress = socket.getLocalSocketAddress();
        this.remoteSocketAddress = socket.getRemoteSocketAddress();
    }

    @Override
    void packetAdded() {
        this.wakeupCnxn();
    }

    @Override
    void onClosing() {
        this.wakeupCnxn();
    }

    private synchronized void wakeupCnxn() {
        this.selector.wakeup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    void doTransport(int n2, Queue<ClientCnxn.Packet> queue, ClientCnxn clientCnxn) throws IOException, InterruptedException {
        Set<SelectionKey> set;
        this.selector.select(n2);
        ClientCnxnSocketNIO clientCnxnSocketNIO = this;
        synchronized (clientCnxnSocketNIO) {
            set = this.selector.selectedKeys();
        }
        this.updateNow();
        for (SelectionKey selectionKey : set) {
            SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
            if (selectionKey.isConnectable()) {
                if (!socketChannel.finishConnect()) continue;
                this.updateLastSendAndHeard();
                this.updateSocketAddresses();
                this.sendThread.primeConnection();
                continue;
            }
            if (!selectionKey.isReadable() && !selectionKey.isWritable()) continue;
            this.doIO(queue, clientCnxn);
        }
        if (this.sendThread.getZkState().isConnected() && this.findSendablePacket(this.outgoingQueue, this.sendThread.tunnelAuthInProgress()) != null) {
            this.enableWrite();
        }
        set.clear();
    }

    @Override
    void testableCloseSocket() throws IOException {
        LOG.c("testableCloseSocket() called");
        SelectionKey selectionKey = this.sockKey;
        if (selectionKey != null) {
            ((SocketChannel)selectionKey.channel()).socket().close();
        }
    }

    @Override
    void saslCompleted() {
        this.enableWrite();
    }

    synchronized void enableWrite() {
        int n2 = this.sockKey.interestOps();
        if ((n2 & 4) == 0) {
            this.sockKey.interestOps(n2 | 4);
        }
    }

    private synchronized void disableWrite() {
        int n2 = this.sockKey.interestOps();
        if ((n2 & 4) != 0) {
            this.sockKey.interestOps(n2 & 0xFFFFFFFB);
        }
    }

    private synchronized void enableRead() {
        int n2 = this.sockKey.interestOps();
        if ((n2 & 1) == 0) {
            this.sockKey.interestOps(n2 | 1);
        }
    }

    @Override
    void connectionPrimed() {
        this.sockKey.interestOps(5);
    }

    Selector getSelector() {
        return this.selector;
    }

    @Override
    void sendPacket(ClientCnxn.Packet packet) throws IOException {
        SocketChannel socketChannel = (SocketChannel)this.sockKey.channel();
        if (socketChannel == null) {
            throw new IOException("Socket is null!");
        }
        packet.createBB();
        ByteBuffer byteBuffer = packet.bb;
        socketChannel.write(byteBuffer);
    }
}

