Changeset 70 for trunk/src/main/java


Ignore:
Timestamp:
06/28/13 12:41:11 (11 years ago)
Author:
stoda
Message:

MultiProxymq? added

Location:
trunk/src/main/java/omq
Files:
1 added
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r62 r70  
    55import java.lang.reflect.InvocationHandler;
    66import java.lang.reflect.Method;
    7 import java.lang.reflect.Proxy;
    87import java.util.Collection;
    98import java.util.HashMap;
     
    5049
    5150        private String uid;
     51        private transient String exchange;
     52        private transient String multiExchange;
     53        private transient String replyQueueName;
    5254        private transient String serializerType;
    5355        private transient Broker broker;
     
    9698                // this.channel = Broker.getChannel();
    9799                env = broker.getEnvironment();
     100                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
     101                multiExchange = multi + exchange;
     102                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    98103
    99104                // set the serializer type
     
    134139                if (request.isAsync()) {
    135140                        logger.debug("Publish async request -> " + request.getId());
    136                         publishAsyncRequest(request);
     141                        publishMessage(request, replyQueueName);
    137142                } else {
    138143                        logger.debug("Publish sync request -> " + request.getId());
     
    151156
    152157                if (request.isMulti()) {
    153                         exchange = multi + env.getProperty(ParameterQueue.RPC_EXCHANGE);
     158                        exchange = multiExchange;
    154159                        routingkey = "";
    155160                } else {
    156                         exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
     161                        exchange = this.exchange;
    157162                        routingkey = uid;
    158163                }
     
    166171        }
    167172
    168         private void publishAsyncRequest(Request request) throws Exception {
    169                 // Get the environment properties
    170                 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    171                 publishMessage(request, replyQueueName);
    172         }
    173 
    174173        private Object publishSyncRequest(Request request, Class<?> type) throws Exception {
    175174                String corrId = request.getId();
     
    177176                int retries = request.getRetries();
    178177                long timeout = request.getTimeout();
    179 
    180                 // Get the environment properties
    181                 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    182178
    183179                // Publish the message
     
    296292
    297293        /**
    298          * Returns an instance of a proxy class for the specified interfaces that
    299          * dispatches method invocations to the specified invocation handler. * @param
    300          * loader
    301          *
    302          * @param loader
    303          *            the class loader to define the proxy class
    304          *
    305          * @param interfaces
    306          *            the list of interfaces for the proxy class to implement
    307          * @param proxy
    308          *            the invocation handler to dispatch method invocations to
    309          * @return a proxy instance with the specified invocation handler of a proxy
    310          *         class that is defined by the specified class loader and that
    311          *         implements the specified interfaces
    312          */
    313         public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) {
    314                 return Proxy.newProxyInstance(loader, interfaces, proxy);
    315         }
    316 
    317         /**
    318294         * Gets the Map used internally to retreive the response of the server
    319295         *
  • trunk/src/main/java/omq/common/broker/Broker.java

    r66 r70  
    22
    33import java.io.IOException;
     4import java.lang.reflect.Proxy;
    45import java.net.URL;
    56import java.util.HashMap;
     
    1011import omq.Remote;
    1112import omq.client.listener.ResponseListener;
     13import omq.client.proxy.MultiProxymq;
    1214import omq.client.proxy.Proxymq;
    1315import omq.common.event.Event;
     
    4648        private Map<String, RemoteObject> remoteObjs;
    4749        private Map<String, Object> proxies = new Hashtable<String, Object>();
     50        private Map<String, Object> multiProxies = new Hashtable<String, Object>();
    4851
    4952        public Broker(Properties env) throws Exception {
     
    7376                        responseListener.kill();
    7477                        eventDispatcher.kill();
    75                         //TODO proxies = null; ??
     78                        // TODO proxies = null; ??
    7679                }
    7780                // Stop all the remote objects working
     
    136139                                Proxymq proxy = new Proxymq(reference, contract, this);
    137140                                Class<?>[] array = { contract };
    138                                 Object newProxy = Proxymq.newProxyInstance(contract.getClassLoader(), array, proxy);
     141                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
    139142                                proxies.put(reference, newProxy);
    140143                                return (T) newProxy;
     
    147150        }
    148151
     152        @SuppressWarnings("unchecked")
     153        public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException {
     154                try {
     155                        if (!multiProxies.containsKey(reference)) {
     156                                MultiProxymq proxy = new MultiProxymq(reference, contract, this);
     157                                Class<?>[] array = { contract };
     158                                Object newProxy = Proxy.newProxyInstance(contract.getClassLoader(), array, proxy);
     159                                multiProxies.put(reference, newProxy);
     160                                return (T) newProxy;
     161                        }
     162                        return (T) multiProxies.get(reference);
     163
     164                } catch (Exception e) {
     165                        throw new RemoteException(e);
     166                }
     167        }
     168
    149169        public void bind(String reference, RemoteObject remote) throws RemoteException {
    150170                try {
     
    155175                }
    156176        }
    157        
     177
    158178        public void startTriggerEvent(String reference, RemoteObject remote) throws RemoteException {
    159179                try {
Note: See TracChangeset for help on using the changeset viewer.