/*
 * Decompiled with CFR 0.152.
 */
package XdepsXdatabricksX240X9088.org.apache.zookeeper.server;

import XcoreXdatabricksX240X9088.foe;
import XcoreXdatabricksX240X9088.goe;
import XcoreXdatabricksX240X9088.hme;
import XcoreXdatabricksX240X9088.kme;
import XcoreXdatabricksX240X9088.mme;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.KeeperException;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.WatchedEvent;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.data.ACL;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.data.Id;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.data.Stat;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.proto.ConnectRequest;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.proto.ReplyHeader;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.proto.RequestHeader;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.proto.WatcherEvent;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ByteBufferInputStream;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ClientCnxnLimitException;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.NettyServerCnxnFactory;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.RequestRecord;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerCnxn;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerCnxnFactory;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerMetrics;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerStats;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperSaslServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooTrace;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.command.CommandExecutor;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.command.FourLetterCommands;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.command.NopCommand;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.command.SetTraceMaskCommand;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.cert.Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class NettyServerCnxn
extends ServerCnxn {
    private static final foe LOG = goe.a(NettyServerCnxn.class);
    private final Channel channel;
    private CompositeByteBuf queuedBuffer;
    private final AtomicBoolean throttled = new AtomicBoolean(false);
    private ByteBuffer bb;
    private final ByteBuffer bbLen = ByteBuffer.allocate(4);
    private long sessionId;
    private int sessionTimeout;
    private Certificate[] clientChain;
    private volatile boolean closingChannel;
    private final NettyServerCnxnFactory factory;
    private boolean initialized;
    public int readIssuedAfterReadComplete;
    private volatile HandshakeState handshakeState = HandshakeState.NONE;
    private final GenericFutureListener<Future<Void>> onSendBufferDoneListener = future -> {
        if (future.isSuccess()) {
            this.packetSent();
        }
    };

    NettyServerCnxn(Channel channel, ZooKeeperServer zooKeeperServer, NettyServerCnxnFactory nettyServerCnxnFactory) {
        super(zooKeeperServer);
        this.channel = channel;
        this.closingChannel = false;
        this.factory = nettyServerCnxnFactory;
        if (this.factory.login != null) {
            this.zooKeeperSaslServer = new ZooKeeperSaslServer(nettyServerCnxnFactory.login);
        }
        InetAddress inetAddress = ((InetSocketAddress)channel.remoteAddress()).getAddress();
        this.addAuthInfo(new Id("ip", inetAddress.getHostAddress()));
    }

    @Override
    public void close(ServerCnxn.DisconnectReason disconnectReason) {
        this.disconnectReason = disconnectReason;
        this.close();
    }

    public void close() {
        this.closingChannel = true;
        LOG.b("close called for session id: 0x{}", (Object)Long.toHexString(this.sessionId));
        this.setStale();
        this.factory.unregisterConnection(this);
        if (!this.factory.cnxns.remove(this)) {
            LOG.b("cnxns size:{}", (Object)this.factory.cnxns.size());
            if (this.channel.isOpen()) {
                this.channel.close();
            }
            return;
        }
        LOG.b("close in progress for session id: 0x{}", (Object)Long.toHexString(this.sessionId));
        this.factory.removeCnxnFromSessionMap(this);
        this.factory.removeCnxnFromIpMap(this, ((InetSocketAddress)this.channel.remoteAddress()).getAddress());
        if (this.zkServer != null) {
            this.zkServer.removeCnxn(this);
        }
        if (this.channel.isOpen()) {
            this.channel.writeAndFlush((Object)Unpooled.EMPTY_BUFFER).addListener((GenericFutureListener)new ChannelFutureListener(){

                public void operationComplete(ChannelFuture channelFuture) {
                    channelFuture.channel().close().addListener(future -> NettyServerCnxn.this.releaseQueuedBuffer());
                }
            });
        } else {
            ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1L);
            this.channel.eventLoop().execute(this::releaseQueuedBuffer);
        }
    }

    @Override
    public long getSessionId() {
        return this.sessionId;
    }

    @Override
    public int getSessionTimeout() {
        return this.sessionTimeout;
    }

    @Override
    public void process(WatchedEvent watchedEvent, List<ACL> list) {
        try {
            this.zkServer.checkACL(this, list, 1, this.getAuthInfo(), watchedEvent.getPath(), null);
        }
        catch (KeeperException.NoAuthException noAuthException) {
            if (LOG.b()) {
                ZooTrace.logTraceMessage(LOG, 64L, "Not delivering event " + watchedEvent + " to 0x" + Long.toHexString(this.sessionId) + " (filtered by ACL)");
            }
            return;
        }
        ReplyHeader replyHeader = new ReplyHeader(-1, watchedEvent.getZxid(), 0);
        if (LOG.b()) {
            ZooTrace.logTraceMessage(LOG, 64L, "Deliver event " + watchedEvent + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
        }
        WatcherEvent watcherEvent = watchedEvent.getWrapper();
        try {
            int n2 = this.sendResponse(replyHeader, watcherEvent, "notification");
            ServerMetrics.getMetrics().WATCH_BYTES.add(n2);
        }
        catch (IOException iOException) {
            LOG.b("Problem sending to {}", (Object)this.getRemoteSocketAddress(), (Object)iOException);
            this.close();
        }
    }

    @Override
    public int sendResponse(ReplyHeader replyHeader, mme mme2, String string, String string2, Stat stat, int n2) throws IOException {
        if (this.closingChannel || !this.channel.isOpen()) {
            return 0;
        }
        ByteBuffer[] byteBufferArray = this.serialize(replyHeader, mme2, string, string2, stat, n2);
        int n3 = byteBufferArray[0].getInt();
        byteBufferArray[0].rewind();
        this.sendBuffer(byteBufferArray);
        this.decrOutstandingAndCheckThrottle(replyHeader);
        return n3;
    }

    @Override
    public void setSessionId(long l2) {
        this.sessionId = l2;
        this.factory.addSession(l2, this);
    }

    @Override
    public void sendBuffer(ByteBuffer ... byteBufferArray) {
        if (byteBufferArray.length == 1 && byteBufferArray[0] == ServerCnxnFactory.closeConn) {
            this.close(ServerCnxn.DisconnectReason.CLIENT_CLOSED_CONNECTION);
            return;
        }
        this.channel.writeAndFlush((Object)Unpooled.wrappedBuffer((ByteBuffer[])byteBufferArray)).addListener(this.onSendBufferDoneListener);
    }

    private boolean checkFourLetterWord(Channel channel, ByteBuf byteBuf, int n2) {
        if (!FourLetterCommands.isKnown(n2)) {
            return false;
        }
        String string = FourLetterCommands.getCommandString(n2);
        channel.config().setAutoRead(false);
        this.packetReceived(4L);
        PrintWriter printWriter = new PrintWriter(new BufferedWriter(new SendBufferWriter()));
        if (!FourLetterCommands.isEnabled(string)) {
            LOG.b("Command {} is not executed because it is not in the whitelist.", (Object)string);
            NopCommand nopCommand = new NopCommand(printWriter, this, string + " is not executed because it is not in the whitelist.");
            nopCommand.start();
            return true;
        }
        LOG.c("Processing {} command from {}", (Object)string, (Object)channel.remoteAddress());
        if (n2 == FourLetterCommands.setTraceMaskCmd) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(8);
            byteBuf.readBytes(byteBuffer);
            byteBuffer.flip();
            long l2 = byteBuffer.getLong();
            ZooTrace.setTextTraceLevel(l2);
            SetTraceMaskCommand setTraceMaskCommand = new SetTraceMaskCommand(printWriter, this, l2);
            setTraceMaskCommand.start();
            return true;
        }
        CommandExecutor commandExecutor = new CommandExecutor();
        return commandExecutor.execute(this, printWriter, n2, this.zkServer, this.factory);
    }

    private void checkIsInEventLoop(String string) {
        if (!this.channel.eventLoop().inEventLoop()) {
            throw new IllegalStateException(string + "() called from non-EventLoop thread");
        }
    }

    private void appendToQueuedBuffer(ByteBuf byteBuf) {
        this.checkIsInEventLoop("appendToQueuedBuffer");
        if (this.queuedBuffer.numComponents() == this.queuedBuffer.maxNumComponents()) {
            this.queuedBuffer.consolidate();
        }
        this.queuedBuffer.addComponent(true, byteBuf);
        ServerMetrics.getMetrics().NETTY_QUEUED_BUFFER.add(this.queuedBuffer.capacity());
    }

    void processMessage(ByteBuf byteBuf) {
        this.checkIsInEventLoop("processMessage");
        LOG.b("0x{} queuedBuffer: {}", (Object)Long.toHexString(this.sessionId), (Object)this.queuedBuffer);
        if (LOG.b()) {
            LOG.a("0x{} buf {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)byteBuf));
        }
        if (this.throttled.get()) {
            LOG.b("Received message while throttled");
            if (this.queuedBuffer == null) {
                LOG.b("allocating queue");
                this.queuedBuffer = this.channel.alloc().compositeBuffer();
            }
            this.appendToQueuedBuffer(byteBuf.retainedDuplicate());
            if (LOG.b()) {
                LOG.a("0x{} queuedBuffer {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)this.queuedBuffer));
            }
        } else {
            LOG.b("not throttled");
            if (this.queuedBuffer != null) {
                this.appendToQueuedBuffer(byteBuf.retainedDuplicate());
                this.processQueuedBuffer();
            } else {
                this.receiveMessage(byteBuf);
                if (!this.closingChannel && byteBuf.isReadable()) {
                    if (LOG.b()) {
                        LOG.a("Before copy {}", (Object)byteBuf);
                    }
                    if (this.queuedBuffer == null) {
                        this.queuedBuffer = this.channel.alloc().compositeBuffer();
                    }
                    this.appendToQueuedBuffer(byteBuf.retainedSlice(byteBuf.readerIndex(), byteBuf.readableBytes()));
                    if (LOG.b()) {
                        LOG.a("Copy is {}", (Object)this.queuedBuffer);
                        LOG.a("0x{} queuedBuffer {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)this.queuedBuffer));
                    }
                }
            }
        }
    }

    void processQueuedBuffer() {
        this.checkIsInEventLoop("processQueuedBuffer");
        if (this.queuedBuffer != null) {
            if (LOG.b()) {
                LOG.a("processing queue 0x{} queuedBuffer {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)this.queuedBuffer));
            }
            this.receiveMessage((ByteBuf)this.queuedBuffer);
            if (this.closingChannel) {
                LOG.b("Processed queue - channel closed, dropping remaining bytes");
            } else if (!this.queuedBuffer.isReadable()) {
                LOG.b("Processed queue - no bytes remaining");
                this.releaseQueuedBuffer();
            } else {
                LOG.b("Processed queue - bytes remaining");
                this.queuedBuffer.discardReadComponents();
            }
        } else {
            LOG.b("queue empty");
        }
    }

    private void releaseQueuedBuffer() {
        this.checkIsInEventLoop("releaseQueuedBuffer");
        if (this.queuedBuffer != null) {
            this.queuedBuffer.release();
            this.queuedBuffer = null;
        }
    }

    private void receiveMessage(ByteBuf byteBuf) {
        this.checkIsInEventLoop("receiveMessage");
        try {
            while (byteBuf.isReadable() && !this.throttled.get()) {
                Object object;
                if (this.bb != null) {
                    Object object2;
                    Object object3;
                    if (LOG.b()) {
                        LOG.a("message readable {} bb len {} {}", byteBuf.readableBytes(), this.bb.remaining(), this.bb);
                        object3 = this.bb.duplicate();
                        ((ByteBuffer)object3).flip();
                        LOG.a("0x{} bb {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)Unpooled.wrappedBuffer((ByteBuffer)object3)));
                    }
                    if (this.bb.remaining() > byteBuf.readableBytes()) {
                        int n2 = this.bb.position() + byteBuf.readableBytes();
                        this.bb.limit(n2);
                    }
                    byteBuf.readBytes(this.bb);
                    this.bb.limit(this.bb.capacity());
                    if (LOG.b()) {
                        LOG.a("after readBytes message readable {} bb len {} {}", byteBuf.readableBytes(), this.bb.remaining(), this.bb);
                        object3 = this.bb.duplicate();
                        ((ByteBuffer)object3).flip();
                        LOG.a("after readbytes 0x{} bb {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)Unpooled.wrappedBuffer((ByteBuffer)object3)));
                    }
                    if (this.bb.remaining() != 0) continue;
                    this.bb.flip();
                    this.packetReceived(4 + this.bb.remaining());
                    object3 = this.zkServer;
                    if (object3 == null || !((ZooKeeperServer)object3).isRunning()) {
                        throw new IOException("ZK down");
                    }
                    if (this.initialized) {
                        object = new RequestHeader();
                        ByteBufferInputStream.byteBuffer2Record(this.bb, (mme)object);
                        object2 = RequestRecord.fromBytes(this.bb.slice());
                        ((ZooKeeperServer)object3).processPacket(this, (RequestHeader)object, (RequestRecord)object2);
                    } else {
                        LOG.b("got conn req request from {}", (Object)this.getRemoteSocketAddress());
                        object = hme.a(new ByteBufferInputStream(this.bb));
                        object2 = this.protocolManager.deserializeConnectRequest((kme)object);
                        ((ZooKeeperServer)object3).processConnectRequest(this, (ConnectRequest)object2);
                        this.initialized = true;
                    }
                    this.bb = null;
                    continue;
                }
                if (LOG.b()) {
                    LOG.a("message readable {} bblenrem {}", (Object)byteBuf.readableBytes(), (Object)this.bbLen.remaining());
                    ByteBuffer byteBuffer = this.bbLen.duplicate();
                    byteBuffer.flip();
                    LOG.a("0x{} bbLen {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)Unpooled.wrappedBuffer((ByteBuffer)byteBuffer)));
                }
                if (byteBuf.readableBytes() < this.bbLen.remaining()) {
                    this.bbLen.limit(this.bbLen.position() + byteBuf.readableBytes());
                }
                byteBuf.readBytes(this.bbLen);
                this.bbLen.limit(this.bbLen.capacity());
                if (this.bbLen.remaining() != 0) continue;
                this.bbLen.flip();
                if (LOG.b()) {
                    LOG.a("0x{} bbLen {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)Unpooled.wrappedBuffer((ByteBuffer)this.bbLen)));
                }
                int n3 = this.bbLen.getInt();
                if (LOG.b()) {
                    LOG.a("0x{} bbLen len is {}", (Object)Long.toHexString(this.sessionId), (Object)n3);
                }
                this.bbLen.clear();
                if (!this.initialized && this.checkFourLetterWord(this.channel, byteBuf, n3)) {
                    return;
                }
                if (n3 < 0 || n3 > hme.a) {
                    throw new IOException("Len error " + n3);
                }
                object = this.zkServer;
                if (object == null || !((ZooKeeperServer)object).isRunning()) {
                    LOG.c("Closing connection to {} because the server is not ready", (Object)this.getRemoteSocketAddress());
                    this.close(ServerCnxn.DisconnectReason.IO_EXCEPTION);
                    return;
                }
                ((ZooKeeperServer)object).checkRequestSizeWhenReceivingMessage(n3);
                this.bb = ByteBuffer.allocate(n3);
            }
        }
        catch (IOException iOException) {
            LOG.d("Closing connection to {}", (Object)this.getRemoteSocketAddress(), (Object)iOException);
            this.close(ServerCnxn.DisconnectReason.IO_EXCEPTION);
        }
        catch (ClientCnxnLimitException clientCnxnLimitException) {
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1L);
            LOG.b("Closing connection to {}", (Object)this.getRemoteSocketAddress(), (Object)clientCnxnLimitException);
            this.close(ServerCnxn.DisconnectReason.CLIENT_RATE_LIMIT);
        }
    }

    @Override
    public void disableRecv(boolean bl2) {
        if (this.throttled.compareAndSet(false, true)) {
            LOG.b("Throttling - disabling recv {}", (Object)this);
            this.channel.pipeline().fireUserEventTriggered((Object)ReadEvent.DISABLE);
        }
    }

    @Override
    public void enableRecv() {
        if (this.throttled.compareAndSet(true, false)) {
            LOG.b("Sending unthrottle event {}", (Object)this);
            this.channel.pipeline().fireUserEventTriggered((Object)ReadEvent.ENABLE);
        }
    }

    @Override
    public void setSessionTimeout(int n2) {
        this.sessionTimeout = n2;
    }

    @Override
    public int getInterestOps() {
        if (this.channel == null || !this.channel.isOpen()) {
            return 0;
        }
        int n2 = 0;
        if (!this.throttled.get()) {
            n2 |= 1;
        }
        if (!this.channel.isWritable()) {
            n2 |= 4;
        }
        return n2;
    }

    @Override
    public InetSocketAddress getRemoteSocketAddress() {
        return (InetSocketAddress)this.channel.remoteAddress();
    }

    @Override
    public void sendCloseSession() {
        this.sendBuffer(ServerCnxnFactory.closeConn);
    }

    @Override
    protected ServerStats serverStats() {
        if (this.zkServer == null) {
            return null;
        }
        return this.zkServer.serverStats();
    }

    @Override
    public boolean isSecure() {
        return this.factory.secure;
    }

    @Override
    public Certificate[] getClientCertificateChain() {
        if (this.clientChain == null) {
            return null;
        }
        return Arrays.copyOf(this.clientChain, this.clientChain.length);
    }

    @Override
    public void setClientCertificateChain(Certificate[] certificateArray) {
        this.clientChain = certificateArray == null ? null : Arrays.copyOf(certificateArray, certificateArray.length);
    }

    Channel getChannel() {
        return this.channel;
    }

    public int getQueuedReadableBytes() {
        this.checkIsInEventLoop("getQueuedReadableBytes");
        if (this.queuedBuffer != null) {
            return this.queuedBuffer.readableBytes();
        }
        return 0;
    }

    public void setHandshakeState(HandshakeState handshakeState) {
        this.handshakeState = handshakeState;
    }

    public HandshakeState getHandshakeState() {
        return this.handshakeState;
    }

    static enum ReadEvent {
        DISABLE,
        ENABLE;

    }

    class SendBufferWriter
    extends Writer {
        private StringBuffer sb = new StringBuffer();

        private SendBufferWriter() {
        }

        private void checkFlush(boolean bl2) {
            if (bl2 && this.sb.length() > 0 || this.sb.length() > 2048) {
                NettyServerCnxn.this.sendBuffer(ByteBuffer.wrap(this.sb.toString().getBytes(StandardCharsets.UTF_8)));
                this.sb.setLength(0);
            }
        }

        @Override
        public void close() throws IOException {
            if (this.sb == null) {
                return;
            }
            this.checkFlush(true);
            this.sb = null;
        }

        @Override
        public void flush() throws IOException {
            this.checkFlush(true);
        }

        @Override
        public void write(char[] cArray, int n2, int n3) throws IOException {
            this.sb.append(cArray, n2, n3);
            this.checkFlush(false);
        }
    }

    public static enum HandshakeState {
        NONE,
        STARTED,
        FINISHED;

    }
}

