Changeset 36
- Timestamp:
- 06/11/13 18:25:36 (11 years ago)
- Location:
- trunk/objectmq
- Files:
-
- 3 added
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/objectmq/src/omq/client/listener/ResponseListener.java
r35 r36 8 8 9 9 import omq.client.proxy.Proxymq; 10 import omq.common. util.OmqConnectionFactory;10 import omq.common.broker.Broker; 11 11 import omq.common.util.ParameterQueue; 12 12 13 13 import com.rabbitmq.client.AMQP.BasicProperties; 14 14 import com.rabbitmq.client.Channel; 15 import com.rabbitmq.client.Connection;16 15 import com.rabbitmq.client.ConsumerCancelledException; 17 16 import com.rabbitmq.client.QueueingConsumer; … … 29 28 private static ResponseListener rListener; 30 29 31 private Connection connection;32 30 private Channel channel; 33 31 private QueueingConsumer consumer; 34 32 private boolean killed = false; 35 33 private Map<String, Map<String, byte[]>> results; 34 private Properties env; 36 35 37 36 /** … … 42 41 */ 43 42 protected ResponseListener(Properties env) throws Exception { 44 connection = OmqConnectionFactory.getNewConnection(env); 45 channel = connection.createChannel(); 43 this.env = env; 46 44 47 45 // Init the hashtable (it's concurrent) 48 46 this.results = new Hashtable<String, Map<String, byte[]>>(); 49 47 50 Map<String, Object> args = null; 51 52 String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); 53 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false")); 54 55 int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1")); 56 if (ttl > 0) { 57 args = new HashMap<String, Object>(); 58 args.put("x-message-ttl", ttl); 59 } 60 61 channel.queueDeclare(reply_queue, durable, false, false, args); 62 63 // Declare a new consumer 64 consumer = new QueueingConsumer(channel); 65 channel.basicConsume(reply_queue, true, consumer); 48 startRPCQueue(); 66 49 } 67 50 … … 98 81 } catch (ShutdownSignalException e) { 99 82 e.printStackTrace(); 83 try { 84 if (channel.isOpen()) { 85 channel.close(); 86 } 87 startRPCQueue(); 88 } catch (Exception e1) { 89 e1.printStackTrace(); 90 try { 91 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); 92 Thread.sleep(milis); 93 } catch (InterruptedException e2) { 94 e2.printStackTrace(); 95 } 96 } 100 97 } catch (ConsumerCancelledException e) { 101 98 e.printStackTrace(); … … 104 101 } 105 102 } 103 } 104 105 private void startRPCQueue() throws Exception { 106 channel = Broker.getNewChannel(); 107 108 Map<String, Object> args = null; 109 110 String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); 111 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false")); 112 113 int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1")); 114 if (ttl > 0) { 115 args = new HashMap<String, Object>(); 116 args.put("x-message-ttl", ttl); 117 } 118 119 channel.queueDeclare(reply_queue, durable, false, false, args); 120 121 // Declare a new consumer 122 consumer = new QueueingConsumer(channel); 123 channel.basicConsume(reply_queue, true, consumer); 106 124 } 107 125 … … 158 176 } 159 177 160 public synchronized Channel getChannel() throws Exception {161 return connection.createChannel();162 }163 164 178 /** 165 179 * … … 190 204 killed = true; 191 205 channel.close(); 192 connection.close();193 206 } 194 207 -
trunk/objectmq/src/omq/client/proxy/Proxymq.java
r35 r36 15 15 import omq.client.annotation.SyncMethod; 16 16 import omq.client.listener.ResponseListener; 17 import omq.common.broker.Broker; 17 18 import omq.common.event.Event; 18 19 import omq.common.event.EventDispatcher; … … 28 29 29 30 import com.rabbitmq.client.AMQP.BasicProperties; 30 import com.rabbitmq.client.Channel;31 31 32 32 /** … … 48 48 private transient ResponseListener rListener; 49 49 private transient EventDispatcher dispatcher; 50 private transient Channel channel;50 // private transient Channel channel; 51 51 private transient Properties env; 52 52 private transient Map<String, byte[]> results; … … 85 85 this.dispatcher = EventDispatcher.getDispatcher(); 86 86 87 this.channel = rListener.getChannel(); 87 // TODO what is better to use a new channel or to use the same? 88 // this.channel = Broker.getChannel(); 88 89 this.env = env; 89 90 … … 97 98 @Override 98 99 public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { 99 // long timeStart = (new Date()).getTime();100 100 // long timeStart = (new Date()).getTime(); 101 101 102 // Local methods only 102 103 String methodName = method.getName(); … … 120 121 Request request = createRequest(method, arguments); 121 122 122 //Log.saveTimeSendRequestLog("Client-time-request", request.getId(), method.getName(), timeStart); 123 123 // Log.saveTimeSendRequestLog("Client-time-request", request.getId(), 124 // method.getName(), timeStart); 125 124 126 Object response = null; 125 127 // Publish the request … … 128 130 publishAsyncRequest(request); 129 131 } else { 130 System.out.println("Publish sync request -> " + request.getId()); 132 System.out.println("Publish sync request -> " + request.getId()); 131 133 response = publishSyncRequest(request, method.getReturnType()); 132 133 //long timeEnd = (new Date()).getTime(); 134 //Log.saveTimeSendRequestLog("Client-time-response", request.getId(), method.getName(), timeEnd); 135 } 136 134 135 // long timeEnd = (new Date()).getTime(); 136 // Log.saveTimeSendRequestLog("Client-time-response", 137 // request.getId(), method.getName(), timeEnd); 138 } 139 137 140 return response; 138 141 } 139 142 140 private void publishMessage(Request request, String replyQueueName) throws IOException, SerializerException {143 private void publishMessage(Request request, String replyQueueName) throws Exception { 141 144 String corrId = request.getId(); 142 145 … … 150 153 // Publish the message 151 154 byte[] bytesRequest = Serializer.serialize(request); 152 channel.basicPublish(exchange, routingkey, props, bytesRequest); 153 //Log.saveLog("Client-Serialize", bytesRequest); 154 } 155 156 private void publishAsyncRequest(Request request) throws IOException, SerializerException { 155 // TODO See this 156 // channel.basicPublish(exchange, routingkey, props, bytesRequest); 157 Broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest); 158 // Log.saveLog("Client-Serialize", bytesRequest); 159 } 160 161 private void publishAsyncRequest(Request request) throws Exception { 157 162 // Get the environment properties 158 163 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); … … 221 226 } 222 227 resp = Serializer.deserializeResponse(results.get(corrId), type); 223 // Log.saveLog("Client-Deserialize", results.get(corrId));224 228 // Log.saveLog("Client-Deserialize", results.get(corrId)); 229 225 230 // Remove and indicate the key exists (a hashmap can contain a null 226 231 // object, using this we'll know whether a response has been -
trunk/objectmq/src/omq/common/broker/Broker.java
r34 r36 14 14 import omq.common.util.ParameterQueue; 15 15 import omq.common.util.Serializer; 16 import omq.exception.EnvironmentException;17 16 import omq.exception.InitBrokerException; 18 17 import omq.exception.RemoteException; … … 33 32 34 33 public static void initBroker(Properties env) throws Exception { 35 try { 36 Environment.getEnvironment(); 37 } catch (EnvironmentException ex) { // environment not set. 34 if (Environment.isVoid()) { 38 35 Environment.setEnvironment(env); 39 36 connection = OmqConnectionFactory.getNewConnection(env); … … 115 112 System.out.println("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId()); 116 113 channel.exchangeDeclare(UID, "fanout"); 117 114 118 115 byte[] bytesResponse = Serializer.serialize(wrapper); 119 116 channel.basicPublish(UID, "", null, bytesResponse); 120 117 121 // Log.saveLog("Server-Serialize", bytesResponse);118 // Log.saveLog("Server-Serialize", bytesResponse); 122 119 } 123 120 -
trunk/objectmq/src/omq/common/event/EventDispatcher.java
r35 r36 40 40 listeners = new HashMap<String, Vector<EventListener>>(); 41 41 42 startEventQueue(); 43 44 } 45 46 private void startEventQueue() throws Exception { 42 47 // Get a new connection and a new channel 43 48 channel = Broker.getNewChannel(); … … 112 117 System.out.println("ShutdownSignalException e: " + e); 113 118 e.printStackTrace(); 119 try { 120 if (channel.isOpen()) { 121 channel.close(); 122 } 123 startEventQueue(); 124 } catch (Exception e1) { 125 try { 126 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); 127 Thread.sleep(milis); 128 } catch (InterruptedException e2) { 129 e2.printStackTrace(); 130 } 131 e1.printStackTrace(); 132 } 114 133 } catch (ConsumerCancelledException e) { 115 134 System.out.println("ConsumerCancelledException e: " + e); -
trunk/objectmq/src/omq/common/util/Environment.java
r11 r36 4 4 5 5 import omq.exception.EnvironmentException; 6 7 6 8 7 /** … … 12 11 */ 13 12 public class Environment { 14 private static Properties env ;13 private static Properties env = null; 15 14 16 15 /** … … 36 35 env = environment; 37 36 } 37 38 public static boolean isVoid() { 39 return env == null; 40 } 38 41 }
Note: See TracChangeset
for help on using the changeset viewer.