/*
 * Decompiled with CFR 0.152.
 */
package XdepsXdatabricksX240X9088.org.apache.zookeeper;

import XcoreXdatabricksX240X9088.foe;
import XcoreXdatabricksX240X9088.goe;
import XdepsXdatabricksX240X9088.io.netty.handler.ssl.SslContext;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.ClientCnxn;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.ClientCnxnSocket;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.client.ZKClientConfig;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.common.ClientX509Util;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.common.NettyUtils;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.common.X509Exception;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLException;

public class ClientCnxnSocketNetty
extends ClientCnxnSocket {
    /** Class logger. */
    private static final foe LOG = goe.a(ClientCnxnSocketNetty.class);
    /** Event loop group created in the constructor and shut down by {@link #close()}. */
    private final EventLoopGroup eventLoopGroup;
    /** Currently established channel; assigned by the connect listener and nulled by {@link #cleanup()}. */
    private Channel channel;
    /** Latch created by each {@link #connect(InetSocketAddress)} call and released when that attempt finishes or the channel closes. */
    private CountDownLatch firstConnect;
    /** In-flight connect attempt; non-null only between {@code connect} and the completion/cancellation of that attempt. */
    private ChannelFuture connectFuture;
    /** Guards updates to {@link #channel} and {@link #connectFuture}. */
    private final Lock connectLock = new ReentrantLock();
    /** Set to {@code true} by the channel handler once the channel goes inactive or fails; reset on a successful connect. */
    private final AtomicBoolean disconnected = new AtomicBoolean();
    /** {@code true} while {@link #doTransport} must wait on {@link #waitSasl} instead of polling the outgoing queue. */
    private final AtomicBoolean needSasl = new AtomicBoolean();
    /** Released by {@link #saslCompleted()} and by wake-ups while {@link #needSasl} is set. */
    private final Semaphore waitSasl = new Semaphore(0);
    /**
     * Optional allocator override applied to new bootstraps; {@code null} means "use the default".
     * The diamond is required here: an {@code AtomicReference<Object>} is not assignable to
     * {@code AtomicReference<ByteBufAllocator>}.
     */
    private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR = new AtomicReference<>(null);
    /** Write-completion listener that bumps {@code sentCount} for every successful write. */
    private final GenericFutureListener<Future<Void>> onSendPktDoneListener = future -> {
        if (future.isSuccess()) {
            this.sentCount.getAndIncrement();
        }
    };

    ClientCnxnSocketNetty(ZKClientConfig zKClientConfig) throws IOException {
        this.clientConfig = zKClientConfig;
        this.eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup(1);
        this.initProperties();
    }

    /**
     * Reports whether a channel exists or a connect attempt is still pending.
     * Both fields are read while holding {@code connectLock}.
     *
     * @return {@code true} if either {@code channel} or {@code connectFuture} is non-null
     */
    @Override
    boolean isConnected() {
        connectLock.lock();
        try {
            if (channel != null) {
                return true;
            }
            return connectFuture != null;
        } finally {
            connectLock.unlock();
        }
    }

    /**
     * Applies the test allocator override, if one is installed, to the given bootstrap.
     *
     * @param bootstrap bootstrap being configured
     * @return the bootstrap with {@code ChannelOption.ALLOCATOR} set when an override
     *         exists, otherwise the bootstrap unchanged
     */
    private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
        ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
        if (testAllocator == null) {
            return bootstrap;
        }
        // Pass the allocator with its real type: option(ChannelOption<T>, T) cannot accept an Object here,
        // and option(...) already returns the Bootstrap type, so no cast is needed.
        return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
    }

    /**
     * Starts an asynchronous connect to the given address.
     *
     * <p>A fresh {@code firstConnect} latch is created, a bootstrap is configured and the
     * connect future is stored in {@code connectFuture} under {@code connectLock}. The
     * completion listener installs the channel and primes the connection on success; on
     * every outcome it clears {@code connectFuture}, wakes the transport loop and releases
     * the latch.
     *
     * @param inetSocketAddress remote address to connect to
     * @throws IOException declared by the overridden method
     */
    @Override
    void connect(InetSocketAddress inetSocketAddress) throws IOException {
        this.firstConnect = new CountDownLatch(1);
        // Option values are passed with their real types (Integer / Boolean) so that
        // option(ChannelOption<T>, T) type-checks; each builder call already returns Bootstrap.
        Bootstrap bootstrap = new Bootstrap()
                .group(this.eventLoopGroup)
                .channel(NettyUtils.nioOrEpollSocketChannel())
                .option(ChannelOption.SO_LINGER, -1)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ZKClientPipelineFactory(inetSocketAddress.getHostString(), inetSocketAddress.getPort()));
        bootstrap = this.configureBootstrapAllocator(bootstrap);
        bootstrap.validate();
        this.connectLock.lock();
        try {
            this.connectFuture = bootstrap.connect((SocketAddress)inetSocketAddress);
            this.connectFuture.addListener(new ChannelFutureListener(){

                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    boolean connected = false;
                    // Same lock as cleanup(): a channel is never installed after cleanup() has cancelled the attempt.
                    ClientCnxnSocketNetty.this.connectLock.lock();
                    try {
                        if (!channelFuture.isSuccess()) {
                            LOG.c("future isn't success.", channelFuture.cause());
                            return;
                        }
                        if (ClientCnxnSocketNetty.this.connectFuture == null) {
                            // cleanup() cleared the pending future but the connect still succeeded:
                            // close the channel rather than leaving it open and unreferenced.
                            LOG.c("connect attempt cancelled");
                            channelFuture.channel().close();
                            return;
                        }
                        ClientCnxnSocketNetty.this.channel = channelFuture.channel();
                        // Reset per-connection read state before any data is exchanged.
                        ClientCnxnSocketNetty.this.disconnected.set(false);
                        ClientCnxnSocketNetty.this.initialized = false;
                        ClientCnxnSocketNetty.this.lenBuffer.clear();
                        ClientCnxnSocketNetty.this.incomingBuffer = ClientCnxnSocketNetty.this.lenBuffer;
                        ClientCnxnSocketNetty.this.sendThread.primeConnection();
                        ClientCnxnSocketNetty.this.updateNow();
                        ClientCnxnSocketNetty.this.updateLastSendAndHeard();
                        if (ClientCnxnSocketNetty.this.sendThread.tunnelAuthInProgress()) {
                            // Drop stale permits so doTransport() blocks until saslCompleted() or a wake-up.
                            ClientCnxnSocketNetty.this.waitSasl.drainPermits();
                            ClientCnxnSocketNetty.this.needSasl.set(true);
                            ClientCnxnSocketNetty.this.sendPrimePacket();
                        } else {
                            ClientCnxnSocketNetty.this.needSasl.set(false);
                        }
                        connected = true;
                    }
                    finally {
                        ClientCnxnSocketNetty.this.connectFuture = null;
                        ClientCnxnSocketNetty.this.connectLock.unlock();
                        if (connected) {
                            LOG.c("channel is connected: {}", (Object)channelFuture.channel());
                        }
                        // Wake on success and on failure so doTransport() is not left waiting on firstConnect.
                        ClientCnxnSocketNetty.this.wakeupCnxn();
                        ClientCnxnSocketNetty.this.firstConnect.countDown();
                    }
                }
            });
        }
        finally {
            this.connectLock.unlock();
        }
    }

    /**
     * Tears down connection state: cancels any pending connect, closes the current
     * channel (waiting for the close to finish), and then strips every wake-up marker
     * from the outgoing queue.
     */
    @Override
    void cleanup() {
        connectLock.lock();
        try {
            if (connectFuture != null) {
                connectFuture.cancel(false);
                connectFuture = null;
            }
            if (channel != null) {
                channel.close().syncUninterruptibly();
                channel = null;
            }
        } finally {
            connectLock.unlock();
        }

        // Wake-up markers carry no payload; remove them outside the lock.
        for (Iterator queued = outgoingQueue.iterator(); queued.hasNext(); ) {
            ClientCnxn.Packet candidate = (ClientCnxn.Packet) queued.next();
            if (candidate == WakeupPacket.getInstance()) {
                queued.remove();
            }
        }
    }

    /**
     * Requests a graceful shutdown of the event loop group; does not wait for it to finish.
     */
    @Override
    void close() {
        eventLoopGroup.shutdownGracefully();
    }

    /**
     * Marks SASL as no longer required and releases one {@code waitSasl} permit so a
     * blocked {@code doTransport} call can proceed.
     */
    @Override
    void saslCompleted() {
        needSasl.set(false);
        waitSasl.release();
    }

    /** No-op in this implementation. */
    @Override
    void connectionPrimed() {
        // nothing to do
    }

    /** No-op in this implementation. */
    @Override
    void packetAdded() {
        // nothing to do
    }

    /**
     * Called when the connection is closing: releases the first-connect latch if one
     * exists, wakes the transport loop, and logs the event.
     */
    @Override
    void onClosing() {
        if (firstConnect != null) {
            firstConnect.countDown();
        }
        wakeupCnxn();
        LOG.c("channel is told closing");
    }

    /**
     * Wakes a thread blocked in {@code doTransport}: releases a {@code waitSasl} permit
     * when SASL is pending, and enqueues the wake-up marker when an outgoing queue exists.
     */
    private void wakeupCnxn() {
        final boolean saslPending = needSasl.get();
        if (saslPending) {
            waitSasl.release();
        }
        if (outgoingQueue != null) {
            outgoingQueue.add(WakeupPacket.getInstance());
        }
    }

    /**
     * Runs one iteration of the transport loop.
     *
     * <p>Waits up to {@code n2} milliseconds for the first connect to finish, then either
     * waits for a SASL permit or polls the outgoing queue (also bounded by {@code n2}).
     * If the client state is no longer alive the polled packet is put back and the method
     * returns; if the channel was lost the packet is put back and an
     * {@code EndOfStreamException} is thrown. Otherwise the packet, if any, is written.
     * {@code updateNow()} always runs on exit.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param n2 wait timeout in milliseconds
     * @param queue pending-request queue handed to {@code doWrite}
     * @param clientCnxn connection handed to {@code doWrite}
     * @throws IOException if the channel is lost or a write fails
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    void doTransport(int n2, Queue<ClientCnxn.Packet> queue, ClientCnxn clientCnxn) throws IOException, InterruptedException {
        try {
            boolean connectFinished = firstConnect.await(n2, TimeUnit.MILLISECONDS);
            if (!connectFinished) {
                return;
            }

            ClientCnxn.Packet head = null;
            if (!needSasl.get()) {
                head = (ClientCnxn.Packet) outgoingQueue.poll(n2, TimeUnit.MILLISECONDS);
            } else if (!waitSasl.tryAcquire(n2, TimeUnit.MILLISECONDS)) {
                return;
            }

            // Woken up because the client is shutting down: hand the packet back.
            if (!sendThread.getZkState().isAlive()) {
                addBack(head);
                return;
            }

            // The channel dropped underneath us.
            if (disconnected.get()) {
                addBack(head);
                throw new ClientCnxn.EndOfStreamException("channel for sessionid 0x" + Long.toHexString(sessionId) + " is lost");
            }

            if (head == null) {
                return;
            }
            doWrite(queue, head, clientCnxn);
        } finally {
            updateNow();
        }
    }

    /**
     * Re-inserts a packet at the head of the outgoing queue.
     * {@code null} and the wake-up marker are ignored.
     *
     * @param packet packet to put back; may be {@code null}
     */
    private void addBack(ClientCnxn.Packet packet) {
        if (packet == null) {
            return;
        }
        if (packet == WakeupPacket.getInstance()) {
            return;
        }
        outgoingQueue.addFirst(packet);
    }

    /**
     * Writes a packet and flushes the channel immediately.
     *
     * @param packet packet to send
     * @return the write future
     * @throws IOException if the channel has been closed
     */
    private ChannelFuture sendPktAndFlush(ClientCnxn.Packet packet) throws IOException {
        final boolean flushNow = true;
        return sendPkt(packet, flushNow);
    }

    /**
     * Writes a packet without flushing; the caller flushes later.
     *
     * @param packet packet to send
     * @return the write future
     * @throws IOException if the channel has been closed
     */
    private ChannelFuture sendPktOnly(ClientCnxn.Packet packet) throws IOException {
        final boolean flushNow = false;
        return sendPkt(packet, flushNow);
    }

    /**
     * Serialises a packet and writes it to the current channel.
     *
     * @param packet packet to send; its byte buffer is (re)built via {@code createBB()}
     * @param bl2 {@code true} to write and flush, {@code false} to write only
     * @return the write future, with the sent-count listener attached
     * @throws IOException if there is no channel to write to
     */
    private ChannelFuture sendPkt(ClientCnxn.Packet packet, boolean bl2) throws IOException {
        // Read the field once: cleanup() may null it concurrently, and re-reading it after the
        // null check would turn that race into a NullPointerException instead of an IOException.
        final Channel target = this.channel;
        if (target == null) {
            throw new IOException("channel has been closed");
        }
        packet.createBB();
        this.updateLastSend();
        // Wraps the packet's buffer without copying it.
        ByteBuf payload = Unpooled.wrappedBuffer((ByteBuffer)packet.bb);
        ChannelFuture writeFuture = bl2 ? target.writeAndFlush((Object)payload) : target.write((Object)payload);
        writeFuture.addListener(this.onSendPktDoneListener);
        return writeFuture;
    }

    /**
     * Removes the head of the outgoing queue and sends it with an immediate flush.
     *
     * @throws IOException if the channel has been closed
     */
    private void sendPrimePacket() throws IOException {
        ClientCnxn.Packet primePacket = (ClientCnxn.Packet) outgoingQueue.remove();
        sendPktAndFlush(primePacket);
    }

    /**
     * Writes the given packet followed by everything currently in the outgoing queue,
     * then flushes once if at least one real packet was written.
     *
     * <p>Wake-up markers are skipped. Packets with a request header whose type is neither
     * 11 nor 100 get a fresh xid and are appended to {@code queue} before being written.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param queue pending-request queue; additions are synchronised on the queue itself
     * @param packet first packet to process
     * @param clientCnxn source of xids
     * @throws IOException if a write fails because the channel has been closed
     */
    private void doWrite(Queue<ClientCnxn.Packet> queue, ClientCnxn.Packet packet, ClientCnxn clientCnxn) throws IOException {
        updateNow();
        boolean wroteAny = false;
        for (ClientCnxn.Packet current = packet; ; current = (ClientCnxn.Packet) outgoingQueue.remove()) {
            if (current != WakeupPacket.getInstance()) {
                // NOTE(review): 11 and 100 are presumably the ping and auth op codes — confirm against ZooDefs.OpCode.
                boolean expectsReply = current.requestHeader != null
                        && current.requestHeader.getType() != 11
                        && current.requestHeader.getType() != 100;
                if (expectsReply) {
                    current.requestHeader.setXid(clientCnxn.getXid());
                    synchronized (queue) {
                        queue.add(current);
                    }
                }
                sendPktOnly(current);
                wroteAny = true;
            }
            if (outgoingQueue.isEmpty()) {
                break;
            }
        }
        if (!wroteAny) {
            return;
        }
        this.channel.flush();
    }

    /**
     * Sends a single packet immediately (write plus flush).
     *
     * @param packet packet to send
     * @throws IOException if the channel has been closed
     */
    @Override
    void sendPacket(ClientCnxn.Packet packet) throws IOException {
        sendPktAndFlush(packet);
    }

    /**
     * @return the remote address of the current channel, or {@code null} when there is no channel
     */
    @Override
    SocketAddress getRemoteSocketAddress() {
        // Work on a snapshot so a concurrent reset of the field cannot cause an NPE.
        final Channel current = this.channel;
        if (current == null) {
            return null;
        }
        return current.remoteAddress();
    }

    /**
     * @return the local address of the current channel, or {@code null} when there is no channel
     */
    @Override
    SocketAddress getLocalSocketAddress() {
        // Work on a snapshot so a concurrent reset of the field cannot cause an NPE.
        final Channel current = this.channel;
        if (current == null) {
            return null;
        }
        return current.localAddress();
    }

    /**
     * Disconnects the current channel, if any, and waits (uninterruptibly) for the
     * disconnect to complete.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    void testableCloseSocket() throws IOException {
        final Channel current = this.channel;
        if (current == null) {
            return;
        }
        current.disconnect().awaitUninterruptibly();
    }

    /**
     * Installs an allocator override that subsequently configured bootstraps will use.
     *
     * @param byteBufAllocator allocator to use; {@code null} removes the override
     */
    static void setTestAllocator(ByteBufAllocator byteBufAllocator) {
        final AtomicReference<ByteBufAllocator> holder = TEST_ALLOCATOR;
        holder.set(byteBufAllocator);
    }

    /** Removes any allocator override installed via {@link #setTestAllocator}. */
    static void clearTestAllocator() {
        setTestAllocator(null);
    }

    /**
     * Inbound handler that feeds received bytes into the enclosing socket's
     * length/payload buffers and reports channel loss back to it.
     */
    class ZKClientHandler
    extends SimpleChannelInboundHandler<ByteBuf> {
        /** Flipped to {@code true} the first time {@link #cleanup()} runs, so later calls are no-ops. */
        AtomicBoolean channelClosed = new AtomicBoolean(false);

        private ZKClientHandler() {
        }

        /** Logs the disconnect and runs the one-time cleanup. */
        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            LOG.c("channel is disconnected: {}", (Object)channelHandlerContext.channel());
            cleanup();
        }

        /** Marks the enclosing socket as disconnected and notifies it, at most once per handler. */
        private void cleanup() {
            if (channelClosed.compareAndSet(false, true)) {
                ClientCnxnSocketNetty.this.disconnected.set(true);
                ClientCnxnSocketNetty.this.onClosing();
            }
        }

        /**
         * Copies readable bytes into {@code incomingBuffer}. Whenever that buffer fills up it is
         * flipped and dispatched: as a length prefix, as the connect result (before the session is
         * initialised), or as a regular response. The transport loop is woken afterwards.
         */
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
            final ClientCnxnSocketNetty owner = ClientCnxnSocketNetty.this;
            owner.updateNow();
            while (byteBuf.isReadable()) {
                // Temporarily shrink the limit so readBytes() does not ask for more than is available.
                if (owner.incomingBuffer.remaining() > byteBuf.readableBytes()) {
                    int cappedLimit = owner.incomingBuffer.position() + byteBuf.readableBytes();
                    owner.incomingBuffer.limit(cappedLimit);
                }
                byteBuf.readBytes(owner.incomingBuffer);
                owner.incomingBuffer.limit(owner.incomingBuffer.capacity());

                if (!owner.incomingBuffer.hasRemaining()) {
                    owner.incomingBuffer.flip();
                    if (owner.incomingBuffer == owner.lenBuffer) {
                        // A complete length prefix arrived.
                        owner.recvCount.getAndIncrement();
                        owner.readLength();
                    } else if (!owner.initialized) {
                        // First payload on a new connection is the connect result.
                        owner.readConnectResult();
                        owner.lenBuffer.clear();
                        owner.incomingBuffer = owner.lenBuffer;
                        owner.initialized = true;
                        owner.updateLastHeard();
                    } else {
                        owner.sendThread.readResponse(owner.incomingBuffer);
                        owner.lenBuffer.clear();
                        owner.incomingBuffer = owner.lenBuffer;
                        owner.updateLastHeard();
                    }
                }
            }
            owner.wakeupCnxn();
        }

        /** Logs the failure and runs the one-time cleanup. */
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) {
            LOG.d("Unexpected throwable", throwable);
            cleanup();
        }
    }

    /**
     * Channel initializer that builds the client pipeline: an optional SSL handler
     * (when {@code zookeeper.client.secure} is enabled) followed by the ZooKeeper handler.
     */
    class ZKClientPipelineFactory
    extends ChannelInitializer<SocketChannel> {
        /** Peer host passed to the SSL handler. */
        private final String host;
        /** Peer port passed to the SSL handler. */
        private final int port;
        /** Created on first use by {@link #initSSL} and reused afterwards. */
        private SslContext sslContext = null;

        private ZKClientPipelineFactory(String string, int n2) {
            host = string;
            port = n2;
        }

        /** Populates the pipeline of a newly created channel. */
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            final ChannelPipeline pipeline = socketChannel.pipeline();
            final boolean secure = ClientCnxnSocketNetty.this.clientConfig.getBoolean("zookeeper.client.secure");
            if (secure) {
                initSSL(pipeline);
            }
            pipeline.addLast("handler", (ChannelHandler)new ZKClientHandler());
        }

        /**
         * Adds an SSL handler to the pipeline, building the SSL context first if this
         * factory has not created one yet. Synchronised on the factory instance.
         */
        private synchronized void initSSL(ChannelPipeline channelPipeline) throws X509Exception.KeyManagerException, X509Exception.TrustManagerException, SSLException {
            if (sslContext == null) {
                try (ClientX509Util x509Util = new ClientX509Util()) {
                    sslContext = x509Util.createNettySslContextForClient(ClientCnxnSocketNetty.this.clientConfig);
                }
            }
            final Channel target = channelPipeline.channel();
            channelPipeline.addLast("ssl", (ChannelHandler)sslContext.newHandler(target.alloc(), host, port));
            LOG.c("SSL handler added for channel: {}", (Object)channelPipeline.channel());
        }
    }

    /**
     * Holder for the single marker packet that is enqueued purely to wake up a thread
     * polling the outgoing queue. The marker is always compared by identity.
     */
    static class WakeupPacket {
        /** The one shared marker packet; all five constructor arguments are {@code null}. */
        private static final ClientCnxn.Packet MARKER = new ClientCnxn.Packet(null, null, null, null, null);

        protected WakeupPacket() {
            // no state
        }

        /** @return the shared marker packet */
        public static ClientCnxn.Packet getInstance() {
            return MARKER;
        }
    }
}

