Changeset 92


Ignore:
Timestamp:
10/01/13 12:02:41 (11 years ago)
Author:
stoda
Message:

TODO: delete in supervisor
check the code

Location:
branches/supervisor
Files:
7 added
2 deleted
8 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java

    r84 r92  
    116116                        if (methodName.equals("getRef")) {
    117117                                return getRef();
     118                        }
     119                        if (methodName.equals("equals")) {
     120                                if (arguments[0] instanceof Remote) {
     121                                        return getRef().equals(((Remote) arguments[0]).getRef());
     122                                } else {
     123                                        return false;
     124                                }
    118125                        }
    119126                }
  • branches/supervisor/src/main/java/omq/common/broker/Broker.java

    r91 r92  
    2020import omq.exception.RemoteException;
    2121import omq.server.RemoteObject;
     22import omq.supervisor.Supervisor;
    2223
    2324import org.apache.log4j.Logger;
     
    4950        private boolean connectionClosed = false;
    5051        private Properties environment = null;
    51         private RemoteBrokerImpl remoteBrokerImpl;
    5252        private Map<String, RemoteObject> remoteObjs;
    5353        private Map<String, Object> proxies = new Hashtable<String, Object>();
    5454        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
     55
     56        // Supervisor
     57        private Supervisor supervisor;
    5558
    5659        public Broker(Properties env) throws Exception {
     
    358361        }
    359362
    360         public void setSupervisor(String brokerSet, String brokerName) throws Exception {
    361                 remoteBrokerImpl = new RemoteBrokerImpl();
    362                 remoteBrokerImpl.startRemoteBroker(brokerSet, brokerName, this, getEnvironment());
    363         }
    364 
    365363        public Properties getEnvironment() {
    366364                return environment;
     
    379377        }
    380378
     379        /*
     380         * Supervisor
     381         */
     382        public void setSupervisor(String supervisorName, String brokerName) throws Exception {
     383                // Create a RemoteBrokerImpl
     384                bind(brokerName, new RemoteBrokerImpl());
     385                // Subscribe broker
     386                supervisor = lookup(supervisorName, Supervisor.class);
     387                supervisor.subscribe(brokerName);
     388                logger.info("Supervisor set: " + supervisorName + ", BrokerName: " + brokerName);
     389        }
     390
     391        public Supervisor getSupervisor() {
     392                return supervisor;
     393        }
    381394}
  • branches/supervisor/src/main/java/omq/common/broker/RemoteBroker.java

    r91 r92  
    22
    33import java.io.IOException;
     4import java.util.Properties;
    45import java.util.Set;
    56
    67import omq.Remote;
     8import omq.client.annotation.SyncMethod;
    79import omq.exception.RemoteException;
    810
     
    1012        public Set<String> getRemoteObjects();
    1113
    12         public void spawnObject(String reference, String className, Class<?> parameterTypes, Object... args) throws Exception;
     14        public void spawnObject(String reference, String className, Properties env) throws Exception;
     15
     16        public void spawnObject(String reference, String className) throws Exception;
    1317
    1418        public void deleteObject(String reference) throws RemoteException, IOException;
    1519
    16         public HasObject hasObject(String reference);
     20        @SyncMethod(retry = 1, timeout = 1000)
     21        public boolean hasObject(String reference);
    1722
    1823}
  • branches/supervisor/src/main/java/omq/common/broker/RemoteBrokerImpl.java

    r91 r92  
    22
    33import java.io.IOException;
    4 import java.lang.reflect.Method;
    5 import java.util.ArrayList;
    6 import java.util.HashMap;
    7 import java.util.List;
    84import java.util.Properties;
    95import java.util.Set;
    106
    11 import omq.common.util.ParameterQueue;
    127import omq.exception.RemoteException;
    138import omq.server.RemoteObject;
    14 import omq.server.RemoteWrapper;
    15 
    16 import com.rabbitmq.client.QueueingConsumer;
    179
    1810/**
     
    2820        private static final long serialVersionUID = 1L;
    2921
    30         // fanout broker
    31         private String brokerSet;
    32         // id broker
    33         private String brokerName;
    34 
    35         public void startRemoteBroker(String brokerSet, String brokerName, Broker broker, Properties env) throws Exception {
    36                 this.broker = broker;
    37                 this.UID = brokerName;
    38                 this.env = env;
    39                 this.brokerSet = brokerSet;
    40                 this.brokerName = brokerName;
    41 
    42                 this.params = new HashMap<String, List<Class<?>>>();
    43                 for (Method m : this.getClass().getMethods()) {
    44                         List<Class<?>> list = new ArrayList<Class<?>>();
    45                         for (Class<?> clazz : m.getParameterTypes()) {
    46                                 list.add(clazz);
    47                         }
    48                         this.params.put(m.getName(), list);
    49                 }
    50 
    51                 // Get num threads to use
    52                 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
    53                 this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
    54 
    55                 startQueues();
    56 
    57                 // Start this listener
    58                 this.start();
    59         }
    60 
    61         private void startQueues() throws Exception {
    62                 /*
    63                  * Unique queue
    64                  */
    65                 channel.exchangeDeclare(brokerSet, "direct");
    66                 channel.queueDeclare(brokerName, false, true, true, null);
    67                 channel.queueBind(brokerName, brokerSet, brokerName);
    68 
    69                 /*
    70                  * Multi queue
    71                  */
    72                 channel.exchangeDeclare("multi#" + brokerSet, "fanout");
    73                 channel.queueDeclare("multi#" + brokerName, false, true, true, null);
    74                 channel.queueBind("multi#" + brokerName, "multi#" + brokerSet, "");
    75 
    76                 /*
    77                  * Consumer
    78                  */
    79 
    80                 consumer = new QueueingConsumer(channel);
    81                 channel.basicConsume(brokerName, true, consumer);
    82                 channel.basicConsume(brokerName + "#multi", true, consumer);
     22        @Override
     23        public Set<String> getRemoteObjects() {
     24                return getBroker().getRemoteObjs().keySet();
    8325        }
    8426
    8527        @Override
    86         public Set<String> getRemoteObjects() {
    87                 return this.broker.getRemoteObjs().keySet();
     28        public void spawnObject(String reference, String className, Properties env) throws Exception {
     29                RemoteObject remote = (RemoteObject) Class.forName(className).newInstance();
     30                getBroker().bind(reference, remote, env);
    8831        }
    8932
    9033        @Override
    91         public void spawnObject(String reference, String className, Class<?> parameterTypes, Object... args) throws Exception {
    92                 RemoteObject remote = (RemoteObject) Class.forName(className).getConstructor(parameterTypes).newInstance(args);
    93                 this.broker.bind(reference, remote);
     34        public void spawnObject(String reference, String className) throws Exception {
     35                RemoteObject remote = (RemoteObject) Class.forName(className).newInstance();
     36                getBroker().bind(reference, remote);
    9437        }
    9538
    9639        @Override
    9740        public void deleteObject(String reference) throws RemoteException, IOException {
    98                 this.broker.unbind(reference);
     41                getBroker().unbind(reference);
    9942        }
    10043
    10144        @Override
    102         public HasObject hasObject(String reference) {
    103                 boolean hasIt = this.broker.getRemoteObjs().containsKey(reference);
    104                 return new HasObject(this.brokerName, reference, hasIt);
     45        public boolean hasObject(String reference) {
     46                return getBroker().getRemoteObjs().containsKey(reference);
    10547        }
    10648
  • branches/supervisor/src/main/java/omq/server/RemoteObject.java

    r91 r92  
    3838        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
    3939
    40         protected String UID;
    41         protected Properties env;
    42         protected transient Broker broker;
    43         protected transient String multiQueue;
    44         protected transient RemoteWrapper remoteWrapper;
    45         protected transient Map<String, List<Class<?>>> params;
    46         protected transient Channel channel;
    47         protected transient QueueingConsumer consumer;
    48         protected transient boolean killed = false;
     40        private String UID;
     41        private Properties env;
     42        private transient Broker broker;
     43        private transient String multiQueue;
     44        private transient RemoteWrapper remoteWrapper;
     45        private transient Map<String, List<Class<?>>> params;
     46        private transient Channel channel;
     47        private transient QueueingConsumer consumer;
     48        private transient boolean killed = false;
    4949
    5050        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    254254        }
    255255
     256        public Broker getBroker() {
     257                return broker;
     258        }
     259
    256260        /**
    257261         * This method starts the queues using the information got in the
     
    319323                boolean autoAck = false;
    320324
     325                //TODO see if this is useless
     326                int prefetchCount = 1;
     327                channel.basicQos(prefetchCount);
     328
    321329                // Declare a new consumer
    322330                consumer = new QueueingConsumer(channel);
  • branches/supervisor/src/main/java/omq/supervisor/OmqSettings.java

    r91 r92  
    11package omq.supervisor;
    22
     3import java.io.Serializable;
    34import java.util.Properties;
    45
    5 public class OmqSettings {
     6public class OmqSettings implements Serializable {
     7       
     8       
    69
     10        /**
     11         *
     12         */
     13        private static final long serialVersionUID = 1L;
    714        private String reference;
    815        private String className;
  • branches/supervisor/src/main/java/omq/supervisor/Supervisor.java

    r91 r92  
    11package omq.supervisor;
    22
    3 import java.util.Map;
    4 import java.util.Set;
     3import omq.Remote;
    54
    6 import omq.common.broker.RemoteBroker;
     5public interface Supervisor extends Remote {
    76
    8 public class Supervisor {
     7        public void subscribe(String brokerName) throws Exception;
    98
    10         private Set<String> bindReferences;
    11         private Map<String, RemoteBroker> brokers;
     9        public void spawnObject(OmqSettings settings) throws Exception;
    1210
    13         private void checkObject() {
    14                 String reference = null;
    15 
    16                 int numObjects = 0;
    17 
    18                 if(minObjects > numObjects || maxMessages < numEncuats){
    19                         spawn:
    20                                 pregunta a tots i qui no té l'objecte li poses
    21                 }else if(numEncuats < minMessages && minObjects > numObjects){
    22                         delete:
    23                                 pregunta a tots i qui té l'objecte li treus
    24                 }
    25         }
    2611}
  • branches/supervisor/src/test/java/omq/test/lock/SleepTest.java

    r91 r92  
    2020                env.setProperty(ParameterQueue.RABBIT_HOST, "127.0.0.1");
    2121                env.setProperty(ParameterQueue.RABBIT_PORT, "5672");
    22                 env.setProperty(ParameterQueue.NUM_THREADS, "2");
     22                env.setProperty(ParameterQueue.NUM_THREADS, "1");
    2323
    24                 SleepImpl sleep = new SleepImpl();
     24                for (int i = 0; i < 4; i++) {
     25                        Broker broker = new Broker(env);
     26                        broker.bind("sleep", new SleepImpl());
     27                }
    2528
    26                 Broker broker = new Broker(env);
    27                 broker.bind("sleep", sleep);
    2829        }
    2930
Note: See TracChangeset for help on using the changeset viewer.