Changeset 35
- Timestamp:
- 06/11/13 12:42:38 (11 years ago)
- Location:
- trunk/objectmq
- Files:
-
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/objectmq/src/omq/Remote.java
r15 r35 25 25 public void notifyEvent(Event event) throws IOException, SerializerException; 26 26 27 public void addListener(EventListener eventListener) throws Exception;27 public void addListener(EventListener<?> eventListener) throws Exception; 28 28 29 public void removeListener(EventListener eventListener) throws Exception;29 public void removeListener(EventListener<?> eventListener) throws Exception; 30 30 31 public Collection<EventListener > getListeners() throws Exception;31 public Collection<EventListener<?>> getListeners() throws Exception; 32 32 } -
trunk/objectmq/src/omq/client/listener/ResponseListener.java
r34 r35 2 2 3 3 import java.io.IOException; 4 import java.util.HashMap; 4 5 import java.util.Hashtable; 5 6 import java.util.Map; … … 47 48 this.results = new Hashtable<String, Map<String, byte[]>>(); 48 49 50 Map<String, Object> args = null; 51 49 52 String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); 50 channel.queueDeclare(reply_queue, false, false, false, null); 53 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false")); 54 55 int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1")); 56 if (ttl > 0) { 57 args = new HashMap<String, Object>(); 58 args.put("x-message-ttl", ttl); 59 } 60 61 channel.queueDeclare(reply_queue, durable, false, false, args); 51 62 52 63 // Declare a new consumer … … 73 84 74 85 // Stores the new response 75 Map<String, byte[]> proxyResults = results 76 .get(props.getAppId()); 86 Map<String, byte[]> proxyResults = results.get(props.getAppId()); 77 87 78 88 // Put the result into the proxy results and notify him … … 119 129 * @throws Exception 120 130 */ 121 public static ResponseListener getRequestListener(Properties env) 122 throws Exception { 131 public static ResponseListener getRequestListener(Properties env) throws Exception { 123 132 if (rListener == null) { 124 133 rListener = new ResponseListener(env); … … 148 157 return rListener; 149 158 } 150 159 151 160 public synchronized Channel getChannel() throws Exception { 152 161 return connection.createChannel(); -
trunk/objectmq/src/omq/client/proxy/Proxymq.java
r34 r35 51 51 private transient Properties env; 52 52 private transient Map<String, byte[]> results; 53 private transient Map<String, EventListener > listeners;53 private transient Map<String, EventListener<?>> listeners; 54 54 55 55 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 88 88 this.env = env; 89 89 90 listeners = new HashMap<String, EventListener >();90 listeners = new HashMap<String, EventListener<?>>(); 91 91 92 92 // Create a new hashmap and registry it in rListener … … 107 107 return getRef(); 108 108 } else if (methodName.equals("addListener")) { 109 addListener((EventListener ) arguments[0]);109 addListener((EventListener<?>) arguments[0]); 110 110 return null; 111 111 } else if (methodName.equals("removeListener")) { 112 removeListener((EventListener ) arguments[0]);112 removeListener((EventListener<?>) arguments[0]); 113 113 return null; 114 114 } else if (methodName.equals("getListeners")) { … … 303 303 304 304 @Override 305 public void addListener(EventListener eventListener) throws Exception {305 public void addListener(EventListener<?> eventListener) throws Exception { 306 306 if (eventListener.getTopic() == null) { 307 307 eventListener.setTopic(uid); … … 312 312 313 313 @Override 314 public void removeListener(EventListener eventListener) throws Exception {314 public void removeListener(EventListener<?> eventListener) throws Exception { 315 315 listeners.remove(eventListener.getTopic()); 316 316 dispatcher.removeListener(eventListener); … … 318 318 319 319 @Override 320 public Collection<EventListener > getListeners() throws Exception {320 public Collection<EventListener<?>> getListeners() throws Exception { 321 321 return listeners.values(); 322 322 } -
trunk/objectmq/src/omq/common/event/EventDispatcher.java
r30 r35 24 24 * 25 25 */ 26 @SuppressWarnings("rawtypes") 26 27 public class EventDispatcher extends Thread { 27 28 private static EventDispatcher dispatcher; … … 43 44 44 45 String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE); 45 channel.queueDeclare(event_queue, false, false, false, null); 46 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false")); 47 channel.queueDeclare(event_queue, durable, false, false, null); 46 48 47 49 // Declare a new consumer … … 96 98 97 99 System.out.println("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId()); 98 //Log.saveLog("Client-Deserialize", delivery.getBody()); 99 100 //long timeEnd = (new Date()).getTime(); 101 //Log.saveTimeSendRequestLog("Client-time-response", event.getCorrId(), "Event!", timeEnd); 102 100 // Log.saveLog("Client-Deserialize", delivery.getBody()); 101 102 // long timeEnd = (new Date()).getTime(); 103 // Log.saveTimeSendRequestLog("Client-time-response", 104 // event.getCorrId(), "Event!", timeEnd); 105 103 106 // Dispatch it 104 107 dispatch(event.getTopic(), event); … … 171 174 for (final EventListener listener : listeners.get(topic)) { 172 175 new Thread() { 176 @SuppressWarnings("unchecked") 173 177 public void run() { 174 178 listener.notifyEvent(event); -
trunk/objectmq/src/omq/common/util/ParameterQueue.java
r34 r35 12 12 */ 13 13 14 public static String SERIALIZERNAME = "omq.serializer"; 14 public static String SERIALIZER_NAME = "omq.serializer"; 15 16 /** 17 * Set whether the messages must be compressed or not 18 */ 15 19 public static String ENABLECOMPRESSION = "omq.compression"; 16 20 21 /** 22 * Set the ip where the rabbitmq server is. 23 */ 17 24 public static String SERVER_HOST = "omq.host"; 25 26 /** 27 * Set the port that rabbitmq uses. 28 */ 18 29 public static String SERVER_PORT = "omq.port"; 19 public static String SERVER_REGISTRY = "omq.registry";20 30 31 /** 32 * Set the clients username 33 */ 21 34 public static String USER_NAME = "omq.username"; 35 36 /** 37 * Set the clients password 38 */ 22 39 public static String USER_PASS = "omq.pass"; 23 40 41 /** 42 * Set the exchange where the objectmq are listening 43 */ 24 44 public static String RPC_EXCHANGE = "omq.rpc_exchange"; 25 45 46 /** 47 * Set the clients reply queue. Every client must have a different queue 48 * name. 49 */ 26 50 public static String RPC_REPLY_QUEUE = "omq.reply_queue_rpc"; 51 52 /** 53 * Set the clients event queue. Every client must have a different queue 54 * name. 55 */ 27 56 public static String EVENT_REPLY_QUEUE = "omq.reply_queue_event"; 57 58 /** 59 * Set if the queues must be durable. The queues won't be lost when rabbitmq 60 * crashes if DURABLE_QUEUES is set trues. 61 */ 62 public static String DURABLE_QUEUES = "omq.durable_queue"; 63 64 /** 65 * The MESSAGE_TTL_IN_QUEUES controls for how long a message published to 66 * the queues can live before it is discarded. A message that has been in 67 * the queue for longer than the configured TTL is said to be dead. 68 * 69 * This property must be a non-negative 32 bit integer (0 <= n <= 2^32-1), 70 * describing the TTL period in milliseconds. 71 */ 72 public static String MESSAGE_TTL_IN_QUEUES = "omq.message_ttl_queue"; 73 74 // TODO persisten messages? the messages will be saved in the disk if this 75 // flag is set true 28 76 29 77 public static String ENABLE_SSL = "omq.enable_ssl"; -
trunk/objectmq/src/omq/common/util/Serializer.java
r34 r35 7 7 import omq.common.message.Request; 8 8 import omq.common.message.Response; 9 import omq.common.util.Serializers.GsonImp; 9 10 import omq.common.util.Serializers.ISerializer; 11 import omq.common.util.Serializers.JavaImp; 12 import omq.common.util.Serializers.KryoImp; 10 13 import omq.exception.EnvironmentException; 11 14 import omq.exception.SerializerException; … … 18 21 */ 19 22 public class Serializer { 23 public static String kryo = KryoImp.class.getCanonicalName(); 24 public static String java = JavaImp.class.getCanonicalName(); 25 public static String gson = GsonImp.class.getCanonicalName(); 26 20 27 public static ISerializer serializer; 21 28 … … 36 43 try { 37 44 Properties env = Environment.getEnvironment(); 38 String className = env.getProperty(ParameterQueue.SERIALIZER NAME, "omq.common.util.Serializers.JavaImp");45 String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java); 39 46 40 47 if (className == null || className.isEmpty()) { -
trunk/objectmq/src/omq/server/InvocationThread.java
r34 r35 42 42 System.out.println("Invoke method: " + methodName + " CorrID: " + requestID); 43 43 44 // Changed --------------------------------------- 45 Object result = null; 46 if ("commit".equalsIgnoreCase(methodName)) { 47 Object[] arguments = request.getArguments(); 48 arguments[1] = ((String) arguments[1]) + "@@" + requestID; 49 result = obj.invokeMethod(methodName, arguments); 50 } else { 51 result = obj.invokeMethod(request.getMethod(), request.getArguments()); 52 } 53 // ----------------------------------------------- 54 55 // // Invoke the method 56 // Object result = obj.invokeMethod(request.getMethod(), 57 // request.getArguments()); 44 // Invoke the method 45 Object result = obj.invokeMethod(request.getMethod(), 46 request.getArguments()); 58 47 59 48 if (!request.isAsync()) { -
trunk/objectmq/src/omq/server/RemoteObject.java
r34 r35 101 101 Thread.sleep(milis); 102 102 } catch (InterruptedException e2) { 103 // TODO Auto-generated catch block104 103 e2.printStackTrace(); 105 104 } … … 211 210 String queue = UID; 212 211 String routingKey = UID; 212 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false")); 213 213 214 214 // Start channel … … 218 218 System.out.println("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue); 219 219 channel.exchangeDeclare(exchange, "direct"); 220 channel.queueDeclare(queue, false, false, false, null);220 channel.queueDeclare(queue, durable, false, false, null); 221 221 channel.queueBind(queue, exchange, routingKey); 222 222 … … 231 231 232 232 @Override 233 public void addListener(EventListener eventListener) throws Exception {234 } 235 236 @Override 237 public void removeListener(EventListener eventListener) throws Exception {238 } 239 240 @Override 241 public Collection<EventListener > getListeners() throws Exception {233 public void addListener(EventListener<?> eventListener) throws Exception { 234 } 235 236 @Override 237 public void removeListener(EventListener<?> eventListener) throws Exception { 238 } 239 240 @Override 241 public Collection<EventListener<?>> getListeners() throws Exception { 242 242 return null; 243 243 } -
trunk/objectmq/test/calculatorTest/ClientTest.java
r34 r35 7 7 import omq.common.broker.Broker; 8 8 import omq.common.util.ParameterQueue; 9 import omq.common.util.Serializer; 9 10 10 11 import org.junit.BeforeClass; … … 24 25 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1"); 25 26 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 26 // env.setProperty(ParameterQueue.SERIALIZERNAME, 27 // "omq.common.util.Serializers.KryoImp"); 28 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.GsonImp"); 27 env.setProperty(ParameterQueue.DURABLE_QUEUES, "false"); 28 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.kryo); 29 29 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false"); 30 30 -
trunk/objectmq/test/calculatorTest/ServerTest.java
r34 r35 5 5 import omq.common.broker.Broker; 6 6 import omq.common.util.ParameterQueue; 7 import omq.common.util.Serializer; 7 8 8 9 public class ServerTest { … … 18 19 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1"); 19 20 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 20 // env.setProperty(ParameterQueue.SERIALIZERNAME, 21 // "omq.common.util.Serializers.KryoImp"); 22 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.GsonImp"); 21 env.setProperty(ParameterQueue.DURABLE_QUEUES, "true"); 22 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.kryo); 23 23 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false"); 24 24
Note: See TracChangeset
for help on using the changeset viewer.