Ignore:
Timestamp:
10/19/13 13:20:44 (11 years ago)
Author:
stoda
Message:

Error detected: there should be only one thread listening to the multiexchange queue.
TODO: change this. Make refactor in the invocationthread. Change remotethreadpool to achieve this behavior.
AInvocationThread <- InvocationTHread

<- MultiInvocationTHread

Location:
branches/supervisor/src
Files:
13 added
8 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/Remote.java

    r105 r107  
    2222         */
    2323        public String getRef();
     24       
     25        public String getUID();
     26
     27        public void setUID(String uID);
    2428}
  • branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java

    r105 r107  
    4747
    4848        private String reference;
     49        private String UID;
    4950        private transient String exchange;
    5051        private transient String multiExchange;
     
    7273         * Proxymq Constructor.
    7374         *
    74          * This constructor uses an reference to know which object will call. It also uses
    75          * Properties to set where to send the messages
     75         * This constructor uses an reference to know which object will call. It
     76         * also uses Properties to set where to send the messages
    7677         *
    7778         * @param reference
    78          *            The reference represents the unique identifier of a remote object
     79         *            The reference represents the unique identifier of a remote
     80         *            object
    7981         * @param clazz
    8082         *            It represents the real class of the remote object. With this
     
    119121                                return getRef();
    120122                        }
     123                        if (methodName.equals("getUID")) {
     124                                return getUID();
     125                        }
     126                        if (methodName.equals("setUID")) {
     127                                setUID((String) arguments[0]);
     128                                return null;
     129                        }
    121130                        if (methodName.equals("equals")) {
    122131                                if (arguments[0] instanceof Remote) {
     
    168177                }
    169178
     179                // TODO look this carefully
     180                String appId = UID == null ? reference : UID;
     181
    170182                // Add the correlation ID and create a replyTo property
    171                 BasicProperties props = new BasicProperties.Builder().appId(reference).correlationId(corrId).replyTo(replyQueueName).type(serializerType)
    172                                 .deliveryMode(deliveryMode).build();
     183                BasicProperties props = new BasicProperties.Builder().appId(appId).correlationId(corrId).replyTo(replyQueueName)
     184                                .type(serializerType).deliveryMode(deliveryMode).build();
    173185
    174186                // Publish the message
    175187                byte[] bytesRequest = serializer.serialize(serializerType, request);
    176188                broker.publishMessge(exchange, routingkey, props, bytesRequest);
    177                 logger.debug("Proxymq: " + reference + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: "
    178                                 + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()
    179                                 + ", delivery mode: " + deliveryMode);
     189                logger.debug("Proxymq: " + reference + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange
     190                                + ", replyQueue: " + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti()
     191                                + ", async call: " + request.isAsync() + ", delivery mode: " + deliveryMode);
    180192        }
    181193
     
    362374        }
    363375
     376        @Override
     377        public String getUID() {
     378                return UID;
     379        }
     380
     381        @Override
     382        public void setUID(String uID) {
     383                this.UID = uID;
     384        }
     385
    364386}
  • branches/supervisor/src/main/java/omq/common/broker/RemoteBroker.java

    r103 r107  
    66
    77import omq.Remote;
    8 import omq.client.annotation.SyncMethod;
    98import omq.exception.RemoteException;
    109import omq.exception.RetryException;
     
    1918        public void deleteObject(String reference) throws RemoteException, IOException;
    2019
    21         @SyncMethod(retry = 1, timeout = 1000)
    2220        public boolean hasObject(String reference) throws RetryException;
    2321
    24         @SyncMethod(retry = 1, timeout = 1000)
    2522        public HasObject hasObjectInfo(String reference) throws RetryException;
    2623
  • branches/supervisor/src/main/java/omq/common/broker/RemoteBrokerImpl.java

    r103 r107  
    5050        @Override
    5151        public HasObject hasObjectInfo(String reference) throws RetryException {
     52                System.out.println("Hola soc un broker"+ getRef() + ", "+getUID()+ ", fil: "+Thread.currentThread().getId());
    5253                if (getBroker().getRemoteObjs().containsKey(reference)) {
    5354                        RemoteObject r = getBroker().getRemoteObjs().get(reference);
    5455                        int numThreads = r.getPool().getWorkers().size();
    55                         return new HasObject(this.getRef(), reference, true, numThreads);
     56                        return new HasObject(this.getUID(), reference, true, numThreads);
    5657                }
    57                 return new HasObject(this.getRef(), reference, false, 0);
     58                return new HasObject(this.getUID(), reference, false, 0);
    5859        }
    5960
  • branches/supervisor/src/main/java/omq/common/broker/RemoteMultiBroker.java

    r106 r107  
    11package omq.common.broker;
    2 
    3 import java.util.Set;
    42
    53import omq.Remote;
     
    97
    108public interface RemoteMultiBroker extends Remote {
    11         @MultiMethod
    12         @SyncMethod(retry = 1, timeout = 1000)
    13         public Set<String> getRemoteObjects();
    149
    1510        @MultiMethod
  • branches/supervisor/src/main/java/omq/server/InvocationThread.java

    r106 r107  
    172172                channel = broker.getNewChannel();
    173173
    174                 /*
    175                  * Default queue, Round Robin behaviour
    176                  */
    177 
    178174                // Get info about which exchange and queue will use
    179175                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
     
    242238                 */
    243239
     240                // Disable Round Robin behavior
    244241                boolean autoAck = false;
    245242
  • branches/supervisor/src/main/java/omq/supervisor/SupervisorImpl.java

    r106 r107  
    11package omq.supervisor;
    22
    3 import java.util.ArrayList;
     3import java.lang.reflect.Proxy;
    44import java.util.HashMap;
    5 import java.util.List;
    65import java.util.Map;
    76import java.util.Set;
    87
     8import omq.client.proxy.Proxymq;
    99import omq.common.broker.HasObject;
    1010import omq.common.broker.RemoteBroker;
     
    1515import org.apache.log4j.Logger;
    1616
     17import com.rabbitmq.client.AMQP.Queue.DeclareOk;
    1718import com.rabbitmq.client.Channel;
    18 import com.rabbitmq.client.AMQP.Queue.DeclareOk;
    1919
    2020public class SupervisorImpl extends RemoteObject implements Supervisor, Runnable {
     
    2929        private long sleep;
    3030        private Map<String, OmqSettings> objectSettings;
    31         // TODO: Set<?>
    3231        private RemoteMultiBroker multiBroker;
    3332        private Map<String, RemoteBroker> brokerMap;
    34         private List<RemoteBroker> brokers;
    3533
    3634        public SupervisorImpl(String brokerSet, long sleep) {
    3735                this.brokerSet = brokerSet;
    3836                this.sleep = sleep;
    39                 brokers = new ArrayList<RemoteBroker>();
     37                brokerMap = new HashMap<String, RemoteBroker>();
    4038                objectSettings = new HashMap<String, OmqSettings>();
    4139        }
     
    7270                if (brokerSet.equals(brokerSet) && !brokerMap.containsKey(brokerName)) {
    7371                        logger.info("Broker " + brokerName + " subscrived");
    74                         RemoteBroker broker = getBroker().lookup(brokerName, RemoteBroker.class);
     72                        // RemoteBroker broker = getBroker().lookup(brokerSet,
     73                        // RemoteBroker.class);
     74                        Proxymq proxy = new Proxymq(brokerSet, RemoteBroker.class, getBroker());
     75                        Class<?>[] array = { RemoteBroker.class };
     76                        RemoteBroker broker = (RemoteBroker) Proxy.newProxyInstance(RemoteBroker.class.getClassLoader(), array, proxy);
     77                        broker.setUID(brokerName);
    7578                        brokerMap.put(brokerSet, broker);
    7679                } else {
     
    8689                        throw new Exception("JAJAJAJAJA");
    8790                }
     91               
     92                HasObject[] hasList = multiBroker.hasObjectInfo(reference);
     93
     94                int minObjects = settings.getMinNumberObjects();
     95                int numBrokers = hasList.length;
     96                int numObjects = 0;
     97
     98                for (HasObject h : hasList) {
     99                        if (h.hasObject()) {
     100                                numObjects++;
     101                        }
     102                }
     103                System.out.println("NumObjects " + numObjects + " numBrokers " + numBrokers);
     104
     105                int i = 0;
     106                while (numObjects <= minObjects && i < numBrokers) {
     107                        HasObject h = hasList[i++];
     108                        if (h.hasObject()) {
     109                                brokerMap.get(h.getBrokerName()).spawnObject(reference, settings.getClassName());
     110                                numObjects++;
     111                        }
     112                }
    88113                objectSettings.put(reference, settings);
     114
    89115        }
    90116
     
    114140        @Override
    115141        public void unbindObject(OmqSettings settings, HasObject[] hasList, int numObjects) throws Exception {
    116                 String reference = settings.getReference();
    117 
    118                 int minObjects = settings.getMinNumberObjects();
    119 
    120                 for (RemoteBroker broker : brokers) {
    121                         if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) {
    122                                 broker.deleteObject(reference);
    123                                 break;
    124                         }
    125                 }
     142                // String reference = settings.getReference();
     143                //
     144                // int minObjects = settings.getMinNumberObjects();
     145                //
     146                // for (RemoteBroker broker : brokers) {
     147                // if (broker.hasObject(reference) && (numObjects - 1) >= minObjects) {
     148                // broker.deleteObject(reference);
     149                // break;
     150                // }
     151                // }
    126152
    127153        }
     
    166192        }
    167193
    168         public List<RemoteBroker> getBrokers() {
    169                 return brokers;
    170         }
    171 
    172         public void setBrokers(List<RemoteBroker> brokers) {
    173                 this.brokers = brokers;
    174         }
    175 
    176194        public String getBrokerSet() {
    177195                return brokerSet;
  • branches/supervisor/src/test/java/omq/test/supervisor/SleepTest.java

    r106 r107  
    2525                env1.setProperty(ParameterQueue.RABBIT_HOST, "127.0.0.1");
    2626                env1.setProperty(ParameterQueue.RABBIT_PORT, "5672");
    27                 env1.setProperty(ParameterQueue.MIN_POOL_THREADS, "4");
    28                 env1.setProperty(ParameterQueue.MAX_POOL_THREADS, "4");
     27                env1.setProperty(ParameterQueue.MIN_POOL_THREADS, "1");
     28                env1.setProperty(ParameterQueue.MAX_POOL_THREADS, "1");
    2929
    3030                Broker broker = new Broker(env1);
     
    3737                env2.setProperty(ParameterQueue.RABBIT_HOST, "127.0.0.1");
    3838                env2.setProperty(ParameterQueue.RABBIT_PORT, "5672");
    39                 env2.setProperty(ParameterQueue.MIN_POOL_THREADS, "4");
    40                 env2.setProperty(ParameterQueue.MAX_POOL_THREADS, "4");
     39                env2.setProperty(ParameterQueue.MIN_POOL_THREADS, "1");
     40                env2.setProperty(ParameterQueue.MAX_POOL_THREADS, "1");
    4141
    4242                Broker broker2 = new Broker(env2);
Note: See TracChangeset for help on using the changeset viewer.