Changeset 58 for trunk/src/main/java


Ignore:
Timestamp:
06/25/13 16:38:31 (11 years ago)
Author:
stoda
Message:

@MultiMethod? + @SyncMethod? implemented and tested

Location:
trunk/src/main/java/omq
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r57 r58  
    22
    33import java.io.IOException;
     4import java.lang.reflect.Array;
    45import java.lang.reflect.InvocationHandler;
    56import java.lang.reflect.Method;
    67import java.lang.reflect.Proxy;
    7 import java.util.ArrayList;
    88import java.util.Collection;
    99import java.util.HashMap;
    1010import java.util.Hashtable;
    11 import java.util.List;
    1211import java.util.Map;
    1312import java.util.Properties;
    14 
    15 import org.apache.log4j.Logger;
    1613
    1714import omq.Remote;
     
    3431import omq.exception.TimeoutException;
    3532
     33import org.apache.log4j.Logger;
     34
    3635import com.rabbitmq.client.AMQP.BasicProperties;
    3736
     
    190189                        try {
    191190                                publishMessage(request, replyQueueName);
    192                                 return getResult(corrId, timeout, type);
     191                                if (request.isMulti()) {
     192                                        return getResults(corrId, 2, timeout, type);
     193                                } else {
     194                                        return getResult(corrId, timeout, type);
     195                                }
     196
    193197                        } catch (TimeoutException te) {
    194198                                logger.error(te);
     
    203207                String methodName = method.getName();
    204208                boolean multi = false;
     209                int wait = 0;
    205210
    206211                if (method.getAnnotation(MultiMethod.class) != null) {
    207212                        multi = true;
     213                        wait = method.getAnnotation(MultiMethod.class).waitNum();
    208214                }
    209215
     
    219225                                timeout = sync.timeout();
    220226                        }
    221                         return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi);
     227                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
    222228                } else {
    223229                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
     
    229235
    230236                // Wait for the results.
    231                 long localTimeout = 0;
     237                long localTimeout = timeout;
    232238                long start = System.currentTimeMillis();
    233239                synchronized (results) {
    234240                        // Due to we are using notifyAll(), we need to control the real time
    235241                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
    236                                 results.wait(timeout);
     242                                results.wait(localTimeout);
    237243                                localTimeout = System.currentTimeMillis() - start;
    238244                        }
     
    258264        }
    259265
    260         @SuppressWarnings("unused")
    261266        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
    262267                Response resp = null;
    263                 List<Object> list = new ArrayList<Object>();
     268                Class<?> actualType = type.getComponentType();
     269
     270                Object array = Array.newInstance(actualType, wait);
    264271
    265272                int i = 0;
     
    279286                                }
    280287                                // Remove the corrId to receive new replies
    281                                 resp = serializer.deserializeResponse(results.remove(corrId), type);
    282                                 list.add(resp.getResult());
    283 
     288                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
     289                                System.out.println("/n/n/n/n/nResult type: "+resp.getResult()+" /n/n/n/n/n");
     290                                Array.set(array, i, resp.getResult());
    284291                        }
    285292                        i++;
     
    289296                }
    290297
    291                 return list;
     298                return array;
    292299        }
    293300
  • trunk/src/main/java/omq/common/message/Request.java

    r55 r58  
    1616
    1717        private transient boolean multi;
     18        private transient int wait;
    1819        private transient long timeout;
    1920        private transient int retries;
     
    4748        }
    4849
    49         public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi) {
     50        public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi, int wait) {
    5051                Request req = new Request(id, method, false, params, multi);
    5152                req.setRetries(retries);
    5253                req.setTimeout(timeout);
     54                req.setWait(wait);
    5355                return req;
    5456        }
     
    113115                this.multi = multi;
    114116        }
     117
     118        public int getWait() {
     119                return wait;
     120        }
     121
     122        public void setWait(int wait) {
     123                this.wait = wait;
     124        }
    115125}
  • trunk/src/main/java/omq/server/RemoteObject.java

    r57 r58  
    3939
    4040        private String UID;
     41        private String multiQueue;
    4142        private Properties env;
    4243        private transient Broker broker;
     
    6667                this.broker = broker;
    6768                UID = reference;
     69                multiQueue = UID + System.currentTimeMillis();
    6870                env = broker.getEnvironment();
    6971                serializer = broker.getSerializer();
     
    225227                // Multi info
    226228                String multiExchange = multi + exchange;
    227                 //TODO: change this
    228                 String multiQueue = UID + System.currentTimeMillis();
    229229
    230230                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
Note: See TracChangeset for help on using the changeset viewer.