/*
 * Decompiled with CFR 0.152.
 */
package XdepsXdatabricksX240X9088.org.apache.zookeeper.server;

import XcoreXdatabricksX240X9088.foe;
import XcoreXdatabricksX240X9088.goe;
import XcoreXdatabricksX240X9088.hme;
import XcoreXdatabricksX240X9088.mme;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.KeeperException;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.WatchedEvent;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.data.ACL;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.data.Id;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.data.Stat;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.proto.ConnectRequest;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.proto.ReplyHeader;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.proto.RequestHeader;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.proto.WatcherEvent;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ByteBufferInputStream;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ClientCnxnLimitException;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.NIOServerCnxnFactory;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.RequestRecord;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerCnxn;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerCnxnFactory;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerMetrics;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ServerStats;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperSaslServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooKeeperServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZooTrace;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.command.CommandExecutor;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.command.FourLetterCommands;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.command.NopCommand;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.command.SetTraceMaskCommand;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.security.cert.Certificate;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A single client connection served through a non-blocking {@link SocketChannel}.
 *
 * <p>Incoming data is read as frames: a 4-byte length prefix (held in {@code lenBuffer})
 * followed by a payload whose buffer is allocated once the length is known. Until the
 * connection is initialized, the 4-byte prefix is first checked against the known
 * four-letter commands. Outgoing data is queued in {@code outgoingBuffers} and drained by
 * {@link #handleWrite(SelectionKey)} when the selection key is writable.
 */
public class NIOServerCnxn
extends ServerCnxn {
    private static final foe LOG = goe.a(NIOServerCnxn.class);
    private final NIOServerCnxnFactory factory;
    private final SocketChannel sock;
    private final NIOServerCnxnFactory.SelectorThread selectorThread;
    private final SelectionKey sk;
    /** Set to true once a connect request has been processed successfully. */
    private boolean initialized;
    /** Reusable 4-byte buffer that receives the length prefix of each frame. */
    private final ByteBuffer lenBuffer;
    /** Buffer currently being filled: either {@code lenBuffer} or a payload buffer. */
    public ByteBuffer incomingBuffer;
    /** Buffers waiting to be written; each sendBuffer call is terminated by {@code packetSentinel}. */
    private final Queue<ByteBuffer> outgoingBuffers;
    private int sessionTimeout;
    private long sessionId;
    private final boolean clientTcpKeepAlive;
    private final AtomicBoolean selectable;
    private final AtomicBoolean throttled;
    /** Empty marker buffer queued after every group of buffers so sent packets can be counted. */
    private static final ByteBuffer packetSentinel = ByteBuffer.allocate(0);

    /**
     * Creates the connection, configures the socket and records the client IP as auth info.
     *
     * @param zooKeeperServer server this connection belongs to (passed to the superclass)
     * @param socketChannel channel connected to the client
     * @param selectionKey selection key registered for {@code socketChannel}
     * @param nIOServerCnxnFactory factory that owns this connection
     * @param selectorThread selector thread that receives interest-ops update requests
     * @throws IOException if configuring the socket fails
     */
    public NIOServerCnxn(ZooKeeperServer zooKeeperServer, SocketChannel socketChannel, SelectionKey selectionKey, NIOServerCnxnFactory nIOServerCnxnFactory, NIOServerCnxnFactory.SelectorThread selectorThread) throws IOException {
        super(zooKeeperServer);
        // Start out expecting a length prefix.
        this.incomingBuffer = this.lenBuffer = ByteBuffer.allocate(4);
        this.outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
        this.clientTcpKeepAlive = Boolean.getBoolean("zookeeper.clientTcpKeepAlive");
        this.selectable = new AtomicBoolean(true);
        this.throttled = new AtomicBoolean(false);
        this.sock = socketChannel;
        this.sk = selectionKey;
        this.factory = nIOServerCnxnFactory;
        this.selectorThread = selectorThread;
        if (this.factory.login != null) {
            this.zooKeeperSaslServer = new ZooKeeperSaslServer(nIOServerCnxnFactory.login);
        }
        socketChannel.socket().setTcpNoDelay(true);
        socketChannel.socket().setSoLinger(false, -1);
        socketChannel.socket().setKeepAlive(this.clientTcpKeepAlive);
        InetAddress inetAddress = ((InetSocketAddress)socketChannel.socket().getRemoteSocketAddress()).getAddress();
        this.addAuthInfo(new Id("ip", inetAddress.getHostAddress()));
        this.sessionTimeout = nIOServerCnxnFactory.sessionlessCnxnTimeout;
    }

    /** Queues the factory's close-connection marker buffer for sending. */
    @Override
    public void sendCloseSession() {
        this.sendBuffer(ServerCnxnFactory.closeConn);
    }

    /**
     * Writes a buffer directly to the socket after switching the channel to blocking mode.
     * The close-connection marker is ignored. I/O errors are logged and not propagated.
     *
     * @param byteBuffer data to write
     */
    void sendBufferSync(ByteBuffer byteBuffer) {
        try {
            if (byteBuffer != ServerCnxnFactory.closeConn) {
                if (this.sock.isOpen()) {
                    this.sock.configureBlocking(true);
                    this.sock.write(byteBuffer);
                }
                // Counted as sent even when the socket was already closed.
                this.packetSent();
            }
        }
        catch (IOException iOException) {
            LOG.d("Error sending data synchronously ", iOException);
        }
    }

    /**
     * Appends the given buffers, followed by the packet sentinel, to the outgoing queue and
     * asks the selector thread to refresh this connection's interest ops.
     *
     * @param byteBufferArray buffers that together make up one outgoing packet
     */
    @Override
    public void sendBuffer(ByteBuffer ... byteBufferArray) {
        if (LOG.b()) {
            LOG.a("Add a buffer to outgoingBuffers, sk {} is valid: {}", (Object)this.sk, (Object)this.sk.isValid());
        }
        Queue<ByteBuffer> queue = this.outgoingBuffers;
        // Hold the queue monitor so the buffers of one packet and its sentinel stay contiguous.
        synchronized (queue) {
            for (ByteBuffer byteBuffer : byteBufferArray) {
                this.outgoingBuffers.add(byteBuffer);
            }
            this.outgoingBuffers.add(packetSentinel);
        }
        this.requestInterestOpsUpdate();
    }

    /**
     * Marks the connection stale, counts a dropped connection and always throws.
     *
     * @throws ServerCnxn.EndOfStreamException always, with reason UNABLE_TO_READ_FROM_CLIENT
     */
    private void handleFailedRead() throws ServerCnxn.EndOfStreamException {
        this.setStale();
        ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1L);
        throw new ServerCnxn.EndOfStreamException("Unable to read additional data from client, it probably closed the socket: address = " + this.sock.socket().getRemoteSocketAddress() + ", session = 0x" + Long.toHexString(this.sessionId), ServerCnxn.DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
    }

    /**
     * Reads payload bytes into {@code incomingBuffer}; once it is full, dispatches it as a
     * connect request (before initialization) or a regular request, then switches back to
     * reading a length prefix.
     */
    private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
        // Only touch the socket when the payload buffer still has room.
        if (this.incomingBuffer.remaining() != 0 && this.sock.read(this.incomingBuffer) < 0) {
            this.handleFailedRead();
        }
        if (this.incomingBuffer.remaining() == 0) {
            this.incomingBuffer.flip();
            // 4 accounts for the length prefix that preceded this payload.
            this.packetReceived(4 + this.incomingBuffer.remaining());
            if (!this.initialized) {
                this.readConnectRequest();
            } else {
                this.readRequest();
            }
            this.lenBuffer.clear();
            this.incomingBuffer = this.lenBuffer;
        }
    }

    /** @return true when the selection key is valid and selection has not been disabled */
    public boolean isSelectable() {
        return this.sk.isValid() && this.selectable.get();
    }

    /** Clears the selectable flag. */
    public void disableSelectable() {
        this.selectable.set(false);
    }

    /** Sets the selectable flag. */
    public void enableSelectable() {
        this.selectable.set(true);
    }

    /** Forwards an interest-ops update request to the selector thread if currently selectable. */
    private void requestInterestOpsUpdate() {
        if (this.isSelectable()) {
            this.selectorThread.addInterestOpsUpdateRequest(this.sk);
        }
    }

    /**
     * Writes as much queued data as the socket accepts and removes fully written buffers
     * from the queue.
     *
     * <p>If the factory provides a direct buffer, queued data is first copied into it and
     * written in one call; otherwise a gathering write over all queued buffers is used.
     *
     * @param selectionKey key that triggered the write (not used by this method)
     * @throws ServerCnxn.CloseRequestException when the close-connection marker reaches the
     *         head of the queue
     * @throws IOException if the socket write fails
     */
    void handleWrite(SelectionKey selectionKey) throws IOException {
        if (this.outgoingBuffers.isEmpty()) {
            return;
        }
        ByteBuffer byteBuffer = NIOServerCnxnFactory.getDirectBuffer();
        if (byteBuffer == null) {
            ByteBuffer byteBuffer2;
            ByteBuffer[] byteBufferArray = new ByteBuffer[this.outgoingBuffers.size()];
            // The gathering write advances the position of each queued buffer it consumes.
            this.sock.write(this.outgoingBuffers.toArray(byteBufferArray));
            while ((byteBuffer2 = this.outgoingBuffers.peek()) != null) {
                if (byteBuffer2 == ServerCnxnFactory.closeConn) {
                    throw new ServerCnxn.CloseRequestException("close requested", ServerCnxn.DisconnectReason.CLIENT_CLOSED_CONNECTION);
                }
                if (byteBuffer2 == packetSentinel) {
                    this.packetSent();
                }
                if (byteBuffer2.remaining() <= 0) {
                    this.outgoingBuffers.remove();
                    continue;
                }
                // Head buffer was only partially written; keep it for the next round.
                break;
            }
        } else {
            byteBuffer.clear();
            // Copy queued data into the direct buffer without consuming the queued buffers:
            // their positions are restored so the drain loop below can account for what was
            // actually written.
            for (ByteBuffer queued : this.outgoingBuffers) {
                ByteBuffer chunk = queued;
                if (byteBuffer.remaining() < chunk.remaining()) {
                    // Only part of this buffer fits; copy from a slice limited to the free space.
                    chunk = (ByteBuffer)chunk.slice().limit(byteBuffer.remaining());
                }
                int n2 = chunk.position();
                byteBuffer.put(chunk);
                chunk.position(n2);
                if (byteBuffer.remaining() != 0) continue;
                break;
            }
            byteBuffer.flip();
            int n3 = this.sock.write(byteBuffer);
            ByteBuffer byteBuffer32;
            // Drop every queued buffer that was written completely; advance the first one
            // that was written only in part.
            while ((byteBuffer32 = this.outgoingBuffers.peek()) != null) {
                if (byteBuffer32 == ServerCnxnFactory.closeConn) {
                    throw new ServerCnxn.CloseRequestException("close requested", ServerCnxn.DisconnectReason.CLIENT_CLOSED_CONNECTION);
                }
                if (byteBuffer32 == packetSentinel) {
                    this.packetSent();
                }
                if (n3 < byteBuffer32.remaining()) {
                    byteBuffer32.position(byteBuffer32.position() + n3);
                    break;
                }
                n3 -= byteBuffer32.remaining();
                this.outgoingBuffers.remove();
            }
        }
    }

    /** @return true if the underlying socket channel is open */
    protected boolean isSocketOpen() {
        return this.sock.isOpen();
    }

    /**
     * Performs the read and/or write work indicated by the selection key. Any I/O or
     * protocol failure is handled here by closing the connection with a matching reason.
     *
     * @param selectionKey key whose ready set determines what work is done
     * @throws InterruptedException propagated from request processing
     */
    void doIO(SelectionKey selectionKey) throws InterruptedException {
        try {
            if (!this.isSocketOpen()) {
                LOG.d("trying to do i/o on a null socket for session: 0x{}", (Object)Long.toHexString(this.sessionId));
                return;
            }
            if (selectionKey.isReadable()) {
                int n2 = this.sock.read(this.incomingBuffer);
                if (n2 < 0) {
                    try {
                        this.handleFailedRead();
                    }
                    catch (ServerCnxn.EndOfStreamException endOfStreamException) {
                        // End of stream on a plain read: log only the message and close.
                        LOG.c("{}", (Object)endOfStreamException.getMessage());
                        this.close(endOfStreamException.getReason());
                        return;
                    }
                }
                if (this.incomingBuffer.remaining() == 0) {
                    boolean bl2;
                    if (this.incomingBuffer == this.lenBuffer) {
                        // A full length prefix arrived; readLength swaps in the payload buffer.
                        this.incomingBuffer.flip();
                        bl2 = this.readLength(selectionKey);
                        this.incomingBuffer.clear();
                    } else {
                        bl2 = true;
                    }
                    if (bl2) {
                        this.readPayload();
                    } else {
                        // The prefix was a four-letter command that has been handled.
                        return;
                    }
                }
            }
            if (selectionKey.isWritable()) {
                this.handleWrite(selectionKey);
                if (!(this.initialized || this.getReadInterest() || this.getWriteInterest())) {
                    throw new ServerCnxn.CloseRequestException("responded to info probe", ServerCnxn.DisconnectReason.INFO_PROBE);
                }
            }
        }
        catch (CancelledKeyException cancelledKeyException) {
            LOG.d("CancelledKeyException causing close of session: 0x{}", (Object)Long.toHexString(this.sessionId));
            LOG.a("CancelledKeyException stack trace", cancelledKeyException);
            this.close(ServerCnxn.DisconnectReason.CANCELLED_KEY_EXCEPTION);
        }
        catch (ServerCnxn.CloseRequestException closeRequestException) {
            // NOTE(review): the reason carried by the exception is not recorded here.
            this.close();
        }
        catch (ServerCnxn.EndOfStreamException endOfStreamException) {
            LOG.c("Unexpected exception", endOfStreamException);
            this.close(endOfStreamException.getReason());
        }
        catch (ClientCnxnLimitException clientCnxnLimitException) {
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1L);
            LOG.d("Closing session 0x{}", (Object)Long.toHexString(this.sessionId), (Object)clientCnxnLimitException);
            this.close(ServerCnxn.DisconnectReason.CLIENT_CNX_LIMIT);
        }
        catch (IOException iOException) {
            LOG.d("Close of session 0x{}", (Object)Long.toHexString(this.sessionId), (Object)iOException);
            this.close(ServerCnxn.DisconnectReason.IO_EXCEPTION);
        }
    }

    /**
     * Decodes the request header from {@code incomingBuffer} and hands the header plus the
     * remaining bytes to the server for processing.
     *
     * @throws IOException if decoding or processing fails
     */
    public void readRequest() throws IOException {
        RequestHeader requestHeader = new RequestHeader();
        ByteBufferInputStream.byteBuffer2Record(this.incomingBuffer, requestHeader);
        RequestRecord requestRecord = RequestRecord.fromBytes(this.incomingBuffer.slice());
        this.zkServer.processPacket(this, requestHeader, requestRecord);
    }

    /** @return true while there is queued outgoing data */
    private boolean getWriteInterest() {
        return !this.outgoingBuffers.isEmpty();
    }

    /** @return true unless receiving has been throttled */
    private boolean getReadInterest() {
        return !this.throttled.get();
    }

    /**
     * Throttles receiving; requests an interest-ops update only on the actual state change.
     *
     * @param bl2 not used by this implementation
     */
    @Override
    public void disableRecv(boolean bl2) {
        if (this.throttled.compareAndSet(false, true)) {
            this.requestInterestOpsUpdate();
        }
    }

    /** Lifts throttling; requests an interest-ops update only on the actual state change. */
    @Override
    public void enableRecv() {
        if (this.throttled.compareAndSet(true, false)) {
            this.requestInterestOpsUpdate();
        }
    }

    /**
     * Deserializes a connect request from {@code incomingBuffer}, passes it to the server and
     * marks the connection initialized.
     *
     * @throws IOException if the server is not running or the request cannot be processed
     * @throws ClientCnxnLimitException propagated from connect-request processing
     */
    private void readConnectRequest() throws IOException, ClientCnxnLimitException {
        if (!this.isZKServerRunning()) {
            throw new IOException("ZooKeeperServer not running");
        }
        hme hme2 = hme.a(new ByteBufferInputStream(this.incomingBuffer));
        ConnectRequest connectRequest = this.protocolManager.deserializeConnectRequest(hme2);
        this.zkServer.processConnectRequest(this, connectRequest);
        this.initialized = true;
    }

    /**
     * Treats the 4-byte value as a possible four-letter command and runs it if it is known.
     *
     * @param selectionKey key to cancel before the command writes its reply; may be null
     * @param n2 the 4 bytes read as an int
     * @return false if {@code n2} is not a known command; true if a command was started;
     *         otherwise the result of the command executor
     * @throws IOException if reading the trace mask argument fails
     */
    private boolean checkFourLetterWord(SelectionKey selectionKey, int n2) throws IOException {
        if (!FourLetterCommands.isKnown(n2)) {
            return false;
        }
        String string = FourLetterCommands.getCommandString(n2);
        this.packetReceived(4L);
        // Command replies are written synchronously via SendBufferWriter, so the key is
        // cancelled first.
        if (selectionKey != null) {
            try {
                selectionKey.cancel();
            }
            catch (Exception exception) {
                LOG.d("Error cancelling command selection key", exception);
            }
        }
        PrintWriter printWriter = new PrintWriter(new BufferedWriter(new SendBufferWriter()));
        if (!FourLetterCommands.isEnabled(string)) {
            LOG.b("Command {} is not executed because it is not in the whitelist.", (Object)string);
            NopCommand nopCommand = new NopCommand(printWriter, this, string + " is not executed because it is not in the whitelist.");
            nopCommand.start();
            return true;
        }
        LOG.c("Processing {} command from {}", (Object)string, (Object)this.sock.socket().getRemoteSocketAddress());
        if (n2 == FourLetterCommands.setTraceMaskCmd) {
            // The set-trace-mask command carries an 8-byte argument.
            this.incomingBuffer = ByteBuffer.allocate(8);
            int n3 = this.sock.read(this.incomingBuffer);
            if (n3 < 0) {
                throw new IOException("Read error");
            }
            this.incomingBuffer.flip();
            // NOTE(review): a single read may return fewer than 8 bytes, in which case
            // getLong() throws an unchecked BufferUnderflowException - confirm callers cope.
            long l2 = this.incomingBuffer.getLong();
            ZooTrace.setTextTraceLevel(l2);
            SetTraceMaskCommand setTraceMaskCommand = new SetTraceMaskCommand(printWriter, this, l2);
            setTraceMaskCommand.start();
            return true;
        }
        CommandExecutor commandExecutor = new CommandExecutor();
        return commandExecutor.execute(this, printWriter, n2, this.zkServer, this.factory);
    }

    /**
     * Interprets the length prefix in {@code lenBuffer} and allocates the payload buffer.
     *
     * @param selectionKey key that triggered the read (the connection's own key is what is
     *        passed on to the four-letter-command check)
     * @return false if the prefix was handled as a four-letter command, true if a payload
     *         buffer was allocated
     * @throws IOException if the length is negative or above the configured maximum, or the
     *         server is not running
     */
    private boolean readLength(SelectionKey selectionKey) throws IOException {
        int n2 = this.lenBuffer.getInt();
        if (!this.initialized && this.checkFourLetterWord(this.sk, n2)) {
            return false;
        }
        if (n2 < 0 || n2 > hme.a) {
            throw new IOException("Len error. A message from " + this.getRemoteSocketAddress() + " with advertised length of " + n2 + " is either a malformed message or too large to process (length is greater than jute.maxbuffer=" + hme.a + ")");
        }
        if (!this.isZKServerRunning()) {
            throw new IOException("ZooKeeperServer not running");
        }
        this.zkServer.checkRequestSizeWhenReceivingMessage(n2);
        this.incomingBuffer = ByteBuffer.allocate(n2);
        return true;
    }

    /** @return the session timeout currently recorded for this connection */
    @Override
    public int getSessionTimeout() {
        return this.sessionTimeout;
    }

    /** @return the remote address and hexadecimal session id of this connection */
    @Override
    public String toString() {
        return "ip: " + this.sock.socket().getRemoteSocketAddress() + " sessionId: 0x" + Long.toHexString(this.sessionId);
    }

    /**
     * Records the disconnect reason and closes the connection.
     *
     * @param disconnectReason reason to record
     */
    @Override
    public void close(ServerCnxn.DisconnectReason disconnectReason) {
        this.disconnectReason = disconnectReason;
        this.close();
    }

    /**
     * Marks the connection stale and, if the factory still tracked it, unregisters it from
     * the server, cancels the selection key and closes the socket.
     */
    private void close() {
        this.setStale();
        // A false return means the factory no longer tracked this connection; nothing to do.
        if (!this.factory.removeCnxn(this)) {
            return;
        }
        if (this.zkServer != null) {
            this.zkServer.removeCnxn(this);
        }
        if (this.sk != null) {
            try {
                this.sk.cancel();
            }
            catch (Exception exception) {
                LOG.a("ignoring exception during selectionkey cancel", exception);
            }
        }
        this.closeSock();
    }

    /** Logs the closure and closes this connection's socket if it is still open. */
    private void closeSock() {
        if (!this.sock.isOpen()) {
            return;
        }
        String string = String.format("Closed socket connection for client %s %s", this.sock.socket().getRemoteSocketAddress(), this.sessionId != 0L ? "which had sessionid 0x" + Long.toHexString(this.sessionId) : "(no session established for client)");
        LOG.b(string);
        NIOServerCnxn.closeSock(this.sock);
    }

    /**
     * Closes an open socket channel step by step: output shutdown, input shutdown, socket
     * close, channel close. A failure in one step is logged and does not stop the others.
     *
     * @param socketChannel channel to close; nothing happens if it is already closed
     */
    public static void closeSock(SocketChannel socketChannel) {
        if (!socketChannel.isOpen()) {
            return;
        }
        try {
            socketChannel.socket().shutdownOutput();
        }
        catch (IOException iOException) {
            LOG.a("ignoring exception during output shutdown", iOException);
        }
        try {
            socketChannel.socket().shutdownInput();
        }
        catch (IOException iOException) {
            LOG.a("ignoring exception during input shutdown", iOException);
        }
        try {
            socketChannel.socket().close();
        }
        catch (IOException iOException) {
            LOG.a("ignoring exception during socket close", iOException);
        }
        try {
            socketChannel.close();
        }
        catch (IOException iOException) {
            LOG.a("ignoring exception during socketchannel close", iOException);
        }
    }

    /**
     * Serializes a response and queues it for sending. Exceptions are logged, not thrown.
     *
     * @return the int stored at the start of the first serialized buffer, or 0 if
     *         serialization failed before it could be read
     */
    @Override
    public int sendResponse(ReplyHeader replyHeader, mme mme2, String string, String string2, Stat stat, int n2) {
        int n3 = 0;
        try {
            ByteBuffer[] byteBufferArray = this.serialize(replyHeader, mme2, string, string2, stat, n2);
            n3 = byteBufferArray[0].getInt();
            // getInt() advanced the position; reset it so the whole buffer is sent.
            byteBufferArray[0].rewind();
            this.sendBuffer(byteBufferArray);
            this.decrOutstandingAndCheckThrottle(replyHeader);
        }
        catch (Exception exception) {
            LOG.c("Unexpected exception. Destruction averted.", exception);
        }
        return n3;
    }

    /**
     * Delivers a watch event to the client as a "notification" response, unless the ACL
     * check on the event path rejects this connection.
     *
     * @param watchedEvent event to deliver
     * @param list ACLs checked against this connection's auth info
     */
    @Override
    public void process(WatchedEvent watchedEvent, List<ACL> list) {
        try {
            this.zkServer.checkACL(this, list, 1, this.getAuthInfo(), watchedEvent.getPath(), null);
        }
        catch (KeeperException.NoAuthException noAuthException) {
            if (LOG.b()) {
                ZooTrace.logTraceMessage(LOG, 64L, "Not delivering event " + watchedEvent + " to 0x" + Long.toHexString(this.sessionId) + " (filtered by ACL)");
            }
            return;
        }
        ReplyHeader replyHeader = new ReplyHeader(-1, watchedEvent.getZxid(), 0);
        if (LOG.b()) {
            ZooTrace.logTraceMessage(LOG, 64L, "Deliver event " + watchedEvent + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
        }
        WatcherEvent watcherEvent = watchedEvent.getWrapper();
        int n2 = this.sendResponse(replyHeader, watcherEvent, "notification", null, null, -1);
        ServerMetrics.getMetrics().WATCH_BYTES.add(n2);
    }

    /** @return the session id assigned to this connection */
    @Override
    public long getSessionId() {
        return this.sessionId;
    }

    /**
     * Stores the session id and registers the session with the factory.
     *
     * @param l2 session id
     */
    @Override
    public void setSessionId(long l2) {
        this.sessionId = l2;
        this.factory.addSession(l2, this);
    }

    /**
     * Stores the session timeout and notifies the factory via {@code touchCnxn}.
     *
     * @param n2 new session timeout
     */
    @Override
    public void setSessionTimeout(int n2) {
        this.sessionTimeout = n2;
        this.factory.touchCnxn(this);
    }

    /**
     * @return the selection-key interest set for this connection: 0 when not selectable,
     *         otherwise OP_READ unless throttled, combined with OP_WRITE while output is queued
     */
    @Override
    public int getInterestOps() {
        if (!this.isSelectable()) {
            return 0;
        }
        int n2 = 0;
        if (this.getReadInterest()) {
            n2 |= SelectionKey.OP_READ;
        }
        if (this.getWriteInterest()) {
            n2 |= SelectionKey.OP_WRITE;
        }
        return n2;
    }

    /** @return the client's socket address, or null if the channel is closed */
    @Override
    public InetSocketAddress getRemoteSocketAddress() {
        if (!this.sock.isOpen()) {
            return null;
        }
        return (InetSocketAddress)this.sock.socket().getRemoteSocketAddress();
    }

    /** @return the address the socket is connected to, or null if the channel is closed */
    public InetAddress getSocketAddress() {
        if (!this.sock.isOpen()) {
            return null;
        }
        return this.sock.socket().getInetAddress();
    }

    /** @return the server's statistics object, or null when no server is attached */
    @Override
    protected ServerStats serverStats() {
        if (this.zkServer == null) {
            return null;
        }
        return this.zkServer.serverStats();
    }

    /** @return always false for this connection type */
    @Override
    public boolean isSecure() {
        return false;
    }

    /** @throws UnsupportedOperationException always */
    @Override
    public Certificate[] getClientCertificateChain() {
        throw new UnsupportedOperationException("SSL is unsupported in NIOServerCnxn");
    }

    /** @throws UnsupportedOperationException always */
    @Override
    public void setClientCertificateChain(Certificate[] certificateArray) {
        throw new UnsupportedOperationException("SSL is unsupported in NIOServerCnxn");
    }

    /**
     * Writer that accumulates characters and pushes them to the client with
     * {@link NIOServerCnxn#sendBufferSync(ByteBuffer)} as UTF-8 bytes.
     */
    class SendBufferWriter
    extends Writer {
        private StringBuffer sb = new StringBuffer();

        private SendBufferWriter() {
        }

        /**
         * Sends and clears the accumulated text when it exceeds 2048 characters, or when
         * {@code bl2} is true and there is anything to send.
         *
         * @param bl2 true to force sending of any pending text
         */
        private void checkFlush(boolean bl2) {
            if (bl2 && this.sb.length() > 0 || this.sb.length() > 2048) {
                NIOServerCnxn.this.sendBufferSync(ByteBuffer.wrap(this.sb.toString().getBytes(StandardCharsets.UTF_8)));
                this.sb.setLength(0);
            }
        }

        /** Sends any pending text and releases the buffer; repeated calls do nothing. */
        @Override
        public void close() throws IOException {
            if (this.sb == null) {
                return;
            }
            this.checkFlush(true);
            // NOTE(review): write/flush after close will hit a null buffer.
            this.sb = null;
        }

        /** Sends any pending text. */
        @Override
        public void flush() throws IOException {
            this.checkFlush(true);
        }

        /** Appends the characters and sends the buffer only if it has grown past the threshold. */
        @Override
        public void write(char[] cArray, int n2, int n3) throws IOException {
            this.sb.append(cArray, n2, n3);
            this.checkFlush(false);
        }
    }
}

