Changeset 106 for branches/supervisor


Ignore:
Timestamp:
10/18/13 16:09:36 (11 years ago)
Author:
stoda
Message:

abans que la segueixi liant...

Location:
branches/supervisor
Files:
1 added
3 deleted
7 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/common/broker/Broker.java

    r99 r106  
    240240        }
    241241
     242        public void bind(String reference, String UID, RemoteObject remote) throws RemoteException, AlreadyBoundException {
     243                bind(reference, UID, remote, environment);
     244        }
     245
    242246        /**
    243247         * Binds the reference to the specified remote object. This function uses
     
    264268                try {
    265269                        remote.startRemoteObject(reference, this, env);
     270                        remoteObjs.put(reference, remote);
     271                } catch (Exception e) {
     272                        throw new RemoteException(e);
     273                }
     274        }
     275
     276        public void bind(String reference, String UID, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException {
     277                if (remoteObjs.containsKey(reference)) {
     278                        throw new AlreadyBoundException(reference);
     279                }
     280                // Try to start the remtoeObject listeners
     281                try {
     282                        remote.startRemoteObject(reference, UID, this, env);
    266283                        remoteObjs.put(reference, remote);
    267284                } catch (Exception e) {
     
    400417         * Supervisor
    401418         */
    402         public void setSupervisor(String supervisorName, String brokerName) throws Exception {
     419        public void setSupervisor(String supervisorName, String brokerSet, String brokerName) throws Exception {
    403420                // Create a RemoteBrokerImpl
    404                 bind(brokerName, new RemoteBrokerImpl());
     421                bind(brokerSet, brokerName, new RemoteBrokerImpl());
    405422                // Subscribe broker
    406423                supervisor = lookup(supervisorName, Supervisor.class);
    407                 supervisor.subscribe(brokerName);
    408                 logger.info("Supervisor set: " + supervisorName + ", BrokerName: " + brokerName);
     424                supervisor.subscribe(brokerSet, brokerName);
     425                logger.info("Supervisor set: " + supervisorName + ", BrokerSet: " + brokerSet + ", BrokerName: " + brokerName);
    409426        }
    410427
  • branches/supervisor/src/main/java/omq/server/InvocationThread.java

    r105 r106  
    3636        private RemoteObject obj;
    3737        private String reference;
     38        private String UID;
    3839        private Properties env;
    3940        private boolean idle;
     
    5455        public InvocationThread(RemoteObject obj) throws Exception {
    5556                this.obj = obj;
     57                this.UID = obj.getUID();
    5658                this.reference = obj.getRef();
    5759                this.env = obj.getEnv();
     
    8789                                String serializerType = delivery.getProperties().getType();
    8890
    89                                 // Deserialize the json
     91                                // Deserialize the request
    9092                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
    9193                                String methodName = request.getMethod();
     
    196198
    197199                /*
     200                 * UID queue
     201                 */
     202
     203                if (UID != null) {
     204
     205                        boolean uidDurable = false;
     206                        boolean uidExclusive = true;
     207                        boolean uidAutoDelete = true;
     208
     209                        channel.queueDeclare(UID, uidDurable, uidExclusive, uidAutoDelete, null);
     210                        if (!exchange.equalsIgnoreCase("")) { // Default exchange case
     211                                channel.queueBind(UID, exchange, UID);
     212                        }
     213                }
     214
     215                /*
    198216                 * Multi queue, exclusive per each instance
    199217                 */
     
    233251                channel.basicConsume(queue, autoAck, consumer);
    234252                channel.basicConsume(multiQueue, autoAck, consumer);
     253                if (UID != null) {
     254                        channel.basicConsume(UID, autoAck, consumer);
     255                }
    235256        }
    236257
  • branches/supervisor/src/main/java/omq/server/RemoteObject.java

    r105 r106  
    3131
    3232        private String reference;
     33        private String UID;
    3334        private Properties env;
    3435        private transient Broker broker;
     
    8384                pool = new RemoteThreadPool(minPoolThreads, maxPoolThreads, refresh, keepAliveTime, maxMessagesPerThread, this, broker);
    8485                pool.start();
     86        }
     87
     88        public void startRemoteObject(String reference, String UID, Broker broker, Properties env) throws Exception {
     89                this.UID = UID;
     90                startRemoteObject(reference, broker, env);
    8591        }
    8692
     
    209215        }
    210216
     217        public String getUID() {
     218                return UID;
     219        }
     220
     221        public void setUID(String uID) {
     222                UID = uID;
     223        }
     224
    211225}
  • branches/supervisor/src/main/java/omq/supervisor/Supervisor.java

    r103 r106  
    22
    33import omq.Remote;
     4import omq.common.broker.HasObject;
    45
    56public interface Supervisor extends Remote {
    67
    7         public void subscribe(String brokerName) throws Exception;
     8        public void subscribe(String brokerSet, String brokerName) throws Exception;
    89
    9         public void spawnObject(OmqSettings settings, int numObjects) throws Exception;
     10        public void spawnObject(OmqSettings settings) throws Exception;
    1011
    11         public void unbindObject(OmqSettings settings, int numObjects) throws Exception;
     12        public void spawnObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception;
     13
     14        public void unbindObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception;
    1215
    1316}
  • branches/supervisor/src/main/java/omq/supervisor/SupervisorImpl.java

    r103 r106  
    55import java.util.List;
    66import java.util.Map;
     7import java.util.Set;
    78
     9import omq.common.broker.HasObject;
    810import omq.common.broker.RemoteBroker;
     11import omq.common.broker.RemoteMultiBroker;
     12import omq.exception.RemoteException;
    913import omq.server.RemoteObject;
    1014
    1115import org.apache.log4j.Logger;
    1216
    13 public class SupervisorImpl extends RemoteObject implements Supervisor {
     17import com.rabbitmq.client.Channel;
     18import com.rabbitmq.client.AMQP.Queue.DeclareOk;
     19
     20public class SupervisorImpl extends RemoteObject implements Supervisor, Runnable {
    1421
    1522        /**
     
    1825        private static final long serialVersionUID = 1L;
    1926        private static final Logger logger = Logger.getLogger(SupervisorImpl.class.getName());
    20         private SupervisorThread thread;
     27
     28        private String brokerSet;
     29        private long sleep;
    2130        private Map<String, OmqSettings> objectSettings;
    2231        // TODO: Set<?>
     32        private RemoteMultiBroker multiBroker;
     33        private Map<String, RemoteBroker> brokerMap;
    2334        private List<RemoteBroker> brokers;
    2435
    25         public SupervisorImpl(long sleep) {
     36        public SupervisorImpl(String brokerSet, long sleep) {
     37                this.brokerSet = brokerSet;
     38                this.sleep = sleep;
    2639                brokers = new ArrayList<RemoteBroker>();
    2740                objectSettings = new HashMap<String, OmqSettings>();
    28                 thread = new SupervisorThread(this, sleep);
    29                 thread.start();
    3041        }
    3142
    3243        @Override
    33         public void subscribe(String brokerName) throws Exception {
    34                 logger.info("Broker " + brokerName + " subscrived");
    35                 RemoteBroker broker = getBroker().lookup(brokerName, RemoteBroker.class);
    36                 brokers.add(broker);
     44        public void run() {
     45                try {
     46                        multiBroker = getBroker().lookup(brokerSet, RemoteMultiBroker.class);
     47                        while (true) {
     48                                try {
     49                                        Set<String> keys = objectSettings.keySet();
     50                                        for (String reference : keys) {
     51                                                System.out.println("key = " + keys);
     52                                                checkObject(reference);
     53                                        }
     54                                } catch (Exception e) {
     55                                        e.printStackTrace();
     56                                }
     57
     58                                try {
     59                                        Thread.sleep(sleep);
     60                                } catch (InterruptedException e) {
     61                                        e.printStackTrace();
     62                                }
     63                        }
     64                } catch (RemoteException e1) {
     65                        // TODO Auto-generated catch block
     66                        e1.printStackTrace();
     67                }
    3768        }
    3869
    3970        @Override
    40         public void spawnObject(OmqSettings settings, int numObjects) throws Exception {
     71        public void subscribe(String brokerSet, String brokerName) throws Exception {
     72                if (brokerSet.equals(brokerSet) && !brokerMap.containsKey(brokerName)) {
     73                        logger.info("Broker " + brokerName + " subscrived");
     74                        RemoteBroker broker = getBroker().lookup(brokerName, RemoteBroker.class);
     75                        brokerMap.put(brokerSet, broker);
     76                } else {
     77                        throw new Exception("blablabla");
     78                }
     79        }
     80
     81        @Override
     82        public void spawnObject(OmqSettings settings) throws Exception {
     83                String reference = settings.getReference();
     84
     85                if (objectSettings.containsKey(reference)) {
     86                        throw new Exception("JAJAJAJAJA");
     87                }
     88                objectSettings.put(reference, settings);
     89        }
     90
     91        @Override
     92        public void spawnObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception {
    4193
    4294                String reference = settings.getReference();
     
    48100                int minObjects = settings.getMinNumberObjects();
    49101
    50                 for (RemoteBroker broker : brokers) {
    51                         if (!broker.hasObject(reference) && minObjects >= numObjects) {
    52                                 broker.spawnObject(reference, settings.getClassName(), settings.getProps());
     102                for (HasObject h : hasList) {
     103                        if (h.hasObject() && minObjects >= numObjects) {
     104                                brokerMap.get(h.getBrokerName()).spawnObject(reference, settings.getClassName(), settings.getProps());
    53105                                numObjects++;
    54106                                if (minObjects >= numObjects) {
     
    61113
    62114        @Override
    63         public void unbindObject(OmqSettings settings, int numObjects) throws Exception {
     115        public void unbindObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception {
    64116                String reference = settings.getReference();
    65117
     
    75127        }
    76128
    77         public SupervisorThread getThread() {
    78                 return thread;
    79         }
     129        private void checkObject(String reference) throws Exception {
     130                OmqSettings settings = objectSettings.get(reference);
    80131
    81         public void setThread(SupervisorThread thread) {
    82                 this.thread = thread;
     132                int minObjects = settings.getMinNumberObjects();
     133                int maxMessages = settings.getMaxNumQueued();
     134                int minMessages = settings.getMinNumQueued();
     135
     136                Channel channel = getBroker().getChannel();
     137                DeclareOk dok = channel.queueDeclarePassive(reference);
     138
     139                int numObjects = 0;
     140                int numMessages = dok.getMessageCount();
     141
     142                HasObject[] hasList = multiBroker.hasObjectInfo(reference);
     143                for (HasObject h : hasList) {
     144                        if (h.hasObject()) {
     145                                numObjects++;
     146                        }
     147                }
     148
     149                System.out.println("Num Consumers: " + numObjects + ", num Messages: " + numMessages);
     150
     151                if (maxMessages < numMessages || numObjects < minObjects) {
     152                        logger.info("SPAWN TIME!!");
     153                        spawnObject(settings, hasList, numObjects);
     154                } else if (numMessages < minMessages && minObjects < numObjects) {
     155                        logger.info("Unbinding object!!!");
     156                        unbindObject(settings, hasList, numObjects);
     157                }
    83158        }
    84159
     
    99174        }
    100175
     176        public String getBrokerSet() {
     177                return brokerSet;
     178        }
     179
     180        public void setBrokerSet(String brokerSet) {
     181                this.brokerSet = brokerSet;
     182        }
     183
     184        public RemoteMultiBroker getMultiBroker() {
     185                return multiBroker;
     186        }
     187
     188        public void setMultiBroker(RemoteMultiBroker multiBroker) {
     189                this.multiBroker = multiBroker;
     190        }
     191
    101192}
  • branches/supervisor/src/main/resources/log4j.xml

    r86 r106  
    44<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
    55    <appender name="consoleAppender" class="org.apache.log4j.ConsoleAppender">
    6         <param name="Threshold" value="INFO" />
     6        <param name="Threshold" value="DEBUG" />
    77        <layout class="org.apache.log4j.PatternLayout">
    88            <param name="ConversionPattern" value="%d{[yyyy-MM-dd HH:mm:ss]} %-5p %c:%L - %m%n" />
     
    3030
    3131  <root>
    32     <priority value ="INFO" />
     32    <priority value ="DEBUG" />
    3333    <appender-ref ref="consoleAppender" />
    3434    <appender-ref ref="A2" /> 
  • branches/supervisor/src/test/java/omq/test/supervisor/SleepTest.java

    r103 r106  
    1313
    1414public class SleepTest {
     15        private static String brokerSet = "broker";
    1516
    1617        @BeforeClass
     
    4243
    4344                // Set supervisor
    44                 SupervisorImpl supervisor = new SupervisorImpl(500);
     45                SupervisorImpl supervisor = new SupervisorImpl(brokerSet, 500);
    4546                broker.bind("supervisor", supervisor);
     47                Thread t = new Thread(supervisor);
     48                t.start();
    4649
    47                 broker.setSupervisor("supervisor", "b1");
    48                 broker2.setSupervisor("supervisor", "b2");
     50                broker.setSupervisor("supervisor", brokerSet, "b1");
     51                broker2.setSupervisor("supervisor", brokerSet, "b2");
    4952        }
    5053
     
    6366
    6467                OmqSettings settings = new OmqSettings("sleep", SleepImpl.class.getName(), env, 1, 20, 20);
    65                 supervisor.spawnObject(settings, 0);
     68                supervisor.spawnObject(settings);
    6669
    6770                Sleep sleep = broker.lookup("sleep", Sleep.class);
Note: See TracChangeset for help on using the changeset viewer.