Changeset 104 for branches/supervisor


Ignore:
Timestamp:
10/16/13 13:01:04 (11 years ago)
Author:
stoda
Message:

MultiMethod? without numWait -> it depends on the timeout

Location:
branches/supervisor
Files:
2 added
4 edited

Legend:

Unmodified
Added
Removed
  • branches/supervisor/src/main/java/omq/client/annotation/MultiMethod.java

    r83 r104  
    1515@Target(ElementType.METHOD)
    1616public @interface MultiMethod {
    17         /**
    18          * If @MultiMethod is followed by @SyncMethod waitNum indicates how many
    19          * responses we will wait for.
    20          *
    21          * @return length of the array of responses we are waiting for.
    22          */
    23         int waitNum() default 1;
    2417}
  • branches/supervisor/src/main/java/omq/client/proxy/Proxymq.java

    r99 r104  
    44import java.lang.reflect.InvocationHandler;
    55import java.lang.reflect.Method;
     6import java.util.ArrayList;
    67import java.util.HashMap;
     8import java.util.List;
    79import java.util.Map;
    810import java.util.Properties;
     
    201203                                publishMessage(request, replyQueueName);
    202204                                if (request.isMulti()) {
    203                                         return getResults(corrId, request.getWait(), timeout, type);
     205                                        return getResults(corrId, timeout, type);
    204206                                } else {
    205207                                        return getResult(corrId, timeout, type);
     
    228230                String methodName = method.getName();
    229231                boolean multi = false;
    230                 int wait = 0;
    231232
    232233                if (method.getAnnotation(MultiMethod.class) != null) {
    233234                        multi = true;
    234                         wait = method.getAnnotation(MultiMethod.class).waitNum();
    235235                }
    236236
     
    246246                                timeout = sync.timeout();
    247247                        }
    248                         return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
     248                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi);
    249249                } else {
    250250                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
     
    291291         * @param corrId
    292292         *            - Correlation Id of the request
    293          * @param wait
    294          *            - Array length
    295293         * @param timeout
    296294         *            - Timeout read in @SyncMethod.timeout(). If the timeout is set
     
    302300         * @throws Exception
    303301         */
    304         private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
     302        private Object getResults(String corrId, long timeout, Class<?> type) throws Exception {
    305303                Response resp = null;
    306304                // Get the component type of an array
    307305                Class<?> actualType = type.getComponentType();
    308306
    309                 Object array = Array.newInstance(actualType, wait);
     307                List<Object> list = new ArrayList<Object>();
    310308
    311309                int i = 0;
     
    313311                long start = System.currentTimeMillis();
    314312
    315                 while (i < wait) {
     313                while (true) {
    316314                        synchronized (results) {
    317315                                // Due to we are using notifyAll(), we need to control the real
     
    322320                                }
    323321                                if ((timeout - localTimeout) <= 0) {
    324                                         throw new TimeoutException("Timeout exception time: " + timeout);
     322                                        break;
    325323                                }
    326324                                // Remove the corrId to receive new replies
    327325                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
    328                                 Array.set(array, i, resp.getResult());
     326                                list.add(resp.getResult());
    329327                        }
    330328                        i++;
    331329                }
     330
     331                if (i == 0) {
     332                        results.remove(corrId);
     333                        throw new TimeoutException("Timeout exception time: " + timeout);
     334                }
     335
    332336                synchronized (results) {
    333337                        results.put(corrId, null);
     338                }
     339
     340                Object array = Array.newInstance(actualType, i);
     341                i = 0;
     342                for (Object o : list) {
     343                        Array.set(array, i++, o);
    334344                }
    335345
  • branches/supervisor/src/main/java/omq/common/message/Request.java

    r83 r104  
    2525
    2626        private transient boolean multi;
    27         private transient int wait;
    2827        private transient long timeout;
    2928        private transient int retries;
     
    7877         * @param multi
    7978         *            - If the method is multi
    80          * @param wait
    81          *            - If the method is multi how many responses will be listened
    8279         * @return - new SyncRequest
    8380         */
    84         public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi, int wait) {
     81        public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi) {
    8582                Request req = new Request(id, method, false, params, multi);
    8683                req.setRetries(retries);
    8784                req.setTimeout(timeout);
    88                 req.setWait(wait);
    8985                return req;
    9086        }
     
    163159        }
    164160
    165         public int getWait() {
    166                 return wait;
    167         }
    168 
    169         public void setWait(int wait) {
    170                 this.wait = wait;
    171         }
    172161}
  • branches/supervisor/src/test/java/omq/test/multiProcess/NumberClient.java

    r83 r104  
    2424        public void setMultiNumber(int x);
    2525
    26         @MultiMethod(waitNum = 2)
     26        @MultiMethod()
    2727        @SyncMethod(timeout = 1000)
    2828        public int[] getMultiNumber();
Note: See TracChangeset for help on using the changeset viewer.