/*
 * Decompiled with CFR 0.152.
 */
package XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum;

import XcoreXdatabricksX240X9088.foe;
import XcoreXdatabricksX240X9088.goe;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.data.Id;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.jmx.MBeanRegistry;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.Request;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.ZKDatabase;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.Leader;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.Learner;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.LearnerHandler;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.LearnerHandlerBean;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.LearnerMaster;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.QuorumPacket;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.QuorumPeer;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.StateSummary;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.UnifiedServerSocket;
import XdepsXdatabricksX240X9088.org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ObserverMaster
extends LearnerMaster
implements Runnable {
    private static final foe LOG = goe.a(ObserverMaster.class);
    /** Counter handed out by {@link #getAndDecrementFollowerCounter()}; starts at -1 and only decreases. */
    private final AtomicLong followerCounter = new AtomicLong(-1L);
    /** Peer this observer master belongs to; source of tick, limits, view and quorum configuration. */
    private QuorumPeer self;
    /** Follower server that observer requests and session touches are delegated to. */
    private FollowerZooKeeperServer zks;
    /** Port the listening socket is bound to in {@link #start()}. */
    private int port;
    /** Handlers that currently receive forwarded packets and pings; backed by a concurrent map. */
    private Set<LearnerHandler> activeObservers = Collections.newSetFromMap(new ConcurrentHashMap<LearnerHandler, Boolean>());
    /** JMX beans registered per handler, removed again in {@link #unregisterLearnerHandlerBean}. */
    private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans = new ConcurrentHashMap<LearnerHandler, LearnerHandlerBean>();
    /** Default cap for the retained commit queue: 0x2000000 (32 * 1024 * 1024). */
    private static final int PKTS_SIZE_LIMIT = 0x2000000;
    /** Effective cap; overridable via the {@code zookeeper.observerMaster.sizeLimit} system property. */
    private static volatile int pktsSizeLimit = Integer.getInteger("zookeeper.observerMaster.sizeLimit", PKTS_SIZE_LIMIT);
    /** Proposals received but not yet committed, in arrival order. */
    private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new ConcurrentLinkedQueue<QuorumPacket>();
    /** Committed packets retained so that newly connected observers can be synced from them. */
    private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new ConcurrentLinkedQueue<QuorumPacket>();
    /** Running total of {@code LearnerHandler.packetSize} over {@link #committedPkts}. */
    private int pktsSize = 0;
    /** Zxid of the last packet passed to {@code sendPacket}. */
    private long lastProposedZxid;
    /** Guards enqueueing a pending revalidation together with forwarding the packet upstream. */
    private final Object revalidateSessionLock = new Object();
    /** Revalidation requests forwarded upstream and still awaiting a reply, oldest first. */
    private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations = new ConcurrentLinkedQueue<Revalidation>();
    /** Thread running the accept loop in {@link #run()}. */
    private Thread thread;
    /** Listening socket observers connect to. */
    private ServerSocket ss;
    /**
     * True while the accept loop should keep running. Volatile because it is written by
     * start()/stop() while holding the monitor but read by run() and addLearnerHandler()
     * without any lock; without volatile those readers are not guaranteed to see the update.
     */
    private volatile boolean listenerRunning;
    /** Scheduler that periodically executes {@link #ping}. */
    private ScheduledExecutorService pinger;
    /** Task that pings every handler currently present in {@code activeObservers}. */
    Runnable ping = new Runnable(){

        @Override
        public void run() {
            Iterator<LearnerHandler> observers = ObserverMaster.this.activeObservers.iterator();
            while (observers.hasNext()) {
                observers.next().ping();
            }
        }
    };

    /**
     * Creates an observer master; no socket is opened until {@link #start()} is called.
     *
     * @param quorumPeer the peer this master belongs to
     * @param followerZooKeeperServer the follower server requests are delegated to
     * @param n2 the port to listen on
     */
    ObserverMaster(QuorumPeer quorumPeer, FollowerZooKeeperServer followerZooKeeperServer, int n2) {
        self = quorumPeer;
        zks = followerZooKeeperServer;
        port = n2;
    }

    /**
     * Checks that the listener is running; the handler itself is not recorded here
     * (handlers are added to the active set by {@code startForwarding}).
     *
     * @throws RuntimeException if the listener is not running
     */
    @Override
    public void addLearnerHandler(LearnerHandler learnerHandler) {
        if (listenerRunning) {
            return;
        }
        throw new RuntimeException("ObserverMaster is not running");
    }

    /** Drops the handler from the set of active observers; a handler that is not present is ignored. */
    @Override
    public void removeLearnerHandler(LearnerHandler learnerHandler) {
        activeObservers.remove(learnerHandler);
    }

    /** Returns the peer's sync limit multiplied by its tick time. */
    @Override
    public int syncTimeout() {
        final int syncLimit = self.getSyncLimit();
        final int tickTime = self.getTickTime();
        return syncLimit * tickTime;
    }

    /** Returns the current tick plus the peer's sync limit. */
    @Override
    public int getTickOfNextAckDeadline() {
        final int currentTick = self.tick.get();
        return currentTick + self.syncLimit;
    }

    /** Returns the current tick plus the peer's init limit plus its sync limit. */
    @Override
    public int getTickOfInitialAckDeadline() {
        final int currentTick = self.tick.get();
        final int afterInit = currentTick + self.initLimit;
        return afterInit + self.syncLimit;
    }

    /** Atomically returns the current counter value and then decrements it. */
    @Override
    public long getAndDecrementFollowerCounter() {
        final long previous = followerCounter.getAndDecrement();
        return previous;
    }

    /** No-op in this implementation: returns immediately without waiting or using its arguments. */
    @Override
    public void waitForEpochAck(long l2, StateSummary stateSummary) throws IOException, InterruptedException {
    }

    /** No-op in this implementation: returns immediately without waiting. */
    @Override
    public void waitForStartup() throws InterruptedException {
    }

    /** Returns the zxid of the last packet sent to observers, read while holding this object's monitor. */
    @Override
    public synchronized long getLastProposed() {
        final long lastZxid = lastProposedZxid;
        return lastZxid;
    }

    /** Returns the peer's current epoch; both arguments are ignored. */
    @Override
    public long getEpochToPropose(long l2, long l3) throws InterruptedException, IOException {
        final long currentEpoch = self.getCurrentEpoch();
        return currentEpoch;
    }

    /** Returns the database of the underlying follower server. */
    @Override
    public ZKDatabase getZKDatabase() {
        final ZKDatabase database = zks.getZKDatabase();
        return database;
    }

    /** No-op in this implementation: returns immediately without waiting or using its arguments. */
    @Override
    public void waitForNewLeaderAck(long l2, long l3) throws InterruptedException {
    }

    /** Returns the peer's current tick value. */
    @Override
    public int getCurrentTick() {
        final int currentTick = self.tick.get();
        return currentTick;
    }

    /**
     * Accepts only acks whose zxid has all of its low 32 bits clear; any other ack is rejected.
     *
     * @param l3 the acknowledged zxid
     * @throws RuntimeException if the low 32 bits of {@code l3} are not zero
     */
    @Override
    public void processAck(long l2, long l3, SocketAddress socketAddress) {
        final long lowBits = l3 & 0xFFFFFFFFL;
        if (lowBits != 0L) {
            throw new RuntimeException("Observers shouldn't send ACKS ack = " + Long.toHexString(l3));
        }
    }

    /** Forwards a session touch (session id {@code l2}, timeout {@code n2}) to the follower server's session tracker. */
    @Override
    public void touch(long l2, int n2) {
        zks.getSessionTracker().touchSession(l2, n2);
    }

    /**
     * Matches a revalidation reply against the oldest pending revalidation and, on a match,
     * forwards a copy of the reply to the handler that requested it.
     *
     * The packet payload is read as a long session id followed by a boolean validity flag.
     *
     * @return {@code true} if the reply matched the oldest pending revalidation and was
     *         forwarded; {@code false} if nothing is pending or the session ids differ
     * @throws IOException if the payload cannot be decoded
     */
    boolean revalidateLearnerSession(QuorumPacket quorumPacket) throws IOException {
        DataInputStream payload = new DataInputStream(new ByteArrayInputStream(quorumPacket.getData()));
        final long sessionId = payload.readLong();
        final boolean valid = payload.readBoolean();

        // Only the head of the queue is considered; replies are matched in request order.
        Iterator<Revalidation> pending = pendingRevalidations.iterator();
        Revalidation oldest = pending.hasNext() ? pending.next() : null;
        if (oldest == null || oldest.sessionId != sessionId) {
            return false;
        }
        pending.remove();

        // Forward an independent copy (fresh byte array and auth list) rather than the packet itself.
        byte[] data = quorumPacket.getData();
        List<Id> authInfoCopy = null;
        if (quorumPacket.getAuthinfo() != null) {
            authInfoCopy = new ArrayList<Id>(quorumPacket.getAuthinfo());
        }
        QuorumPacket copy = new QuorumPacket(quorumPacket.getType(), quorumPacket.getZxid(), Arrays.copyOf(data, data.length), authInfoCopy);
        oldest.handler.queuePacket(copy);

        if (valid) {
            touch(oldest.sessionId, oldest.timeout);
        }
        return true;
    }

    /**
     * Records a pending session revalidation for the given handler and forwards the request
     * packet through the follower server's learner, if one is available.
     *
     * The packet payload is read as a long session id followed by an int timeout. Enqueueing
     * and forwarding happen under {@code revalidateSessionLock}.
     *
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @throws IOException if the payload cannot be decoded or the packet cannot be written
     */
    @Override
    public void revalidateSession(QuorumPacket quorumPacket, LearnerHandler learnerHandler) throws IOException {
        DataInputStream payload = new DataInputStream(new ByteArrayInputStream(quorumPacket.getData()));
        final long sessionId = payload.readLong();
        final int sessionTimeout = payload.readInt();
        synchronized (revalidateSessionLock) {
            pendingRevalidations.add(new Revalidation(sessionId, sessionTimeout, learnerHandler));
            Learner upstream = zks.getLearner();
            if (upstream == null) {
                return;
            }
            upstream.writePacket(quorumPacket, true);
        }
    }

    /** Hands a request received from an observer to the follower server for processing. */
    @Override
    public void submitLearnerRequest(Request request) {
        zks.processObserverRequest(request);
    }

    /**
     * Replays retained committed packets newer than {@code l2} to the handler and then adds it
     * to the set of active observers.
     *
     * If the oldest retained packet is already beyond {@code l2 + 1}, the handler cannot be
     * caught up from the queue: it is shut down and -1 is returned.
     *
     * @param learnerHandler the handler to sync and activate
     * @param l2 the last zxid the observer already has
     * @return the zxid of the last packet sent to observers, or -1 if the handler was disconnected
     */
    @Override
    public synchronized long startForwarding(LearnerHandler learnerHandler, long l2) {
        final Iterator<QuorumPacket> retained = committedPkts.iterator();
        if (retained.hasNext()) {
            final long nextZxid = l2 + 1L;
            QuorumPacket current = retained.next();
            final long headZxid = current.getZxid();
            if (headZxid > nextZxid) {
                LOG.e("LearnerHandler is too far behind (0x{} < 0x{}), disconnecting {} at {}", Long.toHexString(nextZxid), Long.toHexString(headZxid), learnerHandler.getSid(), learnerHandler.getRemoteAddress());
                learnerHandler.shutdown();
                return -1L;
            }
            if (headZxid == nextZxid) {
                learnerHandler.queuePacket(current);
            }
            // The head packet's size is always counted, whether or not it was queued.
            long bytesUsed = LearnerHandler.packetSize(current);
            while (retained.hasNext()) {
                current = retained.next();
                if (current.getZxid() > l2) {
                    learnerHandler.queuePacket(current);
                    bytesUsed += LearnerHandler.packetSize(current);
                }
            }
            final long tailZxid = current.getZxid();
            LOG.c("finished syncing observer from retained commit queue: sid {}, queue head 0x{}, queue tail 0x{}, sync position 0x{}, num packets used {}, num bytes used {}", learnerHandler.getSid(), Long.toHexString(headZxid), Long.toHexString(tailZxid), Long.toHexString(l2), tailZxid - l2, bytesUsed);
        }
        activeObservers.add(learnerHandler);
        return lastProposedZxid;
    }

    /** Returns the version reported by the peer's current quorum verifier. */
    @Override
    public long getQuorumVerifierVersion() {
        final long version = self.getQuorumVerifier().getVersion();
        return version;
    }

    /**
     * Returns the string form of the server registered under id {@code l2} in the peer's view.
     *
     * @return the server's {@code toString()}, or an empty string if the view has no such id
     */
    @Override
    public String getPeerInfo(long l2) {
        QuorumPeer.QuorumServer server = self.getView().get(l2);
        if (server != null) {
            return server.toString();
        }
        return "";
    }

    /** Returns the UTF-8 bytes of the string form of the peer's last seen quorum verifier. */
    @Override
    public byte[] getQuorumVerifierBytes() {
        final String verifierText = self.getLastSeenQuorumVerifier().toString();
        return verifierText.getBytes(StandardCharsets.UTF_8);
    }

    /** Returns the peer's quorum auth server, or {@code null} when no peer is set. */
    @Override
    public QuorumAuthServer getQuorumAuthServer() {
        if (self == null) {
            return null;
        }
        return self.authServer;
    }

    /**
     * Queues a received proposal until it is committed. The packet is re-wrapped with type 8,
     * the same zxid and data, and no auth info.
     */
    void proposalReceived(QuorumPacket quorumPacket) {
        // NOTE(review): 8 is presumably a packet-type constant inlined by the decompiler - confirm.
        final QuorumPacket pending = new QuorumPacket(8, quorumPacket.getZxid(), quorumPacket.getData(), null);
        proposedPkts.add(pending);
    }

    /**
     * Removes and returns the head of the proposal queue if it carries the given zxid.
     *
     * @param l2 the zxid being committed
     * @return the matching proposal, or {@code null} if the queue is empty or its head is
     *         already newer than {@code l2} (the proposal was never seen here)
     * @throws RuntimeException if the head is older than {@code l2}, i.e. commits arrived
     *         out of step with the queued proposals
     */
    private synchronized QuorumPacket removeProposedPacket(long l2) {
        QuorumPacket head = this.proposedPkts.peek();
        if (head == null || head.getZxid() > l2) {
            LOG.b("ignore missing proposal packet for {}", (Object)Long.toHexString(l2));
            return null;
        }
        if (head.getZxid() != l2) {
            // %x, not %d: the values carry a "0x" prefix, so they must be rendered in hexadecimal.
            String string = String.format("Unexpected proposal packet on commit ack, expected zxid 0x%x got zxid 0x%x", l2, head.getZxid());
            LOG.e(string);
            throw new RuntimeException(string);
        }
        this.proposedPkts.remove();
        return head;
    }

    /**
     * Appends a committed packet to the retained queue and trims the queue from its head:
     * first up to five packets while the total size exceeds 80% of the limit, then as many
     * as needed until the total is no longer above the limit.
     */
    private synchronized void cacheCommittedPacket(QuorumPacket quorumPacket) {
        committedPkts.add(quorumPacket);
        pktsSize += LearnerHandler.packetSize(quorumPacket);

        // Soft trim: at most five evictions per added packet once above 80% of the limit.
        int evicted = 0;
        while (pktsSize > pktsSizeLimit * 0.8 && evicted < 5) {
            if (!evictOldestCommittedPacket()) {
                break;
            }
            evicted++;
        }

        // Hard cap: keep evicting until the total is within the limit or the queue is empty.
        while (pktsSize > pktsSizeLimit) {
            if (!evictOldestCommittedPacket()) {
                break;
            }
        }
    }

    /**
     * Removes the oldest retained packet and subtracts its size from the running total.
     * Resets the total to zero and returns {@code false} when the queue is already empty.
     */
    private boolean evictOldestCommittedPacket() {
        QuorumPacket oldest = committedPkts.poll();
        if (oldest == null) {
            pktsSize = 0;
            return false;
        }
        pktsSize -= LearnerHandler.packetSize(oldest);
        return true;
    }

    /** Queues the packet on every active observer and records its zxid as the last one sent. */
    private synchronized void sendPacket(QuorumPacket quorumPacket) {
        Iterator<LearnerHandler> observers = activeObservers.iterator();
        while (observers.hasNext()) {
            observers.next().queuePacket(quorumPacket);
        }
        lastProposedZxid = quorumPacket.getZxid();
    }

    /**
     * Handles a commit for zxid {@code l2}: the matching queued proposal, if any, is retained
     * in the commit queue and sent to all active observers. Does nothing when no proposal matches.
     */
    synchronized void proposalCommitted(long l2) {
        final QuorumPacket committed = removeProposedPacket(l2);
        if (committed != null) {
            cacheCommittedPacket(committed);
            sendPacket(committed);
        }
    }

    /**
     * Handles a commit-and-activate for zxid {@code l2}: the matching queued proposal's data is
     * wrapped via {@code Leader.buildInformAndActivePacket(l2, l3, data)}, retained in the commit
     * queue and sent to all active observers. Does nothing when no proposal matches.
     */
    synchronized void informAndActivate(long l2, long l3) {
        final QuorumPacket proposal = removeProposedPacket(l2);
        if (proposal != null) {
            final QuorumPacket activation = Leader.buildInformAndActivePacket(l2, l3, proposal.getData());
            cacheCommittedPacket(activation);
            sendPacket(activation);
        }
    }

    /**
     * Opens the listening socket, starts the accept thread and schedules the periodic ping.
     * Returns immediately if the accept thread is already alive.
     *
     * The socket is a {@code UnifiedServerSocket} when port unification or SSL quorum is
     * enabled and a plain {@code ServerSocket} otherwise; it binds to all addresses when the
     * peer is configured to listen on all IPs, else to the peer's quorum address.
     *
     * @throws IOException if the listening socket cannot be created; in that case the
     *         master is left in the not-running state
     */
    public synchronized void start() throws IOException {
        if (this.thread != null && this.thread.isAlive()) {
            return;
        }
        int backlog = 10;
        InetAddress inetAddress = this.self.getQuorumAddress().getReachableOrOne().getAddress();
        if (this.self.shouldUsePortUnification() || this.self.isSslQuorum()) {
            boolean allowInsecureConnection = this.self.shouldUsePortUnification();
            this.ss = this.self.getQuorumListenOnAllIPs() ? new UnifiedServerSocket(this.self.getX509Util(), allowInsecureConnection, this.port, backlog) : new UnifiedServerSocket(this.self.getX509Util(), allowInsecureConnection, this.port, backlog, inetAddress);
        } else {
            this.ss = this.self.getQuorumListenOnAllIPs() ? new ServerSocket(this.port, backlog) : new ServerSocket(this.port, backlog, inetAddress);
        }
        // Mark as running only once the socket exists: if binding throws, the flag must not
        // stay true with no accept thread behind it. Thread.start() below makes the write
        // visible to the accept loop.
        this.listenerRunning = true;
        this.thread = new Thread((Runnable)this, "ObserverMaster");
        this.thread.start();
        this.pinger = Executors.newSingleThreadScheduledExecutor();
        // Ping twice per tick.
        this.pinger.scheduleAtFixedRate(this.ping, this.self.tickTime / 2, this.self.tickTime / 2, TimeUnit.MILLISECONDS);
    }

    /**
     * Accept loop: while the listener is running, accepts observer connections, applies a read
     * timeout of {@code tickTime * initLimit} and starts a {@code LearnerHandler} per connection.
     *
     * Any exception while accepting or setting up a connection is logged and the loop goes on
     * (or ends, once the listener has been stopped). A socket that was accepted but could not
     * be handed to a started handler is closed so it does not leak.
     *
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Override
    public void run() {
        ServerSocket serverSocket;
        // Read the listening socket under the same monitor start() holds when assigning it.
        synchronized (this) {
            serverSocket = this.ss;
        }
        while (this.listenerRunning) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                socket.setSoTimeout(this.self.tickTime * this.self.initLimit);
                BufferedInputStream bufferedInputStream = new BufferedInputStream(socket.getInputStream());
                LearnerHandler learnerHandler = new LearnerHandler(socket, bufferedInputStream, this);
                learnerHandler.start();
            }
            catch (Exception exception) {
                // Setup failed after accept(): no started handler owns the socket, so release it here.
                if (socket != null) {
                    try {
                        socket.close();
                    }
                    catch (IOException ignored) {
                        // Best-effort cleanup; the original failure is the one reported below.
                    }
                }
                if (this.listenerRunning) {
                    LOG.a("Ignoring accept exception (maybe shutting down)", exception);
                    continue;
                }
                LOG.a("Ignoring accept exception (maybe client closed)", exception);
            }
        }
    }

    /**
     * Stops the listener: clears the running flag, cancels the ping scheduler, closes the
     * listening socket and shuts down every active observer handler.
     */
    public synchronized void stop() {
        listenerRunning = false;

        final ScheduledExecutorService scheduler = pinger;
        if (scheduler != null) {
            scheduler.shutdownNow();
        }

        final ServerSocket listener = ss;
        if (listener != null) {
            try {
                listener.close();
            }
            catch (IOException closeFailure) {
                // NOTE(review): goes to stderr rather than through LOG - consider routing via the logger.
                closeFailure.printStackTrace();
            }
        }

        Iterator<LearnerHandler> observers = activeObservers.iterator();
        while (observers.hasNext()) {
            observers.next().shutdown();
        }
    }

    /** Returns the number of handlers currently in the active observer set. */
    int getNumActiveObservers() {
        final int count = activeObservers.size();
        return count;
    }

    /**
     * Collects the info map of every active observer handler.
     *
     * @return a newly built set with one {@code getLearnerHandlerInfo()} map per active handler
     */
    public Iterable<Map<String, Object>> getActiveObservers() {
        final HashSet<Map<String, Object>> infos = new HashSet<Map<String, Object>>();
        Iterator<LearnerHandler> observers = activeObservers.iterator();
        while (observers.hasNext()) {
            infos.add(observers.next().getLearnerHandlerInfo());
        }
        return infos;
    }

    /** Resets the connection statistics of every active observer handler. */
    public void resetObserverConnectionStats() {
        Iterator<LearnerHandler> observers = activeObservers.iterator();
        while (observers.hasNext()) {
            observers.next().resetObserverConnectionStats();
        }
    }

    /** Returns the current (class-wide) size limit for the retained commit queue. */
    int getPktsSizeLimit() {
        final int limit = pktsSizeLimit;
        return limit;
    }

    /** Replaces the class-wide size limit for the retained commit queue with {@code n2}. */
    static void setPktsSizeLimit(int n2) {
        ObserverMaster.pktsSizeLimit = n2;
    }

    /**
     * Creates a JMX bean for the handler and its socket and registers it through the follower
     * server. The bean is remembered for later unregistration only if registration succeeded.
     */
    @Override
    public void registerLearnerHandlerBean(LearnerHandler learnerHandler, Socket socket) {
        final LearnerHandlerBean bean = new LearnerHandlerBean(learnerHandler, socket);
        final boolean registered = zks.registerJMX(bean);
        if (!registered) {
            return;
        }
        connectionBeans.put(learnerHandler, bean);
    }

    /** Forgets the handler's JMX bean and unregisters it; does nothing if none was recorded. */
    @Override
    public void unregisterLearnerHandlerBean(LearnerHandler learnerHandler) {
        final LearnerHandlerBean bean = connectionBeans.remove(learnerHandler);
        if (bean == null) {
            return;
        }
        MBeanRegistry.getInstance().unregister(bean);
    }

    /**
     * Immutable record of a session revalidation that is awaiting a reply: the session id,
     * its timeout and the handler that asked for it.
     */
    static class Revalidation {
        public final long sessionId;
        public final int timeout;
        public final LearnerHandler handler;

        /**
         * @param l2 the session id (unboxed here, so it must not be {@code null})
         * @param n2 the session timeout
         * @param learnerHandler the handler that requested the revalidation
         */
        Revalidation(Long l2, int n2, LearnerHandler learnerHandler) {
            this.sessionId = l2.longValue();
            this.timeout = n2;
            this.handler = learnerHandler;
        }

        /** Two revalidations are equal when they are of the same class and all three fields match. */
        @Override
        public boolean equals(Object object) {
            if (object == this) {
                return true;
            }
            if (object == null) {
                return false;
            }
            if (object.getClass() != this.getClass()) {
                return false;
            }
            final Revalidation other = (Revalidation)object;
            if (sessionId != other.sessionId) {
                return false;
            }
            if (timeout != other.timeout) {
                return false;
            }
            return handler.equals(other.handler);
        }

        /** Combines the folded session id, the timeout and the handler's hash with multiplier 31. */
        @Override
        public int hashCode() {
            final int sessionHash = (int)(sessionId ^ (sessionId >>> 32));
            return 31 * (31 * sessionHash + timeout) + handler.hashCode();
        }
    }
}

