Changeset 70
- Timestamp:
- 06/28/13 12:41:11 (11 years ago)
- Location:
- trunk
- Files:
-
- 3 added
- 5 deleted
- 2 edited
- 1 moved
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/client/proxy/Proxymq.java
r62 r70 5 5 import java.lang.reflect.InvocationHandler; 6 6 import java.lang.reflect.Method; 7 import java.lang.reflect.Proxy;8 7 import java.util.Collection; 9 8 import java.util.HashMap; … … 50 49 51 50 private String uid; 51 private transient String exchange; 52 private transient String multiExchange; 53 private transient String replyQueueName; 52 54 private transient String serializerType; 53 55 private transient Broker broker; … … 96 98 // this.channel = Broker.getChannel(); 97 99 env = broker.getEnvironment(); 100 exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); 101 multiExchange = multi + exchange; 102 replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); 98 103 99 104 // set the serializer type … … 134 139 if (request.isAsync()) { 135 140 logger.debug("Publish async request -> " + request.getId()); 136 publish AsyncRequest(request);141 publishMessage(request, replyQueueName); 137 142 } else { 138 143 logger.debug("Publish sync request -> " + request.getId()); … … 151 156 152 157 if (request.isMulti()) { 153 exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE);158 exchange = multiExchange; 154 159 routingkey = ""; 155 160 } else { 156 exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);161 exchange = this.exchange; 157 162 routingkey = uid; 158 163 } … … 166 171 } 167 172 168 private void publishAsyncRequest(Request request) throws Exception {169 // Get the environment properties170 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);171 publishMessage(request, replyQueueName);172 }173 174 173 private Object publishSyncRequest(Request request, Class<?> type) throws Exception { 175 174 String corrId = request.getId(); … … 177 176 int retries = request.getRetries(); 178 177 long timeout = request.getTimeout(); 179 180 // Get the environment properties181 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);182 178 183 179 // Publish the message … … 296 292 297 293 /** 298 * Returns an instance of a proxy class for the specified interfaces that299 * dispatches method invocations to the specified invocation handler. * @param300 * loader301 *302 * @param loader303 * the class loader to define the proxy class304 *305 * @param interfaces306 * the list of interfaces for the proxy class to implement307 * @param proxy308 * the invocation handler to dispatch method invocations to309 * @return a proxy instance with the specified invocation handler of a proxy310 * class that is defined by the specified class loader and that311 * implements the specified interfaces312 */313 public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {314 return Proxy.newProxyInstance(loader, interfaces, proxy);315 }316 317 /**318 294 * Gets the Map used internally to retreive the response of the server 319 295 * -
trunk/src/main/java/omq/common/broker/Broker.java
r66 r70 2 2 3 3 import java.io.IOException; 4 import java.lang.reflect.Proxy; 4 5 import java.net.URL; 5 6 import java.util.HashMap; … … 10 11 import omq.Remote; 11 12 import omq.client.listener.ResponseListener; 13 import omq.client.proxy.MultiProxymq; 12 14 import omq.client.proxy.Proxymq; 13 15 import omq.common.event.Event; … … 46 48 private Map<String, RemoteObject> remoteObjs; 47 49 private Map<String, Object> proxies = new Hashtable<String, Object>(); 50 private Map<String, Object> multiProxies = new Hashtable<String, Object>(); 48 51 49 52 public Broker(Properties env) throws Exception { … … 73 76 responseListener.kill(); 74 77 eventDispatcher.kill(); 75 // TODO proxies = null; ??78 // TODO proxies = null; ?? 76 79 } 77 80 // Stop all the remote objects working … … 136 139 Proxymq proxy = new Proxymq(reference, contract, this); 137 140 Class<?>[] array = { contract }; 138 Object newProxy = Proxy mq.newProxyInstance(contract.getClassLoader(), array, proxy);141 Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy); 139 142 proxies.put(reference, newProxy); 140 143 return (T) newProxy; … … 147 150 } 148 151 152 @SuppressWarnings("unchecked") 153 public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException { 154 try { 155 if (!multiProxies.containsKey(reference)) { 156 MultiProxymq proxy = new MultiProxymq(reference, contract, this); 157 Class<?>[] array = { contract }; 158 Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy); 159 multiProxies.put(reference, newProxy); 160 return (T) newProxy; 161 } 162 return (T) multiProxies.get(reference); 163 164 } catch (Exception e) { 165 throw new RemoteException(e); 166 } 167 } 168 149 169 public void bind(String reference, RemoteObject remote) throws RemoteException { 150 170 try { … … 155 175 } 156 176 } 157 177 158 178 public void startTriggerEvent(String reference, RemoteObject remote) throws RemoteException { 159 179 try { -
trunk/src/test/java/omq/test/event/MessageTest.java
r67 r70 3 3 import static org.junit.Assert.assertEquals; 4 4 5 import java.util.Arrays; 6 import java.util.Collection; 5 7 import java.util.Properties; 6 8 … … 9 11 import omq.common.util.Serializer; 10 12 13 import org.junit.After; 11 14 import org.junit.BeforeClass; 12 15 import org.junit.Test; 16 import org.junit.runner.RunWith; 17 import org.junit.runners.Parameterized; 18 import org.junit.runners.Parameterized.Parameters; 13 19 14 public class EventTest { 15 private static EventTriggerImpl trigger; 20 @RunWith(value = Parameterized.class) 21 public class MessageTest { 22 private static final String NAME = "message"; 23 24 private static Broker broker; 25 private static Message serverProxy; 26 private static MessageImpl clientM1; 27 private static MessageImpl clientM2; 28 29 public MessageTest(String type) throws Exception { 30 Properties env = new Properties(); 31 env.setProperty(ParameterQueue.USER_NAME, "guest"); 32 env.setProperty(ParameterQueue.USER_PASS, "guest"); 33 34 // Set host info of rabbimq (where it is) 35 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1"); 36 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 37 env.setProperty(ParameterQueue.DURABLE_QUEUES, "false"); 38 env.setProperty(ParameterQueue.SERIALIZER_NAME, type); 39 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false"); 40 41 // Set info about where the message will be sent 42 env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange"); 43 44 // Set info about the queue & the exchange where the ResponseListener 45 // will listen to. 46 env.setProperty(ParameterQueue.RPC_REPLY_QUEUE, "reply_queue"); 47 env.setProperty(ParameterQueue.EVENT_REPLY_QUEUE, "event_queue"); 48 49 broker = new Broker(env); 50 serverProxy = broker.lookupMulti(NAME, Message.class); 51 } 52 53 @Parameters 54 public static Collection<Object[]> data() { 55 Object[][] data = new Object[][] { { Serializer.JAVA }, { Serializer.GSON }, { Serializer.KRYO } }; 56 return Arrays.asList(data); 57 } 16 58 17 59 @BeforeClass 18 public static void server() throws Exception {60 public static void client() throws Exception { 19 61 Properties env = new Properties(); 20 62 env.setProperty(ParameterQueue.USER_NAME, "guest"); … … 31 73 env.setProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"); 32 74 33 trigger = new EventTriggerImpl();34 75 Broker broker = new Broker(env); 76 clientM1 = new MessageImpl(); 77 broker.bind(NAME, clientM1); 35 78 36 broker.startTriggerEvent("trigger", trigger); 79 Broker broker2 = new Broker(env); 80 clientM2 = new MessageImpl(); 81 broker2.bind(NAME, clientM2); 82 } 37 83 38 System.out.println("Server started"); 84 @After 85 public void stop() throws Exception { 86 broker.stopBroker(); 39 87 } 40 88 41 89 @Test 42 public void eventTest() throws Exception { 43 String expected = "This is an event"; 44 String actual = null; 90 public void test() throws Exception { 91 String expected = "Hello"; 45 92 46 Properties env = new Properties(); 47 env.setProperty(ParameterQueue.USER_NAME, "guest"); 48 env.setProperty(ParameterQueue.USER_PASS, "guest"); 93 serverProxy.setMessage(expected); 94 Thread.sleep(200); 95 assertEquals(expected, clientM1.getMessage()); 96 assertEquals(expected, clientM2.getMessage()); 97 serverProxy.setMessage(""); 98 Thread.sleep(200); 99 } 49 100 50 // Set host info of rabbimq (where it is)51 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");52 env.setProperty(ParameterQueue.SERVER_PORT, "5672");53 env.setProperty(ParameterQueue.DURABLE_QUEUES, "false");54 env.setProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);55 env.setProperty(ParameterQueue.ENABLECOMPRESSION, "false");56 57 // Set info about where the message will be sent58 env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");59 // env.setProperty(ParameterQueue.DEBUGFILE, "c:\\middlewareDebug");60 61 // Set info about the queue & the exchange where the ResponseListener62 // will listen to.63 env.setProperty(ParameterQueue.RPC_REPLY_QUEUE, "reply_queue");64 env.setProperty(ParameterQueue.EVENT_REPLY_QUEUE, "event_queue");65 66 Broker broker = new Broker(env);67 EventTrigger et = broker.lookup("trigger", EventTrigger.class);68 69 MessageListener ms = new MessageListener();70 et.addListener(ms);71 trigger.triggerEvent(expected);72 Thread.sleep(500);73 actual = ms.getMessage();74 75 assertEquals(expected, actual);76 }77 101 }
Note: See TracChangeset
for help on using the changeset viewer.