/*
 * Decompiled with CFR 0.152.
 */
package net.grinder.communication;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import net.grinder.common.UncheckedInterruptedException;
import net.grinder.communication.Acceptor;
import net.grinder.communication.CloseCommunicationMessage;
import net.grinder.communication.CommunicationException;
import net.grinder.communication.ConnectionType;
import net.grinder.communication.Message;
import net.grinder.communication.MessageQueue;
import net.grinder.communication.MessageRequiringResponse;
import net.grinder.communication.Receiver;
import net.grinder.communication.ResourcePool;
import net.grinder.communication.Sender;
import net.grinder.communication.SocketWrapper;
import net.grinder.communication.StreamSender;
import net.grinder.util.thread.InterruptibleRunnable;
import net.grinder.util.thread.ThreadPool;
import net.grinder.util.thread.ThreadSafeQueue;

public final class ServerReceiver
implements Receiver {
    // Queue that the receiver threads fill with incoming messages (and with
    // the exceptions raised while reading them); drained by waitForMessage().
    private final MessageQueue m_messageQueue = new MessageQueue(true);
    // Every ThreadPool registered by receiveFrom(). All accesses in this class
    // happen while holding this object's monitor.
    private final List m_threadPools = new ArrayList();
    // Compiler-generated cache slot for the ServerReceiver class literal,
    // surfaced by the decompiler.
    static /* synthetic */ Class class$net$grinder$communication$ServerReceiver;

    /**
     * Starts a pool of threads that read messages from the socket sets the
     * acceptor holds for the given connection types.
     *
     * <p>Does nothing when {@code connectionTypes} is empty. Otherwise a new
     * {@link ThreadPool} is created, registered with this receiver (so that
     * {@link #shutdown()} can stop it) and started.
     *
     * @param acceptor supplies one socket set per connection type, and the
     *        port used in the thread pool's name.
     * @param connectionTypes the connection types to listen to.
     * @param numberOfThreads passed to the thread pool as its size.
     * @param idleThreadPollDelay delay handed to each worker; the worker
     *        passes it to {@code Thread.sleep} between idle polls.
     * @throws CommunicationException if this receiver's message queue has
     *         already been shut down.
     */
    public void receiveFrom(Acceptor acceptor, ConnectionType[] connectionTypes, int numberOfThreads, final int idleThreadPollDelay) throws CommunicationException {
        final int typeCount = connectionTypes.length;
        if (typeCount == 0) {
            // Nothing to listen to.
            return;
        }

        // Look up the socket set for each requested connection type.
        final ResourcePool[] socketSets = new ResourcePool[typeCount];
        int index = 0;
        while (index < typeCount) {
            socketSets[index] = acceptor.getSocketSet(connectionTypes[index]);
            index++;
        }

        final String poolName = "ServerReceiver (" + acceptor.getPort() + ", " + Arrays.asList(connectionTypes) + ")";
        final ThreadPool pool = new ThreadPool(poolName, numberOfThreads, new ThreadPool.InterruptibleRunnableFactory(){

            // Each worker gets its own round-robin view over the shared socket sets.
            public InterruptibleRunnable create() {
                return new ServerReceiverRunnable(new CombinedResourcePool(socketSets), idleThreadPollDelay);
            }
        });

        // Check for shutdown and register the pool atomically with respect to
        // shutdown(), which synchronizes on the same monitor.
        synchronized (this) {
            try {
                this.m_messageQueue.checkIfShutdown();
            }
            catch (ThreadSafeQueue.ShutdownException e) {
                throw new CommunicationException("Shut down", e);
            }
            this.m_threadPools.add(pool);
        }
        pool.start();
    }

    /**
     * Takes the next entry from the message queue.
     *
     * @return the dequeued message, or {@code null} if the queue reported
     *         that it has been shut down.
     * @throws CommunicationException declared by the {@link Receiver}
     *         contract; presumably propagated from the queue - confirm
     *         against {@code MessageQueue.dequeue}.
     */
    public Message waitForMessage() throws CommunicationException {
        Message next;
        try {
            next = this.m_messageQueue.dequeue(true);
        }
        catch (ThreadSafeQueue.ShutdownException e) {
            // A shut down queue is reported to the caller as "no message".
            next = null;
        }
        return next;
    }

    /**
     * Shuts down the message queue, then stops every thread pool that
     * {@code receiveFrom} has registered. Runs under this object's monitor.
     */
    public synchronized void shutdown() {
        this.m_messageQueue.shutdown();
        for (Iterator pools = this.m_threadPools.iterator(); pools.hasNext();) {
            final ThreadPool pool = (ThreadPool)pools.next();
            pool.stop();
        }
    }

    /**
     * Sums {@code activeCount()} over the thread groups of all registered
     * thread pools.
     *
     * @return the combined active-thread count of every registered pool.
     */
    synchronized int getActveThreadCount() {
        int total = 0;
        for (Iterator pools = this.m_threadPools.iterator(); pools.hasNext();) {
            final ThreadPool pool = (ThreadPool)pools.next();
            total += pool.getThreadGroup().activeCount();
        }
        return total;
    }

    /**
     * Accessor for the private message queue of the given receiver.
     *
     * @param x0 the receiver whose queue is wanted.
     * @return that receiver's message queue.
     */
    static /* synthetic */ MessageQueue access$200(ServerReceiver x0) {
        final MessageQueue queue = x0.m_messageQueue;
        return queue;
    }

    /**
     * {@link Sender} that forwards to another sender and frees a resource
     * reservation once a message has been sent (or sending has failed).
     */
    private static final class SenderWithReservation
    implements Sender {
        private final Sender m_delegateSender;
        private final ResourcePool.Reservation m_reservation;

        /**
         * @param delegateSender the sender that actually transmits messages.
         * @param reservation the reservation to free after sending.
         */
        private SenderWithReservation(Sender delegateSender, ResourcePool.Reservation reservation) {
            this.m_reservation = reservation;
            this.m_delegateSender = delegateSender;
        }

        /**
         * Sends the message through the delegate, then frees the reservation
         * whether or not the send succeeded.
         *
         * @param message the message to send.
         * @throws CommunicationException if the delegate fails to send.
         */
        public void send(Message message) throws CommunicationException {
            final Sender target = this.m_delegateSender;
            try {
                target.send(message);
            }
            finally {
                // Same release that shutdown() performs.
                this.m_reservation.free();
            }
        }

        /**
         * Frees the reservation. The delegate sender is left untouched.
         */
        public void shutdown() {
            this.m_reservation.free();
        }
    }

    private final class ServerReceiverRunnable
    implements InterruptibleRunnable {
        private final CombinedResourcePool m_sockets;
        private final int m_delay;

        private ServerReceiverRunnable(CombinedResourcePool sockets, int delay) {
            this.m_sockets = sockets;
            this.m_delay = delay;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Unable to fully structure code
         */
        public void interruptibleRun() {
            try {
                try {
                    idle = false;
                    while (true) lbl-1000:
                    // 9 sources

                    {
                        reservation = this.m_sockets.reserveNext();
                        holdReservation = false;
                        try {
                            if (reservation.isSentinel()) {
                                if (idle) {
                                    Thread.sleep(this.m_delay);
                                }
                                idle = true;
                            }
                            socketWrapper = (SocketWrapper)reservation.getResource();
                            inputStream = socketWrapper.getInputStream();
                            if (inputStream.available() <= 0) ** GOTO lbl-1000
                            idle = false;
                            objectStream = new ObjectInputStream(inputStream);
                            message = (Message)objectStream.readObject();
                            if (message instanceof CloseCommunicationMessage) {
                                reservation.close();
                            }
                            if (message instanceof MessageRequiringResponse) {
                                messageRequiringResponse = (MessageRequiringResponse)message;
                                messageRequiringResponse.setResponder(new SenderWithReservation(new StreamSender(socketWrapper.getOutputStream()), reservation));
                                ServerReceiver.access$200(ServerReceiver.this).queue(message);
                                holdReservation = true;
                            }
                            ServerReceiver.access$200(ServerReceiver.this).queue(message);
                        }
                        catch (IOException e) {
                            reservation.close();
                            UncheckedInterruptedException.ioException(e);
                            ServerReceiver.access$200(ServerReceiver.this).queue(e);
                        }
                        catch (ClassNotFoundException e) {
                            reservation.close();
                            ServerReceiver.access$200(ServerReceiver.this).queue(e);
                        }
                        catch (InterruptedException e) {
                            reservation.close();
                            throw new UncheckedInterruptedException(e);
                        }
                        finally {
                            if (holdReservation) continue;
                            reservation.free();
                            continue;
                        }
                        break;
                    }
                }
                catch (ThreadSafeQueue.ShutdownException var1_2) {
                    ServerReceiver.this.shutdown();
                }
                ** GOTO lbl-1000
            }
            catch (Throwable var10_14) {
                ServerReceiver.this.shutdown();
                throw var10_14;
            }
        }
    }

    private static final class CombinedResourcePool {
        private final ResourcePool[] m_resourcePools;
        private int m_next;
        static final /* synthetic */ boolean $assertionsDisabled;

        CombinedResourcePool(ResourcePool[] resourcePools) {
            if (!$assertionsDisabled && resourcePools.length <= 0) {
                throw new AssertionError();
            }
            this.m_resourcePools = resourcePools;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * WARNING - void declaration
         */
        public ResourcePool.Reservation reserveNext() {
            int i = 0;
            ResourcePool[] resourcePoolArray = this.m_resourcePools;
            synchronized (this.m_resourcePools) {
                int start = ++this.m_next;
                // ** MonitorExit[var3_2] (shouldn't be in output)
                void var2_3;
                ResourcePool.Reservation reservation;
                while ((reservation = this.m_resourcePools[(var2_3 + i) % this.m_resourcePools.length].reserveNext()).isSentinel() && i != this.m_resourcePools.length - 1) {
                    ++i;
                }
                return reservation;
            }
        }

        static {
            $assertionsDisabled = !(class$net$grinder$communication$ServerReceiver == null ? (class$net$grinder$communication$ServerReceiver = ServerReceiver.class$("net.grinder.communication.ServerReceiver")) : class$net$grinder$communication$ServerReceiver).desiredAssertionStatus();
        }
    }
}

