- Timestamp:
- 05/23/13 15:05:54 (11 years ago)
- Location:
- trunk/objectmq/src/omq
- Files:
-
- 1 added
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/objectmq/src/omq/client/proxy/Proxymq.java
r15 r18 20 20 import omq.common.message.Request; 21 21 import omq.common.message.Response; 22 import omq.common.util.Log; 22 23 import omq.common.util.ParameterQueue; 23 24 import omq.common.util.Serializer; … … 126 127 } 127 128 } 128 129 private void publish AsyncRequest(Request request) throws IOException, SerializerException{129 130 private void publishMessage(Request request, String replyQueueName) throws IOException, SerializerException{ 130 131 String corrId = request.getId(); 131 132 132 133 // Get the environment properties 133 134 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); 134 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);135 135 String routingkey = this.uid; 136 136 … … 139 139 140 140 // Publish the message 141 channel.basicPublish(exchange, routingkey, props, Serializer.serialize(request)); 141 byte[] bytesRequest = Serializer.serialize(request); 142 channel.basicPublish(exchange, routingkey, props, bytesRequest); 143 Log.saveLog("Client-Serialize", bytesRequest); 144 } 145 146 private void publishAsyncRequest(Request request) throws IOException, SerializerException { 147 // Get the environment properties 148 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); 149 publishMessage(request, replyQueueName); 142 150 } 143 151 … … 149 157 150 158 // Get the environment properties 151 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);152 159 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); 153 String routingkey = this.uid;154 155 // Add the correlation ID and create a replyTo property156 BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).build();157 160 158 161 // Publish the message … … 160 163 while (i < retries) { 161 164 try { 162 channel.basicPublish(exchange, routingkey, props, Serializer.serialize(request));165 publishMessage(request, replyQueueName); 163 166 return getResult(corrId, timeout, type); 164 167 } catch (TimeoutException te) { … … 208 211 } 209 212 resp = Serializer.deserializeResponse(results.get(corrId), type); 213 Log.saveLog("Client-Deserialize", results.get(corrId)); 210 214 // Remove and indicate the key exists (a hashmap can contain a null 211 215 // object, using this we'll know whether a response has been -
trunk/objectmq/src/omq/common/broker/Broker.java
r15 r18 7 7 import omq.client.proxy.Proxymq; 8 8 import omq.client.remote.response.ResponseListener; 9 import omq.common.event.Event; 9 10 import omq.common.event.EventDispatcher; 11 import omq.common.event.EventWrapper; 10 12 import omq.common.util.Environment; 11 13 import omq.common.util.OmqConnectionFactory; 14 import omq.common.util.Serializer; 12 15 import omq.exception.EnvironmentException; 13 16 import omq.exception.RemoteException; 17 import omq.exception.SerializerException; 14 18 import omq.server.remote.request.RemoteObject; 15 19 … … 91 95 } 92 96 } 97 98 99 public static void trigger(Event event) throws IOException, SerializerException{ 100 String UID = event.getTopic(); 101 EventWrapper wrapper = new EventWrapper(event); 102 channel.exchangeDeclare(UID, "fanout"); 103 channel.basicPublish(UID, "", null, Serializer.serialize(wrapper)); 104 } 93 105 94 106 } -
trunk/objectmq/src/omq/common/util/ParameterQueue.java
r11 r18 30 30 31 31 public static String ENABLE_SSL = "revo.enable_ssl"; 32 public static String DEBUGFILE = "revo.debug_file"; 33 32 34 33 35 /* -
trunk/objectmq/src/omq/common/util/Serializer.java
r14 r18 37 37 String className = env.getProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.JavaImp"); 38 38 39 if (className == null || className.isEmpty()) { 40 throw new ClassNotFoundException("Class name is null or empty."); 41 } 42 39 43 serializer = (ISerializer) Class.forName(className).newInstance(); 40 44 } catch (Exception ex) { -
trunk/objectmq/src/omq/server/remote/request/InvocationThread.java
r10 r18 5 5 import omq.common.message.Request; 6 6 import omq.common.message.Response; 7 import omq.common.util.Log; 7 8 import omq.common.util.Serializer; 8 9 … … 35 36 // Deserialize the json 36 37 Request request = Serializer.deserializeRequest(delivery.getBody(), obj); 38 Log.saveLog("Server-Deserialize", delivery.getBody()); 37 39 38 40 // Invoke the method … … 46 48 BasicProperties props = delivery.getProperties(); 47 49 48 BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build(); 50 BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()) 51 .correlationId(props.getCorrelationId()).build(); 49 52 50 channel.basicPublish("", props.getReplyTo(), replyProps, Serializer.serialize(resp)); 53 byte[] bytesResponse = Serializer.serialize(resp); 54 channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); 55 56 Log.saveLog("Server-Serialize", bytesResponse); 51 57 } 52 58 -
trunk/objectmq/src/omq/ztest/calculator/CalculatorImpl.java
r17 r18 4 4 5 5 import omq.client.annotation.AsyncMethod; 6 import omq.common.broker.Broker; 6 7 import omq.exception.SerializerException; 7 8 import omq.server.remote.request.RemoteObject; … … 33 34 34 35 public void divideByZero() throws IOException, SerializerException { 35 ZeroEvent ze = new ZeroEvent("my zero event"); 36 notifyEvent(ze); 36 ZeroEvent ze = new ZeroEvent("my zero event", "zero-event"); 37 Broker.trigger(ze); 38 //notifyEvent(ze); 37 39 } 38 40 -
trunk/objectmq/src/omq/ztest/calculator/CalculatorTest.java
r17 r18 22 22 23 23 // Set host info of rabbimq (where it is) 24 env.setProperty(ParameterQueue.SERVER_HOST, "1 27.0.0.1");24 env.setProperty(ParameterQueue.SERVER_HOST, "10.30.239.228"); 25 25 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 26 26 // env.setProperty(ParameterQueue.SERIALIZERNAME, … … 31 31 // Set info about where the message will be sent 32 32 env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange"); 33 env.setProperty(ParameterQueue.DEBUGFILE, "c:\\middlewareDebug"); 33 34 34 35 // Set info about the queue & the exchange where the ResponseListener … … 75 76 @Test 76 77 public void notifyEvent() throws Exception { 77 ZeroListener zL = new ZeroListener( );78 ZeroListener zL = new ZeroListener("zero-event"); 78 79 79 80 remoteCalc.addListener(zL); -
trunk/objectmq/src/omq/ztest/calculator/ServerTest.java
r17 r18 16 16 17 17 // Get host info of rabbimq (where it is) 18 env.setProperty(ParameterQueue.SERVER_HOST, "1 27.0.0.1");18 env.setProperty(ParameterQueue.SERVER_HOST, "10.30.239.228"); 19 19 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 20 20 // env.setProperty(ParameterQueue.SERIALIZERNAME, -
trunk/objectmq/src/omq/ztest/calculator/ZeroEvent.java
r16 r18 12 12 } 13 13 14 public ZeroEvent(String corrId ) {15 super(corrId );14 public ZeroEvent(String corrId, String topic) { 15 super(corrId, topic); 16 16 } 17 17 -
trunk/objectmq/src/omq/ztest/calculator/ZeroListener.java
r16 r18 5 5 public class ZeroListener extends EventListener<ZeroEvent> { 6 6 7 public ZeroListener(String topic){ 8 super(topic); 9 } 10 7 11 @Override 8 12 public void notifyEvent(ZeroEvent event) {
Note: See TracChangeset
for help on using the changeset viewer.