- Timestamp:
- 05/20/13 19:45:19 (11 years ago)
- Location:
- trunk/objectmq/src/omq
- Files:
-
- 2 added
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/objectmq/src/omq/Remote.java
r14 r15 3 3 import java.io.IOException; 4 4 import java.io.Serializable; 5 import java.util.Collection; 5 6 6 7 import omq.common.event.Event; 8 import omq.common.event.EventListener; 7 9 import omq.exception.SerializerException; 8 10 … … 22 24 23 25 public void notifyEvent(Event event) throws IOException, SerializerException; 26 27 public void addListener(EventListener eventListener) throws Exception; 28 29 public void removeListener(EventListener eventListener) throws Exception; 30 31 public Collection<EventListener> getListeners() throws Exception; 24 32 } -
trunk/objectmq/src/omq/client/proxy/Proxymq.java
r14 r15 5 5 import java.lang.reflect.Method; 6 6 import java.lang.reflect.Proxy; 7 import java.util.Collection; 7 8 import java.util.HashMap; 8 9 import java.util.Hashtable; … … 15 16 import omq.client.remote.response.ResponseListener; 16 17 import omq.common.event.Event; 18 import omq.common.event.EventDispatcher; 19 import omq.common.event.EventListener; 17 20 import omq.common.message.Request; 18 21 import omq.common.message.Response; … … 44 47 private String uid; 45 48 private transient ResponseListener rListener; 49 private transient EventDispatcher dispatcher; 46 50 private transient Channel channel; 47 51 private transient Properties env; 48 52 private transient Map<String, byte[]> results; 49 // private transient Map<Method, CallType> methodTypes;53 private transient Map<String, EventListener> listeners; 50 54 51 55 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 79 83 this.uid = uid; 80 84 this.rListener = ResponseListener.getRequestListener(); 85 this.dispatcher = EventDispatcher.getDispatcher(); 81 86 82 87 this.channel = rListener.getChannel(); 83 88 this.env = env; 89 90 listeners = new HashMap<String, EventListener>(); 84 91 85 92 // Create a new hashmap and registry it in rListener … … 97 104 if (methodName.equals("getRef")) { 98 105 return getRef(); 106 } else if (methodName.equals("addListener")) { 107 addListener((EventListener) arguments[0]); 108 return null; 109 } else if (methodName.equals("removeListener")) { 110 removeListener((EventListener) arguments[0]); 111 return null; 112 } else if (methodName.equals("getListeners")) { 113 return getListeners(); 99 114 } 100 115 } … … 270 285 @Override 271 286 public void notifyEvent(Event event) throws IOException, SerializerException { 272 287 } 288 289 @Override 290 public void addListener(EventListener eventListener) throws Exception { 291 if (eventListener.getTopic() == null) { 292 eventListener.setTopic(uid); 293 } 294 listeners.put(eventListener.getTopic(), eventListener); 295 dispatcher.addListener(eventListener); 296 } 297 298 @Override 299 public void removeListener(EventListener eventListener) throws Exception { 300 listeners.remove(eventListener.getTopic()); 301 dispatcher.removeListener(eventListener); 302 } 303 304 @Override 305 public Collection<EventListener> getListeners() throws Exception { 306 return listeners.values(); 273 307 } 274 308 -
trunk/objectmq/src/omq/common/broker/Broker.java
r14 r15 7 7 import omq.client.proxy.Proxymq; 8 8 import omq.client.remote.response.ResponseListener; 9 import omq.common.event.EventDispatcher; 9 10 import omq.common.util.Environment; 10 11 import omq.common.util.OmqConnectionFactory; … … 19 20 private static Connection connection; 20 21 private static Channel channel; 22 private static boolean clientStarted = false; 21 23 22 24 public static void initBroker(Properties env) throws Exception { … … 47 49 Properties environment = Environment.getEnvironment(); 48 50 49 if (ResponseListener.isVoid()) { 50 ResponseListener.init(environment); 51 if (!clientStarted) { 52 initClient(environment); 53 clientStarted = true; 51 54 } 55 52 56 if (!Proxymq.containsProxy(reference)) { 53 57 Proxymq proxy = new Proxymq(reference, contract, environment); … … 79 83 } 80 84 85 private static void initClient(Properties environment) throws Exception { 86 if (ResponseListener.isVoid()) { 87 ResponseListener.init(environment); 88 } 89 if (EventDispatcher.isVoid()) { 90 EventDispatcher.init(environment); 91 } 92 } 93 81 94 } -
trunk/objectmq/src/omq/common/event/Event.java
r14 r15 18 18 private String topic; 19 19 20 public Event(String corrId , String topic) {20 public Event(String corrId) { 21 21 this.corrId = corrId; 22 this.topic = topic;23 22 } 24 23 -
trunk/objectmq/src/omq/common/event/EventDispatcher.java
r14 r15 161 161 } 162 162 163 public static boolean isVoid() { 164 return dispatcher == null; 165 } 166 163 167 } -
trunk/objectmq/src/omq/server/remote/request/RemoteObject.java
r14 r15 4 4 import java.lang.reflect.Method; 5 5 import java.util.ArrayList; 6 import java.util.Collection; 6 7 import java.util.HashMap; 7 8 import java.util.List; … … 12 13 import omq.common.broker.Broker; 13 14 import omq.common.event.Event; 15 import omq.common.event.EventListener; 14 16 import omq.common.util.ParameterQueue; 15 17 import omq.common.util.Serializer; … … 202 204 } 203 205 206 @Override 207 public void addListener(EventListener eventListener) throws Exception { 208 } 209 210 @Override 211 public void removeListener(EventListener eventListener) throws Exception { 212 } 213 214 @Override 215 public Collection<EventListener> getListeners() throws Exception { 216 return null; 217 } 218 204 219 } -
trunk/objectmq/src/omq/ztest/calculator/Calculator.java
r9 r15 13 13 @AsyncMethod 14 14 public void mult(int x, int y); 15 16 @AsyncMethod 17 public void divideByZero(); 15 18 16 19 } -
trunk/objectmq/src/omq/ztest/calculator/CalculatorImpl.java
r9 r15 28 28 } 29 29 30 public void divideByZero() { 31 32 } 33 30 34 } -
trunk/objectmq/src/omq/ztest/calculator/CalculatorTest.java
r11 r15 12 12 13 13 public class CalculatorTest { 14 private static CalculatorImpl calc;15 14 private static Calculator remoteCalc; 16 17 @BeforeClass 18 public static void startServer() throws Exception { 19 Properties env = new Properties(); 20 env.setProperty(ParameterQueue.USER_NAME, "guest"); 21 env.setProperty(ParameterQueue.USER_PASS, "guest"); 22 23 // Get host info of rabbimq (where it is) 24 env.setProperty(ParameterQueue.SERVER_HOST, "10.30.239.228"); 25 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 26 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.KryoImp"); 27 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "true"); 28 29 // Get info about the queue & the exchange where the RemoteListener will 30 // listen to. 31 env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange"); 32 env.setProperty(ParameterQueue.RPC_QUEUE, "rpc_queue"); 33 env.setProperty(ParameterQueue.RPC_ROUTING_KEY, "rpc"); 34 35 calc = new CalculatorImpl(); 36 37 Broker.initBroker(env); 38 Broker.bind(Calculator.class.getSimpleName(), calc); 39 } 15 private static Calculator remoteCalc2; 40 16 41 17 @BeforeClass … … 46 22 47 23 // Set host info of rabbimq (where it is) 48 env.setProperty(ParameterQueue.SERVER_HOST, "1 0.30.239.228");24 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1"); 49 25 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 50 26 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.KryoImp"); 51 env.setProperty(ParameterQueue.ENABLECOMPRESSION, " true");52 27 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false"); 28 53 29 // Set info about where the message will be sent 54 30 env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange"); 55 env.setProperty(ParameterQueue.RPC_ROUTING_KEY, "rpc");56 31 57 32 // Set info about the queue & the exchange where the ResponseListener … … 61 36 62 37 Broker.initBroker(env); 63 remoteCalc = (Calculator) Broker.lookup(Calculator.class.getSimpleName(), Calculator.class); 38 remoteCalc = (Calculator) Broker.lookup("calculator1", Calculator.class); 39 remoteCalc2 = (Calculator) Broker.lookup("calculator2", Calculator.class); 64 40 } 65 41 … … 76 52 77 53 @Test 54 public void add2() throws Exception { 55 int x = 10; 56 int y = 20; 57 58 int sync = remoteCalc2.add(x, y); 59 int sum = x + y; 60 61 assertEquals(sum, sync); 62 } 63 64 @Test 78 65 public void mult() throws Exception { 79 66 int x = 5; … … 82 69 remoteCalc.mult(x, y); 83 70 Thread.sleep(200); 84 85 int mult = x * y;86 87 assertEquals(mult, calc.getMult());88 71 } 89 72 }
Note: See TracChangeset
for help on using the changeset viewer.