Changeset 35


Ignore:
Timestamp:
06/11/13 12:42:38 (11 years ago)
Author:
stoda
Message:
 
Location:
trunk/objectmq
Files:
10 edited

Legend:

Unmodified
Added
Removed
  • trunk/objectmq/src/omq/Remote.java

    r15 r35  
    2525        public void notifyEvent(Event event) throws IOException, SerializerException;
    2626
    27         public void addListener(EventListener eventListener) throws Exception;
     27        public void addListener(EventListener<?> eventListener) throws Exception;
    2828
    29         public void removeListener(EventListener eventListener) throws Exception;
     29        public void removeListener(EventListener<?> eventListener) throws Exception;
    3030
    31         public Collection<EventListener> getListeners() throws Exception;
     31        public Collection<EventListener<?>> getListeners() throws Exception;
    3232}
  • trunk/objectmq/src/omq/client/listener/ResponseListener.java

    r34 r35  
    22
    33import java.io.IOException;
     4import java.util.HashMap;
    45import java.util.Hashtable;
    56import java.util.Map;
     
    4748                this.results = new Hashtable<String, Map<String, byte[]>>();
    4849
     50                Map<String, Object> args = null;
     51
    4952                String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    50                 channel.queueDeclare(reply_queue, false, false, false, null);
     53                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
     54
     55                int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1"));
     56                if (ttl > 0) {
     57                        args = new HashMap<String, Object>();
     58                        args.put("x-message-ttl", ttl);
     59                }
     60
     61                channel.queueDeclare(reply_queue, durable, false, false, args);
    5162
    5263                // Declare a new consumer
     
    7384
    7485                                // Stores the new response
    75                                 Map<String, byte[]> proxyResults = results
    76                                                 .get(props.getAppId());
     86                                Map<String, byte[]> proxyResults = results.get(props.getAppId());
    7787
    7888                                // Put the result into the proxy results and notify him
     
    119129         * @throws Exception
    120130         */
    121         public static ResponseListener getRequestListener(Properties env)
    122                         throws Exception {
     131        public static ResponseListener getRequestListener(Properties env) throws Exception {
    123132                if (rListener == null) {
    124133                        rListener = new ResponseListener(env);
     
    148157                return rListener;
    149158        }
    150        
     159
    151160        public synchronized Channel getChannel() throws Exception {
    152161                return connection.createChannel();
  • trunk/objectmq/src/omq/client/proxy/Proxymq.java

    r34 r35  
    5151        private transient Properties env;
    5252        private transient Map<String, byte[]> results;
    53         private transient Map<String, EventListener> listeners;
     53        private transient Map<String, EventListener<?>> listeners;
    5454
    5555        private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>();
     
    8888                this.env = env;
    8989
    90                 listeners = new HashMap<String, EventListener>();
     90                listeners = new HashMap<String, EventListener<?>>();
    9191
    9292                // Create a new hashmap and registry it in rListener
     
    107107                                return getRef();
    108108                        } else if (methodName.equals("addListener")) {
    109                                 addListener((EventListener) arguments[0]);
     109                                addListener((EventListener<?>) arguments[0]);
    110110                                return null;
    111111                        } else if (methodName.equals("removeListener")) {
    112                                 removeListener((EventListener) arguments[0]);
     112                                removeListener((EventListener<?>) arguments[0]);
    113113                                return null;
    114114                        } else if (methodName.equals("getListeners")) {
     
    303303
    304304        @Override
    305         public void addListener(EventListener eventListener) throws Exception {
     305        public void addListener(EventListener<?> eventListener) throws Exception {
    306306                if (eventListener.getTopic() == null) {
    307307                        eventListener.setTopic(uid);
     
    312312
    313313        @Override
    314         public void removeListener(EventListener eventListener) throws Exception {
     314        public void removeListener(EventListener<?> eventListener) throws Exception {
    315315                listeners.remove(eventListener.getTopic());
    316316                dispatcher.removeListener(eventListener);
     
    318318
    319319        @Override
    320         public Collection<EventListener> getListeners() throws Exception {
     320        public Collection<EventListener<?>> getListeners() throws Exception {
    321321                return listeners.values();
    322322        }
  • trunk/objectmq/src/omq/common/event/EventDispatcher.java

    r30 r35  
    2424 *
    2525 */
     26@SuppressWarnings("rawtypes")
    2627public class EventDispatcher extends Thread {
    2728        private static EventDispatcher dispatcher;
     
    4344
    4445                String event_queue = env.getProperty(ParameterQueue.EVENT_REPLY_QUEUE);
    45                 channel.queueDeclare(event_queue, false, false, false, null);
     46                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
     47                channel.queueDeclare(event_queue, durable, false, false, null);
    4648
    4749                // Declare a new consumer
     
    9698
    9799                                System.out.println("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
    98                                 //Log.saveLog("Client-Deserialize", delivery.getBody());
    99                                
    100                                 //long timeEnd = (new Date()).getTime();
    101                                 //Log.saveTimeSendRequestLog("Client-time-response", event.getCorrId(), "Event!",  timeEnd);
    102                                
     100                                // Log.saveLog("Client-Deserialize", delivery.getBody());
     101
     102                                // long timeEnd = (new Date()).getTime();
     103                                // Log.saveTimeSendRequestLog("Client-time-response",
     104                                // event.getCorrId(), "Event!", timeEnd);
     105
    103106                                // Dispatch it
    104107                                dispatch(event.getTopic(), event);
     
    171174                        for (final EventListener listener : listeners.get(topic)) {
    172175                                new Thread() {
     176                                        @SuppressWarnings("unchecked")
    173177                                        public void run() {
    174178                                                listener.notifyEvent(event);
  • trunk/objectmq/src/omq/common/util/ParameterQueue.java

    r34 r35  
    1212         */
    1313
    14         public static String SERIALIZERNAME = "omq.serializer";
     14        public static String SERIALIZER_NAME = "omq.serializer";
     15
     16        /**
     17         * Set whether the messages must be compressed or not
     18         */
    1519        public static String ENABLECOMPRESSION = "omq.compression";
    1620
     21        /**
     22         * Set the ip where the rabbitmq server is.
     23         */
    1724        public static String SERVER_HOST = "omq.host";
     25
     26        /**
     27         * Set the port that rabbitmq uses.
     28         */
    1829        public static String SERVER_PORT = "omq.port";
    19         public static String SERVER_REGISTRY = "omq.registry";
    2030
     31        /**
     32         * Set the clients username
     33         */
    2134        public static String USER_NAME = "omq.username";
     35
     36        /**
     37         * Set the clients password
     38         */
    2239        public static String USER_PASS = "omq.pass";
    2340
     41        /**
     42         * Set the exchange where the objectmq are listening
     43         */
    2444        public static String RPC_EXCHANGE = "omq.rpc_exchange";
    2545
     46        /**
     47         * Set the clients reply queue. Every client must have a different queue
     48         * name.
     49         */
    2650        public static String RPC_REPLY_QUEUE = "omq.reply_queue_rpc";
     51
     52        /**
     53         * Set the clients event queue. Every client must have a different queue
     54         * name.
     55         */
    2756        public static String EVENT_REPLY_QUEUE = "omq.reply_queue_event";
     57
     58        /**
     59         * Set if the queues must be durable. The queues won't be lost when rabbitmq
     60         * crashes if DURABLE_QUEUES is set trues.
     61         */
     62        public static String DURABLE_QUEUES = "omq.durable_queue";
     63
     64        /**
     65         * The MESSAGE_TTL_IN_QUEUES controls for how long a message published to
     66         * the queues can live before it is discarded. A message that has been in
     67         * the queue for longer than the configured TTL is said to be dead.
     68         *
     69         * This property must be a non-negative 32 bit integer (0 <= n <= 2^32-1),
     70         * describing the TTL period in milliseconds.
     71         */
     72        public static String MESSAGE_TTL_IN_QUEUES = "omq.message_ttl_queue";
     73
     74        // TODO persisten messages? the messages will be saved in the disk if this
     75        // flag is set true
    2876
    2977        public static String ENABLE_SSL = "omq.enable_ssl";
  • trunk/objectmq/src/omq/common/util/Serializer.java

    r34 r35  
    77import omq.common.message.Request;
    88import omq.common.message.Response;
     9import omq.common.util.Serializers.GsonImp;
    910import omq.common.util.Serializers.ISerializer;
     11import omq.common.util.Serializers.JavaImp;
     12import omq.common.util.Serializers.KryoImp;
    1013import omq.exception.EnvironmentException;
    1114import omq.exception.SerializerException;
     
    1821 */
    1922public class Serializer {
     23        public static String kryo = KryoImp.class.getCanonicalName();
     24        public static String java = JavaImp.class.getCanonicalName();
     25        public static String gson = GsonImp.class.getCanonicalName();
     26
    2027        public static ISerializer serializer;
    2128
     
    3643                        try {
    3744                                Properties env = Environment.getEnvironment();
    38                                 String className = env.getProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.JavaImp");
     45                                String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.java);
    3946
    4047                                if (className == null || className.isEmpty()) {
  • trunk/objectmq/src/omq/server/InvocationThread.java

    r34 r35  
    4242                                System.out.println("Invoke method: " + methodName + " CorrID: " + requestID);
    4343
    44                                 // Changed ---------------------------------------
    45                                 Object result = null;
    46                                 if ("commit".equalsIgnoreCase(methodName)) {
    47                                         Object[] arguments = request.getArguments();
    48                                         arguments[1] = ((String) arguments[1]) + "@@" + requestID;
    49                                         result = obj.invokeMethod(methodName, arguments);
    50                                 } else {
    51                                         result = obj.invokeMethod(request.getMethod(), request.getArguments());
    52                                 }
    53                                 // -----------------------------------------------
    54 
    55                                 // // Invoke the method
    56                                 // Object result = obj.invokeMethod(request.getMethod(),
    57                                 // request.getArguments());
     44                                 // Invoke the method
     45                                 Object result = obj.invokeMethod(request.getMethod(),
     46                                 request.getArguments());
    5847
    5948                                if (!request.isAsync()) {
  • trunk/objectmq/src/omq/server/RemoteObject.java

    r34 r35  
    101101                                                Thread.sleep(milis);
    102102                                        } catch (InterruptedException e2) {
    103                                                 // TODO Auto-generated catch block
    104103                                                e2.printStackTrace();
    105104                                        }
     
    211210                String queue = UID;
    212211                String routingKey = UID;
     212                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
    213213
    214214                // Start channel
     
    218218                System.out.println("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
    219219                channel.exchangeDeclare(exchange, "direct");
    220                 channel.queueDeclare(queue, false, false, false, null);
     220                channel.queueDeclare(queue, durable, false, false, null);
    221221                channel.queueBind(queue, exchange, routingKey);
    222222
     
    231231
    232232        @Override
    233         public void addListener(EventListener eventListener) throws Exception {
    234         }
    235 
    236         @Override
    237         public void removeListener(EventListener eventListener) throws Exception {
    238         }
    239 
    240         @Override
    241         public Collection<EventListener> getListeners() throws Exception {
     233        public void addListener(EventListener<?> eventListener) throws Exception {
     234        }
     235
     236        @Override
     237        public void removeListener(EventListener<?> eventListener) throws Exception {
     238        }
     239
     240        @Override
     241        public Collection<EventListener<?>> getListeners() throws Exception {
    242242                return null;
    243243        }
  • trunk/objectmq/test/calculatorTest/ClientTest.java

    r34 r35  
    77import omq.common.broker.Broker;
    88import omq.common.util.ParameterQueue;
     9import omq.common.util.Serializer;
    910
    1011import org.junit.BeforeClass;
     
    2425                env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
    2526                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    26                 // env.setProperty(ParameterQueue.SERIALIZERNAME,
    27                 // "omq.common.util.Serializers.KryoImp");
    28                 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.GsonImp");
     27                env.setProperty(ParameterQueue.DURABLE_QUEUES, "false");
     28                env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.kryo);
    2929                env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");
    3030
  • trunk/objectmq/test/calculatorTest/ServerTest.java

    r34 r35  
    55import omq.common.broker.Broker;
    66import omq.common.util.ParameterQueue;
     7import omq.common.util.Serializer;
    78
    89public class ServerTest {
     
    1819                env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
    1920                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    20                 // env.setProperty(ParameterQueue.SERIALIZERNAME,
    21                 // "omq.common.util.Serializers.KryoImp");
    22                 env.setProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.GsonImp");
     21                env.setProperty(ParameterQueue.DURABLE_QUEUES, "true");
     22                env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.kryo);
    2323                env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");
    2424
Note: See TracChangeset for help on using the changeset viewer.