Ignore:
Timestamp:
10/01/13 12:02:41 (11 years ago)
Author:
stoda
Message:

TODO: delete in supervisor
check the code

Location:
branches/supervisor/src/main/java/omq/common/broker
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/common/broker/Broker.java

    r91 r92  
    2020import omq.exception.RemoteException;
    2121import omq.server.RemoteObject;
     22import omq.supervisor.Supervisor;
    2223
    2324import org.apache.log4j.Logger;
     
    4950        private boolean connectionClosed = false;
    5051        private Properties environment = null;
    51         private RemoteBrokerImpl remoteBrokerImpl;
    5252        private Map<String, RemoteObject> remoteObjs;
    5353        private Map<String, Object> proxies = new Hashtable<String, Object>();
    5454        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
     55
     56        // Supervisor
     57        private Supervisor supervisor;
    5558
    5659        public Broker(Properties env) throws Exception {
     
    358361        }
    359362
    360         public void setSupervisor(String brokerSet, String brokerName) throws Exception {
    361                 remoteBrokerImpl = new RemoteBrokerImpl();
    362                 remoteBrokerImpl.startRemoteBroker(brokerSet, brokerName, this, getEnvironment());
    363         }
    364 
    365363        public Properties getEnvironment() {
    366364                return environment;
     
    379377        }
    380378
     379        /*
     380         * Supervisor
     381         */
     382        public void setSupervisor(String supervisorName, String brokerName) throws Exception {
     383                // Create a RemoteBrokerImpl
     384                bind(brokerName, new RemoteBrokerImpl());
     385                // Subscribe broker
     386                supervisor = lookup(supervisorName, Supervisor.class);
     387                supervisor.subscribe(brokerName);
     388                logger.info("Supervisor set: " + supervisorName + ", BrokerName: " + brokerName);
     389        }
     390
     391        public Supervisor getSupervisor() {
     392                return supervisor;
     393        }
    381394}
  • branches/supervisor/src/main/java/omq/common/broker/RemoteBroker.java

    r91 r92  
    22
    33import java.io.IOException;
     4import java.util.Properties;
    45import java.util.Set;
    56
    67import omq.Remote;
     8import omq.client.annotation.SyncMethod;
    79import omq.exception.RemoteException;
    810
     
    1012        public Set<String> getRemoteObjects();
    1113
    12         public void spawnObject(String reference, String className, Class<?> parameterTypes, Object... args) throws Exception;
     14        public void spawnObject(String reference, String className, Properties env) throws Exception;
     15
     16        public void spawnObject(String reference, String className) throws Exception;
    1317
    1418        public void deleteObject(String reference) throws RemoteException, IOException;
    1519
    16         public HasObject hasObject(String reference);
     20        @SyncMethod(retry = 1, timeout = 1000)
     21        public boolean hasObject(String reference);
    1722
    1823}
  • branches/supervisor/src/main/java/omq/common/broker/RemoteBrokerImpl.java

    r91 r92  
    22
    33import java.io.IOException;
    4 import java.lang.reflect.Method;
    5 import java.util.ArrayList;
    6 import java.util.HashMap;
    7 import java.util.List;
    84import java.util.Properties;
    95import java.util.Set;
    106
    11 import omq.common.util.ParameterQueue;
    127import omq.exception.RemoteException;
    138import omq.server.RemoteObject;
    14 import omq.server.RemoteWrapper;
    15 
    16 import com.rabbitmq.client.QueueingConsumer;
    179
    1810/**
     
    2820        private static final long serialVersionUID = 1L;
    2921
    30         // fanout broker
    31         private String brokerSet;
    32         // id broker
    33         private String brokerName;
    34 
    35         public void startRemoteBroker(String brokerSet, String brokerName, Broker broker, Properties env) throws Exception {
    36                 this.broker = broker;
    37                 this.UID = brokerName;
    38                 this.env = env;
    39                 this.brokerSet = brokerSet;
    40                 this.brokerName = brokerName;
    41 
    42                 this.params = new HashMap<String, List<Class<?>>>();
    43                 for (Method m : this.getClass().getMethods()) {
    44                         List<Class<?>> list = new ArrayList<Class<?>>();
    45                         for (Class<?> clazz : m.getParameterTypes()) {
    46                                 list.add(clazz);
    47                         }
    48                         this.params.put(m.getName(), list);
    49                 }
    50 
    51                 // Get num threads to use
    52                 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
    53                 this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
    54 
    55                 startQueues();
    56 
    57                 // Start this listener
    58                 this.start();
    59         }
    60 
    61         private void startQueues() throws Exception {
    62                 /*
    63                  * Unique queue
    64                  */
    65                 channel.exchangeDeclare(brokerSet, "direct");
    66                 channel.queueDeclare(brokerName, false, true, true, null);
    67                 channel.queueBind(brokerName, brokerSet, brokerName);
    68 
    69                 /*
    70                  * Multi queue
    71                  */
    72                 channel.exchangeDeclare("multi#" + brokerSet, "fanout");
    73                 channel.queueDeclare("multi#" + brokerName, false, true, true, null);
    74                 channel.queueBind("multi#" + brokerName, "multi#" + brokerSet, "");
    75 
    76                 /*
    77                  * Consumer
    78                  */
    79 
    80                 consumer = new QueueingConsumer(channel);
    81                 channel.basicConsume(brokerName, true, consumer);
    82                 channel.basicConsume(brokerName + "#multi", true, consumer);
     22        @Override
     23        public Set<String> getRemoteObjects() {
     24                return getBroker().getRemoteObjs().keySet();
    8325        }
    8426
    8527        @Override
    86         public Set<String> getRemoteObjects() {
    87                 return this.broker.getRemoteObjs().keySet();
     28        public void spawnObject(String reference, String className, Properties env) throws Exception {
     29                RemoteObject remote = (RemoteObject) Class.forName(className).newInstance();
     30                getBroker().bind(reference, remote, env);
    8831        }
    8932
    9033        @Override
    91         public void spawnObject(String reference, String className, Class<?> parameterTypes, Object... args) throws Exception {
    92                 RemoteObject remote = (RemoteObject) Class.forName(className).getConstructor(parameterTypes).newInstance(args);
    93                 this.broker.bind(reference, remote);
     34        public void spawnObject(String reference, String className) throws Exception {
     35                RemoteObject remote = (RemoteObject) Class.forName(className).newInstance();
     36                getBroker().bind(reference, remote);
    9437        }
    9538
    9639        @Override
    9740        public void deleteObject(String reference) throws RemoteException, IOException {
    98                 this.broker.unbind(reference);
     41                getBroker().unbind(reference);
    9942        }
    10043
    10144        @Override
    102         public HasObject hasObject(String reference) {
    103                 boolean hasIt = this.broker.getRemoteObjs().containsKey(reference);
    104                 return new HasObject(this.brokerName, reference, hasIt);
     45        public boolean hasObject(String reference) {
     46                return getBroker().getRemoteObjs().containsKey(reference);
    10547        }
    10648
Note: See TracChangeset for help on using the changeset viewer.