Changeset 70


Ignore:
Timestamp:
06/28/13 12:41:11 (11 years ago)
Author:
stoda
Message:

MultiProxymq? added

Location:
trunk
Files:
3 added
5 deleted
2 edited
1 moved

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r62 r70  
    55import java.lang.reflect.InvocationHandler;
    66import java.lang.reflect.Method;
    7 import java.lang.reflect.Proxy;
    87import java.util.Collection;
    98import java.util.HashMap;
     
    5049
    5150        private String uid;
     51        private transient String exchange;
     52        private transient String multiExchange;
     53        private transient String replyQueueName;
    5254        private transient String serializerType;
    5355        private transient Broker broker;
     
    9698                // this.channel = Broker.getChannel();
    9799                env = broker.getEnvironment();
     100                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
     101                multiExchange = multi + exchange;
     102                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    98103
    99104                // set the serializer type
     
    134139                if (request.isAsync()) {
    135140                        logger.debug("Publish async request -> " + request.getId());
    136                         publishAsyncRequest(request);
     141                        publishMessage(request, replyQueueName);
    137142                } else {
    138143                        logger.debug("Publish sync request -> " + request.getId());
     
    151156
    152157                if (request.isMulti()) {
    153                         exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE);
     158                        exchange = multiExchange;
    154159                        routingkey = "";
    155160                } else {
    156                         exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
     161                        exchange = this.exchange;
    157162                        routingkey = uid;
    158163                }
     
    166171        }
    167172
    168         private void publishAsyncRequest(Request request) throws Exception {
    169                 // Get the environment properties
    170                 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    171                 publishMessage(request, replyQueueName);
    172         }
    173 
    174173        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
    175174                String corrId = request.getId();
     
    177176                int retries = request.getRetries();
    178177                long timeout = request.getTimeout();
    179 
    180                 // Get the environment properties
    181                 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    182178
    183179                // Publish the message
     
    296292
    297293        /**
    298          * Returns an instance of a proxy class for the specified interfaces that
    299          * dispatches method invocations to the specified invocation handler. * @param
    300          * loader
    301          *
    302          * @param loader
    303          *            the class loader to define the proxy class
    304          *
    305          * @param interfaces
    306          *            the list of interfaces for the proxy class to implement
    307          * @param proxy
    308          *            the invocation handler to dispatch method invocations to
    309          * @return a proxy instance with the specified invocation handler of a proxy
    310          *         class that is defined by the specified class loader and that
    311          *         implements the specified interfaces
    312          */
    313         public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
    314                 return Proxy.newProxyInstance(loader, interfaces, proxy);
    315         }
    316 
    317         /**
    318294         * Gets the Map used internally to retreive the response of the server
    319295         *
  • trunk/src/main/java/omq/common/broker/Broker.java

    r66 r70  
    22
    33import java.io.IOException;
     4import java.lang.reflect.Proxy;
    45import java.net.URL;
    56import java.util.HashMap;
     
    1011import omq.Remote;
    1112import omq.client.listener.ResponseListener;
     13import omq.client.proxy.MultiProxymq;
    1214import omq.client.proxy.Proxymq;
    1315import omq.common.event.Event;
     
    4648        private Map<String, RemoteObject> remoteObjs;
    4749        private Map<String, Object> proxies = new Hashtable<String, Object>();
     50        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
    4851
    4952        public Broker(Properties env) throws Exception {
     
    7376                        responseListener.kill();
    7477                        eventDispatcher.kill();
    75                         //TODO proxies = null; ??
     78                        // TODO proxies = null; ??
    7679                }
    7780                // Stop all the remote objects working
     
    136139                                Proxymq proxy = new Proxymq(reference, contract, this);
    137140                                Class<?>[] array = { contract };
    138                                 Object newProxy = Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
     141                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
    139142                                proxies.put(reference, newProxy);
    140143                                return (T) newProxy;
     
    147150        }
    148151
     152        @SuppressWarnings("unchecked")
     153        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
     154                try {
     155                        if (!multiProxies.containsKey(reference)) {
     156                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
     157                                Class<?>[] array = { contract };
     158                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
     159                                multiProxies.put(reference, newProxy);
     160                                return (T) newProxy;
     161                        }
     162                        return (T) multiProxies.get(reference);
     163
     164                } catch (Exception e) {
     165                        throw new RemoteException(e);
     166                }
     167        }
     168
    149169        public void bind(String reference, RemoteObject remote) throws RemoteException {
    150170                try {
     
    155175                }
    156176        }
    157        
     177
    158178        public void startTriggerEvent(String reference, RemoteObject remote) throws RemoteException {
    159179                try {
  • trunk/src/test/java/omq/test/event/MessageTest.java

    r67 r70  
    33import static org.junit.Assert.assertEquals;
    44
     5import java.util.Arrays;
     6import java.util.Collection;
    57import java.util.Properties;
    68
     
    911import omq.common.util.Serializer;
    1012
     13import org.junit.After;
    1114import org.junit.BeforeClass;
    1215import org.junit.Test;
     16import org.junit.runner.RunWith;
     17import org.junit.runners.Parameterized;
     18import org.junit.runners.Parameterized.Parameters;
    1319
    14 public class EventTest {
    15         private static EventTriggerImpl trigger;
     20@RunWith(value = Parameterized.class)
     21public class MessageTest {
     22        private static final String NAME = "message";
     23
     24        private static Broker broker;
     25        private static Message serverProxy;
     26        private static MessageImpl clientM1;
     27        private static MessageImpl clientM2;
     28
     29        public MessageTest(String type) throws Exception {
     30                Properties env = new Properties();
     31                env.setProperty(ParameterQueue.USER_NAME, "guest");
     32                env.setProperty(ParameterQueue.USER_PASS, "guest");
     33
     34                // Set host info of rabbimq (where it is)
     35                env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
     36                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
     37                env.setProperty(ParameterQueue.DURABLE_QUEUES, "false");
     38                env.setProperty(ParameterQueue.SERIALIZER_NAME, type);
     39                env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");
     40
     41                // Set info about where the message will be sent
     42                env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");
     43
     44                // Set info about the queue & the exchange where the ResponseListener
     45                // will listen to.
     46                env.setProperty(ParameterQueue.RPC_REPLY_QUEUE, "reply_queue");
     47                env.setProperty(ParameterQueue.EVENT_REPLY_QUEUE, "event_queue");
     48
     49                broker = new Broker(env);
     50                serverProxy = broker.lookupMulti(NAME, Message.class);
     51        }
     52
     53        @Parameters
     54        public static Collection<Object[]> data() {
     55                Object[][] data = new Object[][] { { Serializer.JAVA }, { Serializer.GSON }, { Serializer.KRYO } };
     56                return Arrays.asList(data);
     57        }
    1658
    1759        @BeforeClass
    18         public static void server() throws Exception {
     60        public static void client() throws Exception {
    1961                Properties env = new Properties();
    2062                env.setProperty(ParameterQueue.USER_NAME, "guest");
     
    3173                env.setProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000");
    3274
    33                 trigger = new EventTriggerImpl();
    3475                Broker broker = new Broker(env);
     76                clientM1 = new MessageImpl();
     77                broker.bind(NAME, clientM1);
    3578
    36                 broker.startTriggerEvent("trigger", trigger);
     79                Broker broker2 = new Broker(env);
     80                clientM2 = new MessageImpl();
     81                broker2.bind(NAME, clientM2);
     82        }
    3783
    38                 System.out.println("Server started");
     84        @After
     85        public void stop() throws Exception {
     86                broker.stopBroker();
    3987        }
    4088
    4189        @Test
    42         public void eventTest() throws Exception {
    43                 String expected = "This is an event";
    44                 String actual = null;
     90        public void test() throws Exception {
     91                String expected = "Hello";
    4592
    46                 Properties env = new Properties();
    47                 env.setProperty(ParameterQueue.USER_NAME, "guest");
    48                 env.setProperty(ParameterQueue.USER_PASS, "guest");
     93                serverProxy.setMessage(expected);
     94                Thread.sleep(200);
     95                assertEquals(expected, clientM1.getMessage());
     96                assertEquals(expected, clientM2.getMessage());
     97                serverProxy.setMessage("");
     98                Thread.sleep(200);
     99        }
    49100
    50                 // Set host info of rabbimq (where it is)
    51                 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
    52                 env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    53                 env.setProperty(ParameterQueue.DURABLE_QUEUES, "false");
    54                 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
    55                 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");
    56 
    57                 // Set info about where the message will be sent
    58                 env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");
    59                 // env.setProperty(ParameterQueue.DEBUGFILE, "c:\\middlewareDebug");
    60 
    61                 // Set info about the queue & the exchange where the ResponseListener
    62                 // will listen to.
    63                 env.setProperty(ParameterQueue.RPC_REPLY_QUEUE, "reply_queue");
    64                 env.setProperty(ParameterQueue.EVENT_REPLY_QUEUE, "event_queue");
    65 
    66                 Broker broker = new Broker(env);
    67                 EventTrigger et = broker.lookup("trigger", EventTrigger.class);
    68 
    69                 MessageListener ms = new MessageListener();
    70                 et.addListener(ms);
    71                 trigger.triggerEvent(expected);
    72                 Thread.sleep(500);
    73                 actual = ms.getMessage();
    74 
    75                 assertEquals(expected, actual);
    76         }
    77101}
Note: See TracChangeset for help on using the changeset viewer.