Changeset 55


Ignore:
Timestamp:
06/21/13 16:55:58 (11 years ago)
Author:
stoda
Message:

@MultiMethod? implemented and working with @AsyncMethod? annotation.
TODO: @Multi with SyncMethod?(waitNum = x) -> Must return a List<?>
Refactoring in Proxymq if it's necessary

Location:
trunk
Files:
1 added
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r54 r55  
    4848        private static final long serialVersionUID = 1L;
    4949        private static final Logger logger = Logger.getLogger(Proxymq.class.getName());
     50        private static final String multi = "multi#";
    5051        private static Map<String, Object> proxies = new Hashtable<String, Object>();
    5152
     
    5657        private transient EventDispatcher dispatcher;
    5758        private transient Serializer serializer;
    58         // private transient Channel channel;
    5959        private transient Properties env;
    6060        private transient Map<String, byte[]> results;
     
    111111        @Override
    112112        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
    113                 // long timeStart = (new Date()).getTime();
    114 
    115113                // Local methods only
    116114                String methodName = method.getName();
     
    151149
    152150                // Get the environment properties
    153                 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
    154                 String routingkey = this.uid;
     151                String exchange;
     152                String routingkey;
     153
     154                if (request.isMulti()) {
     155                        exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE);
     156                        routingkey = "";
     157                } else {
     158                        exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
     159                        routingkey = uid;
     160                }
    155161
    156162                // Add the correlation ID and create a replyTo property
     
    159165                // Publish the message
    160166                byte[] bytesRequest = serializer.serialize(serializerType, request);
    161                 // TODO See this
    162                 // channel.basicPublish(exchange, routingkey, props, bytesRequest);
    163167                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
    164168        }
     
    213217                                timeout = sync.timeout();
    214218                        }
    215                         return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout);
     219                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi);
    216220                } else {
    217221                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
  • trunk/src/main/java/omq/common/message/Request.java

    r54 r55  
    3535        }
    3636
    37         public Request(String id2, String method2, boolean b, Object[] params2, boolean multi2) {
    38                 // TODO Auto-generated constructor stub
     37        private Request(String id, String method, boolean async, Object[] params, boolean multi) {
     38                this.id = id;
     39                this.method = method;
     40                this.async = async;
     41                this.params = params;
     42                this.multi = multi;
    3943        }
    4044
     
    4347        }
    4448
    45         public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout) {
    46                 Request req = new Request(id, method, false, params);
     49        public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi) {
     50                Request req = new Request(id, method, false, params, multi);
    4751                req.setRetries(retries);
    4852                req.setTimeout(timeout);
  • trunk/src/main/java/omq/server/RemoteObject.java

    r54 r55  
    223223                String queue = UID;
    224224                String routingKey = UID;
     225                // Multi info
     226                String multiExchange = multi + exchange;
     227                String multiQueue = UID + System.currentTimeMillis();
     228
    225229                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
    226230
     
    231235                logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue);
    232236                channel.exchangeDeclare(exchange, "direct");
    233                 channel.exchangeDeclare(multi + exchange, "fanout");
    234237                channel.queueDeclare(queue, durable, false, false, null);
    235238                channel.queueBind(queue, exchange, routingKey);
    236                 channel.queueBind(queue, multi + exchange, routingKey);
     239
     240                channel.exchangeDeclare(multiExchange, "fanout");
     241                channel.queueDeclare(multiQueue, durable, false, false, null);
     242                channel.queueBind(multiQueue, multiExchange, "");
    237243
    238244                // Declare the event topic fanout
     
    243249                consumer = new QueueingConsumer(channel);
    244250                channel.basicConsume(queue, true, consumer);
     251                channel.basicConsume(multiQueue, true, consumer);
    245252        }
    246253
  • trunk/src/test/java/omq/test/multiProcess/MultiProcessTest.java

    r54 r55  
    9090                assertEquals(x, b);
    9191                remoteNumber.setNumber(0);
     92                remoteNumber.setNumber(0);
    9293        }
    9394
Note: See TracChangeset for help on using the changeset viewer.