Changeset 4


Ignore:
Timestamp:
05/07/13 10:46:22 (11 years ago)
Author:
stoda
Message:

Queue per object added

Location:
objectmq/src/omq
Files:
2 added
2 deleted
7 edited
2 moved

Legend:

Unmodified
Added
Removed
  • objectmq/src/omq/client/proxy/Proxymq.java

    r3 r4  
    2525import omq.exception.NoContainsInstanceException;
    2626
    27 
    2827import com.rabbitmq.client.Channel;
    2928
     
    3534 *
    3635 */
    37 public class EvoProxy implements InvocationHandler, Remote {
     36public class Proxymq implements InvocationHandler, Remote {
    3837
    3938        /**
     
    4342        private static Map<String, Object> proxies = new Hashtable<String, Object>();
    4443
    45         private String objUid;
     44        private String uid;
    4645        private transient ResponseListener rListener;
    4746        private transient EventDispatcher dispatcher;
     
    5554         * EvoProxy Constructor.
    5655         *
    57          * This constructor uses an objUid to know which object will call. It also
    58          * uses Properties to set where to send the messages
    59          *
    60          * @param objUid
    61          *            The objUid represents the unique identifier of a remote object
     56         * This constructor uses an uid to know which object will call. It also uses
     57         * Properties to set where to send the messages
     58         *
     59         * @param uid
     60         *            The uid represents the unique identifier of a remote object
    6261         * @param env
    6362         *            The environment is used to know where to send the messages
    6463         * @throws Exception
    6564         */
    66         public EvoProxy(String objUid, Properties env) throws Exception {
    67                 this.objUid = objUid;
     65        public Proxymq(String uid, Properties env) throws Exception {
     66                this.uid = uid;
    6867                this.rListener = ResponseListener.getRequestListener();
    6968                this.dispatcher = EventDispatcher.getDispatcher();
     
    8584         * EvoProxy Constructor.
    8685         *
    87          * This constructor uses an objUid to know which object will call. It also
    88          * uses Properties to set where to send the messages
    89          *
    90          * @param objUid
    91          *            The objUid represents the unique identifier of a remote object
     86         * This constructor uses an uid to know which object will call. It also uses
     87         * Properties to set where to send the messages
     88         *
     89         * @param uid
     90         *            The uid represents the unique identifier of a remote object
    9291         * @param clazz
    9392         *            It represents the real class of the remote object. With this
     
    9897         * @throws Exception
    9998         */
    100         public EvoProxy(String objUid, Class<?> clazz, Properties env) throws Exception {
    101                 this.objUid = objUid;
     99        public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception {
     100                this.uid = uid;
    102101                this.rListener = ResponseListener.getRequestListener();
    103102                this.dispatcher = EventDispatcher.getDispatcher();
     
    132131                        } else if (methodName.equals("getListeners")) {
    133132                                return getListeners();
    134                         }
    135                         // else if (methodName.equals("notify")) {
    136                         // // notify fanout
    137                         // }
     133                        }
    138134                }
    139135
     
    176172                                timeout = sync.timeout();
    177173                        }
    178                         return new SyncRequest(this.objUid, corrId, methodName, args, timeout, retries);
     174                        return new SyncRequest(this.uid, corrId, methodName, args, timeout, retries);
    179175                } else {
    180176                        String topic = method.getAnnotation(AsyncMethod.class).generateEvent();
    181                         return new AsyncRequest(this.objUid, corrId, methodName, args, topic);
     177                        return new AsyncRequest(this.uid, corrId, methodName, args, topic);
    182178                }
    183179        }
     
    254250         *         implements the specified interfaces
    255251         */
    256         public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, EvoProxy proxy) {
     252        public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
    257253                if (proxies.containsKey(proxy.getRef())) {
    258254                        System.out.println("Proxy trobat");
     
    279275        public void addListener(EventListener eventListener) throws Exception {
    280276                if (eventListener.getTopic() == null) {
    281                         eventListener.setTopic(objUid);
     277                        eventListener.setTopic(uid);
    282278                }
    283279                listeners.put(eventListener.getTopic(), eventListener);
     
    295291        @Override
    296292        public String getRef() {
    297                 return objUid;
     293                return uid;
    298294        }
    299295
  • objectmq/src/omq/client/remote/response/ResponseListener.java

    r3 r4  
    55import java.util.Properties;
    66
    7 import omq.client.proxy.EvoProxy;
     7import omq.client.proxy.Proxymq;
    88import omq.common.message.response.Response;
    99import omq.common.remote.RemoteListener;
     
    157157
    158158        // Revisar això
    159         public void registerProxy(EvoProxy proxy) {
     159        public void registerProxy(Proxymq proxy) {
    160160                if (!results.containsKey(proxy.getRef())) {
    161161                        results.put(proxy.getRef(), proxy.getResults());
  • objectmq/src/omq/common/event/EventDispatcher.java

    r3 r4  
    66import java.util.Vector;
    77
    8 import omq.common.remote.RevoConnectionFactory;
     8import omq.common.remote.OmqConnectionFactory;
    99import omq.common.util.ParameterQueue;
    1010import omq.common.util.Serializer;
     
    4343
    4444                // Get a new connection and a new channel
    45                 connection = RevoConnectionFactory.getConnection(env);
     45                connection = OmqConnectionFactory.getConnection(env);
    4646                channel = connection.createChannel();
    4747
  • objectmq/src/omq/common/event/EventTrigger.java

    r3 r4  
    66import java.util.Properties;
    77
    8 import omq.common.remote.RevoConnectionFactory;
     8import omq.common.remote.OmqConnectionFactory;
    99import omq.common.util.RevoEnvironment;
    1010import omq.common.util.Serializer;
     
    2828
    2929        private EventTrigger(Properties env) throws IOException, KeyManagementException, NoSuchAlgorithmException {
    30                 connection = RevoConnectionFactory.getConnection(env);
     30                connection = OmqConnectionFactory.getConnection(env);
    3131                channel = connection.createChannel();
    3232        }
  • objectmq/src/omq/common/message/response/ProxyResponse.java

    r3 r4  
    33import java.util.Properties;
    44
    5 import omq.client.proxy.EvoProxy;
     5import omq.client.proxy.Proxymq;
    66
    77
     
    3131        @Override
    3232        public Object getResp(Properties env) throws Exception {
    33                 if (!EvoProxy.containsProxy(reference)) {
     33                if (!Proxymq.containsProxy(reference)) {
    3434                        Class<?> clazz = Class.forName(remoteInterface);
    35                         EvoProxy evoProxy = new EvoProxy(reference, clazz, env);
     35                        Proxymq evoProxy = new Proxymq(reference, clazz, env);
    3636                        Class<?>[] array = { clazz };
    37                         return EvoProxy.newProxyInstance(clazz.getClassLoader(), array, evoProxy);
     37                        return Proxymq.newProxyInstance(clazz.getClassLoader(), array, evoProxy);
    3838                }
    39                 return EvoProxy.getInstance(reference);
     39                return Proxymq.getInstance(reference);
    4040        }
    4141
  • objectmq/src/omq/common/remote/OmqConnectionFactory.java

    r3 r4  
    1717 *
    1818 */
    19 public class RevoConnectionFactory {
     19public class OmqConnectionFactory {
    2020        public static Connection getConnection(Properties env) throws IOException, KeyManagementException, NoSuchAlgorithmException {
    2121                // Get login info of rabbitmq
  • objectmq/src/omq/common/remote/RemoteListener.java

    r3 r4  
    3737
    3838        private void startConnection(Properties env) throws Exception {
    39                 connection = RevoConnectionFactory.getConnection(env);
     39                connection = OmqConnectionFactory.getConnection(env);
    4040                channel = connection.createChannel();
    4141        }
  • objectmq/src/omq/common/util/ParameterQueue.java

    r3 r4  
    3535        public static String RPC_TYPE = "direct";
    3636
     37        public static String NUM_THREADS = "omq.num_threads";
     38
    3739        public static String REGISTRY_NAME = "REGISTRY";
    3840
  • objectmq/src/omq/server/remote/request/RemoteObject.java

    r3 r4  
    11package omq.server.remote.request;
    22
     3import java.io.IOException;
    34import java.lang.reflect.InvocationTargetException;
    45import java.lang.reflect.Method;
    5 import java.util.ArrayList;
    66import java.util.Collection;
    77import java.util.HashMap;
    88import java.util.Map;
     9import java.util.Properties;
    910import java.util.Vector;
    1011
    1112import omq.Remote;
     13import omq.common.broker.Broker;
    1214import omq.common.event.EventListener;
    1315import omq.common.event.EventTrigger;
    14 import omq.common.message.response.CollectionResponse;
    1516import omq.common.message.response.DefaultResponse;
    1617import omq.common.message.response.ExceptionResponse;
    17 import omq.common.message.response.ProxyResponse;
    1818import omq.common.message.response.Response;
    19 
     19import omq.common.util.ParameterQueue;
     20import omq.exception.SerializerException;
    2021
    2122import com.rabbitmq.client.Channel;
     23import com.rabbitmq.client.Connection;
     24import com.rabbitmq.client.ConsumerCancelledException;
     25import com.rabbitmq.client.QueueingConsumer;
     26import com.rabbitmq.client.QueueingConsumer.Delivery;
     27import com.rabbitmq.client.ShutdownSignalException;
    2228
    2329/**
     
    2632 *
    2733 */
    28 public abstract class RemoteObject implements Remote {
     34public abstract class RemoteObject extends Thread implements Remote {
    2935
    3036        private static final long serialVersionUID = -1778953938739846450L;
    3137
    3238        private String UID;
     39        private RemoteWrapper remoteWrapper;
     40        private Connection connection;
     41        private Channel channel;
     42        private QueueingConsumer consumer;
     43        private boolean killed = false;
    3344
    3445        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    4455        }
    4556
    46         public RemoteObject(String UID) throws Exception {
    47                 this.UID = UID;
    48                 registerToRequestListener();
    49                 declareEventTopic();
    50         }
    51 
    52         public RemoteObject() throws Exception {
    53                 this.UID = java.util.UUID.randomUUID().toString();
    54                 System.out.println("New Object its UID is " + this.UID);
    55                 registerToRequestListener();
    56                 declareEventTopic();
    57         }
    58 
    59         /**
    60          * Registers this object in the RequestListener
    61          *
    62          * @throws Exception
    63          */
    64         private void registerToRequestListener() throws Exception {
    65                 RequestListener rListener = RequestListener.getRequestListener();
    66                 rListener.addObj(this);
    67         }
    68 
    69         private void declareEventTopic() throws Exception {
    70                 RequestListener rListener = RequestListener.getRequestListener();
    71                 Channel channel = rListener.getChannel();
     57        public void start(String reference, Properties env) throws Exception {
     58                this.UID = reference;
     59
     60                // Get num threads to use
     61                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS));
     62                remoteWrapper = new RemoteWrapper(this, numThreads);
     63
     64                // Get info about which exchange and queue will use
     65                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
     66                String queue = UID;
     67                String routingKey = UID;
     68
     69                // Start connection and channel
     70                connection = Broker.getConnection();
     71                channel = connection.createChannel();
     72
     73                // Declares and bindings
     74                channel.exchangeDeclare(exchange, "direct");
     75                channel.queueDeclare(queue, false, false, false, null);
     76                channel.queueBind(queue, exchange, routingKey);
     77
     78                // Declare the event topic fanout
    7279                channel.exchangeDeclare(UID, "fanout");
    73                 channel.close();
     80
     81                // Declare a new consumer
     82                consumer = new QueueingConsumer(channel);
     83                channel.basicConsume(queue, true, consumer);
     84
     85                // Start this listener
     86                this.start();
     87        }
     88
     89        @Override
     90        public void run() {
     91                while (!killed) {
     92                        try {
     93                                Delivery delivery = consumer.nextDelivery();
     94                                remoteWrapper.notifyDelivery(delivery);
     95                        } catch (InterruptedException i) {
     96                                i.printStackTrace();
     97                        } catch (ShutdownSignalException e) {
     98                                e.printStackTrace();
     99                        } catch (ConsumerCancelledException e) {
     100                                e.printStackTrace();
     101                        } catch (SerializerException e) {
     102                                e.printStackTrace();
     103                        } catch (Exception e) {
     104                                e.printStackTrace();
     105                        }
     106                }
    74107        }
    75108
     
    79112        }
    80113
     114        public void kill() throws IOException {
     115                interrupt();
     116                killed = true;
     117                channel.close();
     118                connection.close();
     119                remoteWrapper.stopRemoteWrapper();
     120        }
     121
    81122        @Override
    82123        public Response invokeMethod(String methodName, Vector<Object> args) throws Exception {
    83                 Response resp = null;
    84 
    85124                Object[] arguments = new Object[args.size()];
    86125
    87126                for (int i = 0; i < args.size(); i++) {
    88127                        Object arg = args.get(i);
    89                         if (arg instanceof Remote) {
    90                                 arg = RequestListener.getRequestListener().getObj(((Remote) arg).getRef());
    91                         }
     128                        // TODO: what happens if the object is a remote object?
    92129                        arguments[i] = arg;
    93130                }
     
    98135                try {
    99136                        Object result = method.invoke(this, arguments);
    100                         // TODO see if a result is a collection and if it has some remote
    101                         // objects
    102                         if (result instanceof Remote) {
    103                                 resp = getProxyResponse((Remote) result);
    104                         } else if (result instanceof Collection<?>) {
    105                                 Collection<?> collection = (Collection<?>) result;
    106                                 boolean containsRemote = false;
    107                                 for (Object o : collection) {
    108                                         if (o instanceof Remote) {
    109                                                 containsRemote = true;
    110                                                 break;
    111                                         }
    112                                 }
    113                                 if (containsRemote) {
    114                                         Collection<Response> responses = new ArrayList<Response>();
    115                                         for (Object o : collection) {
    116                                                 if (o instanceof Remote) {
    117                                                         responses.add(getProxyResponse((Remote) o));
    118                                                 } else {
    119                                                         responses.add(new DefaultResponse(this.getRef(), o));
    120                                                 }
    121                                         }
    122                                         String collectionType = collection.getClass().getCanonicalName();
    123                                         resp = new CollectionResponse(this.getRef(), collectionType, responses);
    124                                 } else {
    125                                         resp = new DefaultResponse(this.getRef(), result);
    126                                 }
    127                         } else {
    128                                 resp = new DefaultResponse(this.getRef(), result);
    129                         }
     137                        return new DefaultResponse(this.getRef(), result);
    130138                } catch (InvocationTargetException e) {
    131                         resp = new ExceptionResponse(this.getRef(), e.getTargetException());
    132                 }
    133 
    134                 return resp;
     139                        return new ExceptionResponse(this.getRef(), e.getTargetException());
     140                }
    135141        }
    136142
     
    186192        }
    187193
    188         private Response getProxyResponse(Remote r) {
    189                 String reference = r.getRef();
    190                 String remoteInterface = r.getClass().getInterfaces()[0].getCanonicalName();
    191                 return new ProxyResponse(this.getRef(), reference, remoteInterface);
    192         }
    193 
    194194        @Override
    195195        public void notify(Object obj) throws Exception {
Note: See TracChangeset for help on using the changeset viewer.