Ignore:
Timestamp:
10/01/13 12:02:41 (11 years ago)
Author:
stoda
Message:

TODO: delete in supervisor
check the code

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/common/broker/RemoteBrokerImpl.java

    r91 r92  
    22
    33import java.io.IOException;
    4 import java.lang.reflect.Method;
    5 import java.util.ArrayList;
    6 import java.util.HashMap;
    7 import java.util.List;
    84import java.util.Properties;
    95import java.util.Set;
    106
    11 import omq.common.util.ParameterQueue;
    127import omq.exception.RemoteException;
    138import omq.server.RemoteObject;
    14 import omq.server.RemoteWrapper;
    15 
    16 import com.rabbitmq.client.QueueingConsumer;
    179
    1810/**
     
    2820        private static final long serialVersionUID = 1L;
    2921
    30         // fanout broker
    31         private String brokerSet;
    32         // id broker
    33         private String brokerName;
    34 
    35         public void startRemoteBroker(String brokerSet, String brokerName, Broker broker, Properties env) throws Exception {
    36                 this.broker = broker;
    37                 this.UID = brokerName;
    38                 this.env = env;
    39                 this.brokerSet = brokerSet;
    40                 this.brokerName = brokerName;
    41 
    42                 this.params = new HashMap<String, List<Class<?>>>();
    43                 for (Method m : this.getClass().getMethods()) {
    44                         List<Class<?>> list = new ArrayList<Class<?>>();
    45                         for (Class<?> clazz : m.getParameterTypes()) {
    46                                 list.add(clazz);
    47                         }
    48                         this.params.put(m.getName(), list);
    49                 }
    50 
    51                 // Get num threads to use
    52                 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
    53                 this.remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
    54 
    55                 startQueues();
    56 
    57                 // Start this listener
    58                 this.start();
    59         }
    60 
    61         private void startQueues() throws Exception {
    62                 /*
    63                  * Unique queue
    64                  */
    65                 channel.exchangeDeclare(brokerSet, "direct");
    66                 channel.queueDeclare(brokerName, false, true, true, null);
    67                 channel.queueBind(brokerName, brokerSet, brokerName);
    68 
    69                 /*
    70                  * Multi queue
    71                  */
    72                 channel.exchangeDeclare("multi#" + brokerSet, "fanout");
    73                 channel.queueDeclare("multi#" + brokerName, false, true, true, null);
    74                 channel.queueBind("multi#" + brokerName, "multi#" + brokerSet, "");
    75 
    76                 /*
    77                  * Consumer
    78                  */
    79 
    80                 consumer = new QueueingConsumer(channel);
    81                 channel.basicConsume(brokerName, true, consumer);
    82                 channel.basicConsume(brokerName + "#multi", true, consumer);
     22        @Override
     23        public Set<String> getRemoteObjects() {
     24                return getBroker().getRemoteObjs().keySet();
    8325        }
    8426
    8527        @Override
    86         public Set<String> getRemoteObjects() {
    87                 return this.broker.getRemoteObjs().keySet();
     28        public void spawnObject(String reference, String className, Properties env) throws Exception {
     29                RemoteObject remote = (RemoteObject) Class.forName(className).newInstance();
     30                getBroker().bind(reference, remote, env);
    8831        }
    8932
    9033        @Override
    91         public void spawnObject(String reference, String className, Class<?> parameterTypes, Object... args) throws Exception {
    92                 RemoteObject remote = (RemoteObject) Class.forName(className).getConstructor(parameterTypes).newInstance(args);
    93                 this.broker.bind(reference, remote);
     34        public void spawnObject(String reference, String className) throws Exception {
     35                RemoteObject remote = (RemoteObject) Class.forName(className).newInstance();
     36                getBroker().bind(reference, remote);
    9437        }
    9538
    9639        @Override
    9740        public void deleteObject(String reference) throws RemoteException, IOException {
    98                 this.broker.unbind(reference);
     41                getBroker().unbind(reference);
    9942        }
    10043
    10144        @Override
    102         public HasObject hasObject(String reference) {
    103                 boolean hasIt = this.broker.getRemoteObjs().containsKey(reference);
    104                 return new HasObject(this.brokerName, reference, hasIt);
     45        public boolean hasObject(String reference) {
     46                return getBroker().getRemoteObjs().containsKey(reference);
    10547        }
    10648
Note: See TracChangeset for help on using the changeset viewer.