Changeset 54 for trunk/src/main/java/omq


Ignore:
Timestamp:
06/21/13 12:42:25 (11 years ago)
Author:
stoda
Message:

Adding @MultiMethod?
Broker is not a singleton.

Location:
trunk/src/main/java/omq
Files:
1 added
7 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/listener/ResponseListener.java

    r53 r54  
    5555        @Override
    5656        public void run() {
     57                logger.info("ResponseListener started");
    5758                Delivery delivery;
    5859                String uid_request;
     
    122123
    123124                channel.queueDeclare(reply_queue, durable, false, false, args);
     125                logger.info("ResponseListener creating queue: " + reply_queue + ", durable: " + durable);
    124126
    125127                // Declare a new consumer
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r53 r54  
    1515import omq.Remote;
    1616import omq.client.annotation.AsyncMethod;
     17import omq.client.annotation.MultiMethod;
    1718import omq.client.annotation.SyncMethod;
    1819import omq.client.listener.ResponseListener;
     
    133134                Request request = createRequest(method, arguments);
    134135
    135                 // Log.saveTimeSendRequestLog("Client-time-request", request.getId(),
    136                 // method.getName(), timeStart);
    137 
    138136                Object response = null;
    139137                // Publish the request
     
    144142                        logger.debug("Publish sync request -> " + request.getId());
    145143                        response = publishSyncRequest(request, method.getReturnType());
    146 
    147                         // long timeEnd = (new Date()).getTime();
    148                         // Log.saveTimeSendRequestLog("Client-time-response",
    149                         // request.getId(), method.getName(), timeEnd);
    150144                }
    151145
     
    168162                // channel.basicPublish(exchange, routingkey, props, bytesRequest);
    169163                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
    170                 // Log.saveLog("Client-Serialize", bytesRequest);
    171164        }
    172165
     
    203196                String corrId = java.util.UUID.randomUUID().toString();
    204197                String methodName = method.getName();
     198                boolean multi = false;
     199
     200                if (method.getAnnotation(MultiMethod.class) != null) {
     201                        multi = true;
     202                }
    205203
    206204                // Since we need to know whether the method is async and if it has to
     
    217215                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout);
    218216                } else {
    219                         return Request.newAsyncRequest(corrId, methodName, arguments);
     217                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
    220218                }
    221219        }
     
    237235                        }
    238236                        resp = serializer.deserializeResponse(results.get(corrId), type);
    239                         // Log.saveLog("Client-Deserialize", results.get(corrId));
    240237
    241238                        // Remove and indicate the key exists (a hashmap can contain a null
  • trunk/src/main/java/omq/common/broker/Broker.java

    r53 r54  
    176176                if (responseListener == null) {
    177177                        responseListener = new ResponseListener(this);
     178                        responseListener.start();
    178179                }
    179180                if (eventDispatcher == null) {
    180181                        eventDispatcher = new EventDispatcher(this);
     182                        eventDispatcher.start();
    181183                }
    182184        }
     
    197199                byte[] bytesResponse = serializer.serialize(wrapper);
    198200                channel.basicPublish(UID, "", null, bytesResponse);
    199 
    200                 // Log.saveLog("Server-Serialize", bytesResponse);
    201201        }
    202202
  • trunk/src/main/java/omq/common/event/EventDispatcher.java

    r53 r54  
    5656                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
    5757                channel.queueDeclare(event_queue, durable, false, false, null);
     58                logger.info("EventDispatcher creating queue: " + event_queue + ", durable: " + durable);
    5859
    5960                // Declare a new consumer
     
    7273        @Override
    7374        public void run() {
     75                logger.info("EventDispatcher started");
    7476                Delivery delivery;
    7577                Event event;
     
    8486
    8587                                logger.info("Event received -> Topic: " + event.getTopic() + "CorrId: " + event.getCorrId());
    86                                 // Log.saveLog("Client-Deserialize", delivery.getBody());
    87 
    88                                 // long timeEnd = (new Date()).getTime();
    89                                 // Log.saveTimeSendRequestLog("Client-time-response",
    90                                 // event.getCorrId(), "Event!", timeEnd);
    9188
    9289                                // Dispatch it
  • trunk/src/main/java/omq/common/message/Request.java

    r44 r54  
    1515        private boolean async = false;
    1616
     17        private transient boolean multi;
    1718        private transient long timeout;
    1819        private transient int retries;
     
    3435        }
    3536
     37        public Request(String id2, String method2, boolean b, Object[] params2, boolean multi2) {
     38                // TODO Auto-generated constructor stub
     39        }
     40
    3641        public static Request newSyncRequest(String id, String method, Object[] params) {
    3742                return new Request(id, method, false, params);
     
    4550        }
    4651
    47         public static Request newAsyncRequest(String id, String method, Object[] params) {
    48                 return new Request(id, method, true, params);
     52        public static Request newAsyncRequest(String id, String method, Object[] params, boolean multi) {
     53                return new Request(id, method, true, params, multi);
    4954        }
    5055
     
    97102        }
    98103
     104        public boolean isMulti() {
     105                return multi;
     106        }
     107
     108        public void setMulti(boolean multi) {
     109                this.multi = multi;
     110        }
    99111}
  • trunk/src/main/java/omq/server/InvocationThread.java

    r53 r54  
    4444                                // Deserialize the json
    4545                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
    46                                 // Log.saveLog("Server-Deserialize", delivery.getBody());
    47 
    4846                                String methodName = request.getMethod();
    4947                                String requestID = request.getId();
     
    7775                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
    7876                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
    79 
    80                                         // Log.saveLog("Server-Serialize", bytesResponse);
     77                                        logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + " replyTo: "
     78                                                        + props.getReplyTo());
    8179                                }
    8280
  • trunk/src/main/java/omq/server/RemoteObject.java

    r53 r54  
    3535
    3636        private static final long serialVersionUID = -1778953938739846450L;
     37        private static final String multi = "multi#";
    3738        private static final Logger logger = Logger.getLogger(RemoteObject.class.getName());
    3839
     
    230231                logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
    231232                channel.exchangeDeclare(exchange, "direct");
     233                channel.exchangeDeclare(multi + exchange, "fanout");
    232234                channel.queueDeclare(queue, durable, false, false, null);
    233235                channel.queueBind(queue, exchange, routingKey);
     236                channel.queueBind(queue, multi + exchange, routingKey);
    234237
    235238                // Declare the event topic fanout
Note: See TracChangeset for help on using the changeset viewer.