Changeset 58


Ignore:
Timestamp:
06/25/13 16:38:31 (11 years ago)
Author:
stoda
Message:

@MultiMethod? + @SyncMethod? implemented and tested

Location:
trunk/src
Files:
1 added
1 deleted
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r57 r58  
    22
    33import java.io.IOException;
     4import java.lang.reflect.Array;
    45import java.lang.reflect.InvocationHandler;
    56import java.lang.reflect.Method;
    67import java.lang.reflect.Proxy;
    7 import java.util.ArrayList;
    88import java.util.Collection;
    99import java.util.HashMap;
    1010import java.util.Hashtable;
    11 import java.util.List;
    1211import java.util.Map;
    1312import java.util.Properties;
    14 
    15 import org.apache.log4j.Logger;
    1613
    1714import omq.Remote;
     
    3431import omq.exception.TimeoutException;
    3532
     33import org.apache.log4j.Logger;
     34
    3635import com.rabbitmq.client.AMQP.BasicProperties;
    3736
     
    190189                        try {
    191190                                publishMessage(request, replyQueueName);
    192                                 return getResult(corrId, timeout, type);
     191                                if (request.isMulti()) {
     192                                        return getResults(corrId, 2, timeout, type);
     193                                } else {
     194                                        return getResult(corrId, timeout, type);
     195                                }
     196
    193197                        } catch (TimeoutException te) {
    194198                                logger.error(te);
     
    203207                String methodName = method.getName();
    204208                boolean multi = false;
     209                int wait = 0;
    205210
    206211                if (method.getAnnotation(MultiMethod.class) != null) {
    207212                        multi = true;
     213                        wait = method.getAnnotation(MultiMethod.class).waitNum();
    208214                }
    209215
     
    219225                                timeout = sync.timeout();
    220226                        }
    221                         return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi);
     227                        return Request.newSyncRequest(corrId, methodName, arguments, retries, timeout, multi, wait);
    222228                } else {
    223229                        return Request.newAsyncRequest(corrId, methodName, arguments, multi);
     
    229235
    230236                // Wait for the results.
    231                 long localTimeout = 0;
     237                long localTimeout = timeout;
    232238                long start = System.currentTimeMillis();
    233239                synchronized (results) {
    234240                        // Due to we are using notifyAll(), we need to control the real time
    235241                        while (!results.containsKey(corrId) && (timeout - localTimeout) >= 0) {
    236                                 results.wait(timeout);
     242                                results.wait(localTimeout);
    237243                                localTimeout = System.currentTimeMillis() - start;
    238244                        }
     
    258264        }
    259265
    260         @SuppressWarnings("unused")
    261266        private Object getResults(String corrId, int wait, long timeout, Class<?> type) throws Exception {
    262267                Response resp = null;
    263                 List<Object> list = new ArrayList<Object>();
     268                Class<?> actualType = type.getComponentType();
     269
     270                Object array = Array.newInstance(actualType, wait);
    264271
    265272                int i = 0;
     
    279286                                }
    280287                                // Remove the corrId to receive new replies
    281                                 resp = serializer.deserializeResponse(results.remove(corrId), type);
    282                                 list.add(resp.getResult());
    283 
     288                                resp = serializer.deserializeResponse(results.remove(corrId), actualType);
     289                                System.out.println("/n/n/n/n/nResult type: "+resp.getResult()+" /n/n/n/n/n");
     290                                Array.set(array, i, resp.getResult());
    284291                        }
    285292                        i++;
     
    289296                }
    290297
    291                 return list;
     298                return array;
    292299        }
    293300
  • trunk/src/main/java/omq/common/message/Request.java

    r55 r58  
    1616
    1717        private transient boolean multi;
     18        private transient int wait;
    1819        private transient long timeout;
    1920        private transient int retries;
     
    4748        }
    4849
    49         public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi) {
     50        public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi, int wait) {
    5051                Request req = new Request(id, method, false, params, multi);
    5152                req.setRetries(retries);
    5253                req.setTimeout(timeout);
     54                req.setWait(wait);
    5355                return req;
    5456        }
     
    113115                this.multi = multi;
    114116        }
     117
     118        public int getWait() {
     119                return wait;
     120        }
     121
     122        public void setWait(int wait) {
     123                this.wait = wait;
     124        }
    115125}
  • trunk/src/main/java/omq/server/RemoteObject.java

    r57 r58  
    3939
    4040        private String UID;
     41        private String multiQueue;
    4142        private Properties env;
    4243        private transient Broker broker;
     
    6667                this.broker = broker;
    6768                UID = reference;
     69                multiQueue = UID + System.currentTimeMillis();
    6870                env = broker.getEnvironment();
    6971                serializer = broker.getSerializer();
     
    225227                // Multi info
    226228                String multiExchange = multi + exchange;
    227                 //TODO: change this
    228                 String multiQueue = UID + System.currentTimeMillis();
    229229
    230230                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
  • trunk/src/test/java/omq/test/multiProcess/MultiProcessTest.java

    r57 r58  
    2121public class MultiProcessTest {
    2222        public static Broker broker;
    23         public static Number remoteNumber;
     23        public static NumberClient remoteNumber;
    2424
    2525        public MultiProcessTest(String type) throws Exception {
     
    4444
    4545                broker = new Broker(env);
    46                 remoteNumber = broker.lookup("number", Number.class);
     46                remoteNumber = broker.lookup("number", NumberClient.class);
    4747        }
    4848
     
    8585                int x = 10;
    8686                remoteNumber.setNumber(x);
    87                 int a = remoteNumber.getNumer();
     87                int a = remoteNumber.getNumber();
    8888                assertEquals(0, a);
    89                 int b = remoteNumber.getNumer();
     89                int b = remoteNumber.getNumber();
    9090                assertEquals(x, b);
    9191                remoteNumber.setNumber(0);
     
    9898                remoteNumber.setMultiNumber(x);
    9999                Thread.sleep(200);
    100                 int a = remoteNumber.getNumer();
    101                 int b = remoteNumber.getNumer();
     100                int a = remoteNumber.getNumber();
     101                int b = remoteNumber.getNumber();
    102102                assertEquals(x, a);
    103103                assertEquals(x, b);
     104                int[] number = remoteNumber.getMultiNumber();
     105                assertEquals(x, number[0]);
     106                assertEquals(x, number[1]);
    104107                remoteNumber.setMultiNumber(0);
    105108                Thread.sleep(200);
  • trunk/src/test/java/omq/test/multiProcess/Number.java

    r54 r58  
    22
    33import omq.Remote;
    4 import omq.client.annotation.AsyncMethod;
    5 import omq.client.annotation.MultiMethod;
    6 import omq.client.annotation.RemoteInterface;
    7 import omq.client.annotation.SyncMethod;
    84
    9 @RemoteInterface
    105public interface Number extends Remote {
    11         @SyncMethod(timeout = 1000)
    126        public void setNumber(int x);
    137
    14         @SyncMethod(timeout = 1000)
    15         public int getNumer();
     8        public int getNumber();
    169
    17         @MultiMethod
    18         @AsyncMethod
    1910        public void setMultiNumber(int x);
     11
     12        public int getMultiNumber();
    2013}
  • trunk/src/test/java/omq/test/multiProcess/NumberImpl.java

    r54 r58  
    11package omq.test.multiProcess;
    22
    3 import omq.client.annotation.AsyncMethod;
    4 import omq.client.annotation.MultiMethod;
    5 import omq.client.annotation.SyncMethod;
    63import omq.server.RemoteObject;
    74
     
    2017        }
    2118
    22         @Override
    23         @SyncMethod
    2419        public void setNumber(int x) {
    2520                this.x = x;
    2621        }
    2722
    28         @Override
    29         @SyncMethod(timeout = 1000)
    30         public int getNumer() {
     23        public int getNumber() {
    3124                return x;
    3225        }
    3326
    34         @Override
    35         @MultiMethod
    36         @AsyncMethod
    3727        public void setMultiNumber(int x) {
    3828                this.x = x;
    3929        }
    4030
     31        public int getMultiNumber() {
     32                return x;
     33        }
     34
    4135}
Note: See TracChangeset for help on using the changeset viewer.