Changeset 15


Ignore:
Timestamp:
05/20/13 19:45:19 (11 years ago)
Author:
stoda
Message:

Listeners added (not tested)
little test to shut up Vilella

Location:
trunk/objectmq/src/omq
Files:
2 added
9 edited

Legend:

Unmodified
Added
Removed
  • trunk/objectmq/src/omq/Remote.java

    r14 r15  
    33import java.io.IOException;
    44import java.io.Serializable;
     5import java.util.Collection;
    56
    67import omq.common.event.Event;
     8import omq.common.event.EventListener;
    79import omq.exception.SerializerException;
    810
     
    2224
    2325        public void notifyEvent(Event event) throws IOException, SerializerException;
     26
     27        public void addListener(EventListener eventListener) throws Exception;
     28
     29        public void removeListener(EventListener eventListener) throws Exception;
     30
     31        public Collection<EventListener> getListeners() throws Exception;
    2432}
  • trunk/objectmq/src/omq/client/proxy/Proxymq.java

    r14 r15  
    55import java.lang.reflect.Method;
    66import java.lang.reflect.Proxy;
     7import java.util.Collection;
    78import java.util.HashMap;
    89import java.util.Hashtable;
     
    1516import omq.client.remote.response.ResponseListener;
    1617import omq.common.event.Event;
     18import omq.common.event.EventDispatcher;
     19import omq.common.event.EventListener;
    1720import omq.common.message.Request;
    1821import omq.common.message.Response;
     
    4447        private String uid;
    4548        private transient ResponseListener rListener;
     49        private transient EventDispatcher dispatcher;
    4650        private transient Channel channel;
    4751        private transient Properties env;
    4852        private transient Map<String, byte[]> results;
    49         // private transient Map<Method, CallType> methodTypes;
     53        private transient Map<String, EventListener> listeners;
    5054
    5155        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    7983                this.uid = uid;
    8084                this.rListener = ResponseListener.getRequestListener();
     85                this.dispatcher = EventDispatcher.getDispatcher();
    8186
    8287                this.channel = rListener.getChannel();
    8388                this.env = env;
     89
     90                listeners = new HashMap<String, EventListener>();
    8491
    8592                // Create a new hashmap and registry it in rListener
     
    97104                        if (methodName.equals("getRef")) {
    98105                                return getRef();
     106                        } else if (methodName.equals("addListener")) {
     107                                addListener((EventListener) arguments[0]);
     108                                return null;
     109                        } else if (methodName.equals("removeListener")) {
     110                                removeListener((EventListener) arguments[0]);
     111                                return null;
     112                        } else if (methodName.equals("getListeners")) {
     113                                return getListeners();
    99114                        }
    100115                }
     
    270285        @Override
    271286        public void notifyEvent(Event event) throws IOException, SerializerException {
    272 
     287        }
     288
     289        @Override
     290        public void addListener(EventListener eventListener) throws Exception {
     291                if (eventListener.getTopic() == null) {
     292                        eventListener.setTopic(uid);
     293                }
     294                listeners.put(eventListener.getTopic(), eventListener);
     295                dispatcher.addListener(eventListener);
     296        }
     297
     298        @Override
     299        public void removeListener(EventListener eventListener) throws Exception {
     300                listeners.remove(eventListener.getTopic());
     301                dispatcher.removeListener(eventListener);
     302        }
     303
     304        @Override
     305        public Collection<EventListener> getListeners() throws Exception {
     306                return listeners.values();
    273307        }
    274308
  • trunk/objectmq/src/omq/common/broker/Broker.java

    r14 r15  
    77import omq.client.proxy.Proxymq;
    88import omq.client.remote.response.ResponseListener;
     9import omq.common.event.EventDispatcher;
    910import omq.common.util.Environment;
    1011import omq.common.util.OmqConnectionFactory;
     
    1920        private static Connection connection;
    2021        private static Channel channel;
     22        private static boolean clientStarted = false;
    2123
    2224        public static void initBroker(Properties env) throws Exception {
     
    4749                        Properties environment = Environment.getEnvironment();
    4850
    49                         if (ResponseListener.isVoid()) {
    50                                 ResponseListener.init(environment);
     51                        if (!clientStarted) {
     52                                initClient(environment);
     53                                clientStarted = true;
    5154                        }
     55
    5256                        if (!Proxymq.containsProxy(reference)) {
    5357                                Proxymq proxy = new Proxymq(reference, contract, environment);
     
    7983        }
    8084
     85        private static void initClient(Properties environment) throws Exception {
     86                if (ResponseListener.isVoid()) {
     87                        ResponseListener.init(environment);
     88                }
     89                if (EventDispatcher.isVoid()) {
     90                        EventDispatcher.init(environment);
     91                }
     92        }
     93
    8194}
  • trunk/objectmq/src/omq/common/event/Event.java

    r14 r15  
    1818        private String topic;
    1919
    20         public Event(String corrId, String topic) {
     20        public Event(String corrId) {
    2121                this.corrId = corrId;
    22                 this.topic = topic;
    2322        }
    2423
  • trunk/objectmq/src/omq/common/event/EventDispatcher.java

    r14 r15  
    161161        }
    162162
     163        public static boolean isVoid() {
     164                return dispatcher == null;
     165        }
     166
    163167}
  • trunk/objectmq/src/omq/server/remote/request/RemoteObject.java

    r14 r15  
    44import java.lang.reflect.Method;
    55import java.util.ArrayList;
     6import java.util.Collection;
    67import java.util.HashMap;
    78import java.util.List;
     
    1213import omq.common.broker.Broker;
    1314import omq.common.event.Event;
     15import omq.common.event.EventListener;
    1416import omq.common.util.ParameterQueue;
    1517import omq.common.util.Serializer;
     
    202204        }
    203205
     206        @Override
     207        public void addListener(EventListener eventListener) throws Exception {
     208        }
     209
     210        @Override
     211        public void removeListener(EventListener eventListener) throws Exception {
     212        }
     213
     214        @Override
     215        public Collection<EventListener> getListeners() throws Exception {
     216                return null;
     217        }
     218
    204219}
  • trunk/objectmq/src/omq/ztest/calculator/Calculator.java

    r9 r15  
    1313        @AsyncMethod
    1414        public void mult(int x, int y);
     15       
     16        @AsyncMethod
     17        public void divideByZero();
    1518
    1619}
  • trunk/objectmq/src/omq/ztest/calculator/CalculatorImpl.java

    r9 r15  
    2828        }
    2929
     30        public void divideByZero() {
     31               
     32        }
     33
    3034}
  • trunk/objectmq/src/omq/ztest/calculator/CalculatorTest.java

    r11 r15  
    1212
    1313public class CalculatorTest {
    14         private static CalculatorImpl calc;
    1514        private static Calculator remoteCalc;
    16 
    17         @BeforeClass
    18         public static void startServer() throws Exception {
    19                 Properties env = new Properties();
    20                 env.setProperty(ParameterQueue.USER_NAME, "guest");
    21                 env.setProperty(ParameterQueue.USER_PASS, "guest");
    22 
    23                 // Get host info of rabbimq (where it is)
    24                 env.setProperty(ParameterQueue.SERVER_HOST, "10.30.239.228");
    25                 env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    26                 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.KryoImp");
    27                 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "true");
    28 
    29                 // Get info about the queue & the exchange where the RemoteListener will
    30                 // listen to.
    31                 env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");
    32                 env.setProperty(ParameterQueue.RPC_QUEUE, "rpc_queue");
    33                 env.setProperty(ParameterQueue.RPC_ROUTING_KEY, "rpc");
    34 
    35                 calc = new CalculatorImpl();
    36 
    37                 Broker.initBroker(env);
    38                 Broker.bind(Calculator.class.getSimpleName(), calc);
    39         }
     15        private static Calculator remoteCalc2;
    4016
    4117        @BeforeClass
     
    4622
    4723                // Set host info of rabbimq (where it is)
    48                 env.setProperty(ParameterQueue.SERVER_HOST, "10.30.239.228");
     24                env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
    4925                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    5026                env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.KryoImp");
    51                 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "true");
    52                
     27                env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");
     28
    5329                // Set info about where the message will be sent
    5430                env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");
    55                 env.setProperty(ParameterQueue.RPC_ROUTING_KEY, "rpc");
    5631
    5732                // Set info about the queue & the exchange where the ResponseListener
     
    6136
    6237                Broker.initBroker(env);
    63                 remoteCalc = (Calculator) Broker.lookup(Calculator.class.getSimpleName(), Calculator.class);
     38                remoteCalc = (Calculator) Broker.lookup("calculator1", Calculator.class);
     39                remoteCalc2 = (Calculator) Broker.lookup("calculator2", Calculator.class);
    6440        }
    6541
     
    7652
    7753        @Test
     54        public void add2() throws Exception {
     55                int x = 10;
     56                int y = 20;
     57
     58                int sync = remoteCalc2.add(x, y);
     59                int sum = x + y;
     60
     61                assertEquals(sum, sync);
     62        }
     63
     64        @Test
    7865        public void mult() throws Exception {
    7966                int x = 5;
     
    8269                remoteCalc.mult(x, y);
    8370                Thread.sleep(200);
    84 
    85                 int mult = x * y;
    86 
    87                 assertEquals(mult, calc.getMult());
    8871        }
    8972}
Note: See TracChangeset for help on using the changeset viewer.