Changeset 70 for trunk/src/main/java
- Timestamp:
- 06/28/13 12:41:11 (11 years ago)
- Location:
- trunk/src/main/java/omq
- Files:
-
- 1 added
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/client/proxy/Proxymq.java
r62 r70 5 5 import java.lang.reflect.InvocationHandler; 6 6 import java.lang.reflect.Method; 7 import java.lang.reflect.Proxy;8 7 import java.util.Collection; 9 8 import java.util.HashMap; … … 50 49 51 50 private String uid; 51 private transient String exchange; 52 private transient String multiExchange; 53 private transient String replyQueueName; 52 54 private transient String serializerType; 53 55 private transient Broker broker; … … 96 98 // this.channel = Broker.getChannel(); 97 99 env = broker.getEnvironment(); 100 exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); 101 multiExchange = multi + exchange; 102 replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE); 98 103 99 104 // set the serializer type … … 134 139 if (request.isAsync()) { 135 140 logger.debug("Publish async request -> " + request.getId()); 136 publish AsyncRequest(request);141 publishMessage(request, replyQueueName); 137 142 } else { 138 143 logger.debug("Publish sync request -> " + request.getId()); … … 151 156 152 157 if (request.isMulti()) { 153 exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE);158 exchange = multiExchange; 154 159 routingkey = ""; 155 160 } else { 156 exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);161 exchange = this.exchange; 157 162 routingkey = uid; 158 163 } … … 166 171 } 167 172 168 private void publishAsyncRequest(Request request) throws Exception {169 // Get the environment properties170 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);171 publishMessage(request, replyQueueName);172 }173 174 173 private Object publishSyncRequest(Request request, Class<?> type) throws Exception { 175 174 String corrId = request.getId(); … … 177 176 int retries = request.getRetries(); 178 177 long timeout = request.getTimeout(); 179 180 // Get the environment properties181 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);182 178 183 179 // Publish the message … … 296 292 297 293 /** 298 * Returns an instance of a proxy class for the specified interfaces that299 * dispatches method invocations to the specified invocation handler. * @param300 * loader301 *302 * @param loader303 * the class loader to define the proxy class304 *305 * @param interfaces306 * the list of interfaces for the proxy class to implement307 * @param proxy308 * the invocation handler to dispatch method invocations to309 * @return a proxy instance with the specified invocation handler of a proxy310 * class that is defined by the specified class loader and that311 * implements the specified interfaces312 */313 public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {314 return Proxy.newProxyInstance(loader, interfaces, proxy);315 }316 317 /**318 294 * Gets the Map used internally to retreive the response of the server 319 295 * -
trunk/src/main/java/omq/common/broker/Broker.java
r66 r70 2 2 3 3 import java.io.IOException; 4 import java.lang.reflect.Proxy; 4 5 import java.net.URL; 5 6 import java.util.HashMap; … … 10 11 import omq.Remote; 11 12 import omq.client.listener.ResponseListener; 13 import omq.client.proxy.MultiProxymq; 12 14 import omq.client.proxy.Proxymq; 13 15 import omq.common.event.Event; … … 46 48 private Map<String, RemoteObject> remoteObjs; 47 49 private Map<String, Object> proxies = new Hashtable<String, Object>(); 50 private Map<String, Object> multiProxies = new Hashtable<String, Object>(); 48 51 49 52 public Broker(Properties env) throws Exception { … … 73 76 responseListener.kill(); 74 77 eventDispatcher.kill(); 75 // TODO proxies = null; ??78 // TODO proxies = null; ?? 76 79 } 77 80 // Stop all the remote objects working … … 136 139 Proxymq proxy = new Proxymq(reference, contract, this); 137 140 Class<?>[] array = { contract }; 138 Object newProxy = Proxy mq.newProxyInstance(contract.getClassLoader(), array, proxy);141 Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy); 139 142 proxies.put(reference, newProxy); 140 143 return (T) newProxy; … … 147 150 } 148 151 152 @SuppressWarnings("unchecked") 153 public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException { 154 try { 155 if (!multiProxies.containsKey(reference)) { 156 MultiProxymq proxy = new MultiProxymq(reference, contract, this); 157 Class<?>[] array = { contract }; 158 Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy); 159 multiProxies.put(reference, newProxy); 160 return (T) newProxy; 161 } 162 return (T) multiProxies.get(reference); 163 164 } catch (Exception e) { 165 throw new RemoteException(e); 166 } 167 } 168 149 169 public void bind(String reference, RemoteObject remote) throws RemoteException { 150 170 try { … … 155 175 } 156 176 } 157 177 158 178 public void startTriggerEvent(String reference, RemoteObject remote) throws RemoteException { 159 179 try {
Note: See TracChangeset
for help on using the changeset viewer.