Changeset 4
- Timestamp:
- 05/07/13 10:46:22 (12 years ago)
- Location:
- objectmq/src/omq
- Files:
-
- 2 added
- 2 deleted
- 7 edited
- 2 moved
Legend:
- Unmodified
- Added
- Removed
-
objectmq/src/omq/client/proxy/Proxymq.java
r3 r4 25 25 import omq.exception.NoContainsInstanceException; 26 26 27 28 27 import com.rabbitmq.client.Channel; 29 28 … … 35 34 * 36 35 */ 37 public class EvoProxyimplements InvocationHandler, Remote {36 public class Proxymq implements InvocationHandler, Remote { 38 37 39 38 /** … … 43 42 private static Map<String, Object> proxies = new Hashtable<String, Object>(); 44 43 45 private String objUid;44 private String uid; 46 45 private transient ResponseListener rListener; 47 46 private transient EventDispatcher dispatcher; … … 55 54 * EvoProxy Constructor. 56 55 * 57 * This constructor uses an objUid to know which object will call. It also58 * usesProperties to set where to send the messages59 * 60 * @param objUid61 * The objUid represents the unique identifier of a remote object56 * This constructor uses an uid to know which object will call. It also uses 57 * Properties to set where to send the messages 58 * 59 * @param uid 60 * The uid represents the unique identifier of a remote object 62 61 * @param env 63 62 * The environment is used to know where to send the messages 64 63 * @throws Exception 65 64 */ 66 public EvoProxy(String objUid, Properties env) throws Exception {67 this. objUid = objUid;65 public Proxymq(String uid, Properties env) throws Exception { 66 this.uid = uid; 68 67 this.rListener = ResponseListener.getRequestListener(); 69 68 this.dispatcher = EventDispatcher.getDispatcher(); … … 85 84 * EvoProxy Constructor. 86 85 * 87 * This constructor uses an objUid to know which object will call. It also88 * usesProperties to set where to send the messages89 * 90 * @param objUid91 * The objUid represents the unique identifier of a remote object86 * This constructor uses an uid to know which object will call. It also uses 87 * Properties to set where to send the messages 88 * 89 * @param uid 90 * The uid represents the unique identifier of a remote object 92 91 * @param clazz 93 92 * It represents the real class of the remote object. With this … … 98 97 * @throws Exception 99 98 */ 100 public EvoProxy(String objUid, Class<?> clazz, Properties env) throws Exception {101 this. objUid = objUid;99 public Proxymq(String uid, Class<?> clazz, Properties env) throws Exception { 100 this.uid = uid; 102 101 this.rListener = ResponseListener.getRequestListener(); 103 102 this.dispatcher = EventDispatcher.getDispatcher(); … … 132 131 } else if (methodName.equals("getListeners")) { 133 132 return getListeners(); 134 } 135 // else if (methodName.equals("notify")) { 136 // // notify fanout 137 // } 133 } 138 134 } 139 135 … … 176 172 timeout = sync.timeout(); 177 173 } 178 return new SyncRequest(this. objUid, corrId, methodName, args, timeout, retries);174 return new SyncRequest(this.uid, corrId, methodName, args, timeout, retries); 179 175 } else { 180 176 String topic = method.getAnnotation(AsyncMethod.class).generateEvent(); 181 return new AsyncRequest(this. objUid, corrId, methodName, args, topic);177 return new AsyncRequest(this.uid, corrId, methodName, args, topic); 182 178 } 183 179 } … … 254 250 * implements the specified interfaces 255 251 */ 256 public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, EvoProxyproxy) {252 public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, Proxymq proxy) { 257 253 if (proxies.containsKey(proxy.getRef())) { 258 254 System.out.println("Proxy trobat"); … … 279 275 public void addListener(EventListener eventListener) throws Exception { 280 276 if (eventListener.getTopic() == null) { 281 eventListener.setTopic( objUid);277 eventListener.setTopic(uid); 282 278 } 283 279 listeners.put(eventListener.getTopic(), eventListener); … … 295 291 @Override 296 292 public String getRef() { 297 return objUid;293 return uid; 298 294 } 299 295 -
objectmq/src/omq/client/remote/response/ResponseListener.java
r3 r4 5 5 import java.util.Properties; 6 6 7 import omq.client.proxy. EvoProxy;7 import omq.client.proxy.Proxymq; 8 8 import omq.common.message.response.Response; 9 9 import omq.common.remote.RemoteListener; … … 157 157 158 158 // Revisar això 159 public void registerProxy( EvoProxyproxy) {159 public void registerProxy(Proxymq proxy) { 160 160 if (!results.containsKey(proxy.getRef())) { 161 161 results.put(proxy.getRef(), proxy.getResults()); -
objectmq/src/omq/common/event/EventDispatcher.java
r3 r4 6 6 import java.util.Vector; 7 7 8 import omq.common.remote. RevoConnectionFactory;8 import omq.common.remote.OmqConnectionFactory; 9 9 import omq.common.util.ParameterQueue; 10 10 import omq.common.util.Serializer; … … 43 43 44 44 // Get a new connection and a new channel 45 connection = RevoConnectionFactory.getConnection(env);45 connection = OmqConnectionFactory.getConnection(env); 46 46 channel = connection.createChannel(); 47 47 -
objectmq/src/omq/common/event/EventTrigger.java
r3 r4 6 6 import java.util.Properties; 7 7 8 import omq.common.remote. RevoConnectionFactory;8 import omq.common.remote.OmqConnectionFactory; 9 9 import omq.common.util.RevoEnvironment; 10 10 import omq.common.util.Serializer; … … 28 28 29 29 private EventTrigger(Properties env) throws IOException, KeyManagementException, NoSuchAlgorithmException { 30 connection = RevoConnectionFactory.getConnection(env);30 connection = OmqConnectionFactory.getConnection(env); 31 31 channel = connection.createChannel(); 32 32 } -
objectmq/src/omq/common/message/response/ProxyResponse.java
r3 r4 3 3 import java.util.Properties; 4 4 5 import omq.client.proxy. EvoProxy;5 import omq.client.proxy.Proxymq; 6 6 7 7 … … 31 31 @Override 32 32 public Object getResp(Properties env) throws Exception { 33 if (! EvoProxy.containsProxy(reference)) {33 if (!Proxymq.containsProxy(reference)) { 34 34 Class<?> clazz = Class.forName(remoteInterface); 35 EvoProxy evoProxy = new EvoProxy(reference, clazz, env);35 Proxymq evoProxy = new Proxymq(reference, clazz, env); 36 36 Class<?>[] array = { clazz }; 37 return EvoProxy.newProxyInstance(clazz.getClassLoader(), array, evoProxy);37 return Proxymq.newProxyInstance(clazz.getClassLoader(), array, evoProxy); 38 38 } 39 return EvoProxy.getInstance(reference);39 return Proxymq.getInstance(reference); 40 40 } 41 41 -
objectmq/src/omq/common/remote/OmqConnectionFactory.java
r3 r4 17 17 * 18 18 */ 19 public class RevoConnectionFactory {19 public class OmqConnectionFactory { 20 20 public static Connection getConnection(Properties env) throws IOException, KeyManagementException, NoSuchAlgorithmException { 21 21 // Get login info of rabbitmq -
objectmq/src/omq/common/remote/RemoteListener.java
r3 r4 37 37 38 38 private void startConnection(Properties env) throws Exception { 39 connection = RevoConnectionFactory.getConnection(env);39 connection = OmqConnectionFactory.getConnection(env); 40 40 channel = connection.createChannel(); 41 41 } -
objectmq/src/omq/common/util/ParameterQueue.java
r3 r4 35 35 public static String RPC_TYPE = "direct"; 36 36 37 public static String NUM_THREADS = "omq.num_threads"; 38 37 39 public static String REGISTRY_NAME = "REGISTRY"; 38 40 -
objectmq/src/omq/server/remote/request/RemoteObject.java
r3 r4 1 1 package omq.server.remote.request; 2 2 3 import java.io.IOException; 3 4 import java.lang.reflect.InvocationTargetException; 4 5 import java.lang.reflect.Method; 5 import java.util.ArrayList;6 6 import java.util.Collection; 7 7 import java.util.HashMap; 8 8 import java.util.Map; 9 import java.util.Properties; 9 10 import java.util.Vector; 10 11 11 12 import omq.Remote; 13 import omq.common.broker.Broker; 12 14 import omq.common.event.EventListener; 13 15 import omq.common.event.EventTrigger; 14 import omq.common.message.response.CollectionResponse;15 16 import omq.common.message.response.DefaultResponse; 16 17 import omq.common.message.response.ExceptionResponse; 17 import omq.common.message.response.ProxyResponse;18 18 import omq.common.message.response.Response; 19 19 import omq.common.util.ParameterQueue; 20 import omq.exception.SerializerException; 20 21 21 22 import com.rabbitmq.client.Channel; 23 import com.rabbitmq.client.Connection; 24 import com.rabbitmq.client.ConsumerCancelledException; 25 import com.rabbitmq.client.QueueingConsumer; 26 import com.rabbitmq.client.QueueingConsumer.Delivery; 27 import com.rabbitmq.client.ShutdownSignalException; 22 28 23 29 /** … … 26 32 * 27 33 */ 28 public abstract class RemoteObject implements Remote {34 public abstract class RemoteObject extends Thread implements Remote { 29 35 30 36 private static final long serialVersionUID = -1778953938739846450L; 31 37 32 38 private String UID; 39 private RemoteWrapper remoteWrapper; 40 private Connection connection; 41 private Channel channel; 42 private QueueingConsumer consumer; 43 private boolean killed = false; 33 44 34 45 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 44 55 } 45 56 46 public RemoteObject(String UID) throws Exception { 47 this.UID = UID; 48 registerToRequestListener(); 49 declareEventTopic(); 50 } 51 52 public RemoteObject() throws Exception { 53 this.UID = java.util.UUID.randomUUID().toString(); 54 System.out.println("New Object its UID is " + this.UID); 55 registerToRequestListener(); 56 declareEventTopic(); 57 } 58 59 /** 60 * Registers this object in the RequestListener 61 * 62 * @throws Exception 63 */ 64 private void registerToRequestListener() throws Exception { 65 RequestListener rListener = RequestListener.getRequestListener(); 66 rListener.addObj(this); 67 } 68 69 private void declareEventTopic() throws Exception { 70 RequestListener rListener = RequestListener.getRequestListener(); 71 Channel channel = rListener.getChannel(); 57 public void start(String reference, Properties env) throws Exception { 58 this.UID = reference; 59 60 // Get num threads to use 61 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS)); 62 remoteWrapper = new RemoteWrapper(this, numThreads); 63 64 // Get info about which exchange and queue will use 65 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); 66 String queue = UID; 67 String routingKey = UID; 68 69 // Start connection and channel 70 connection = Broker.getConnection(); 71 channel = connection.createChannel(); 72 73 // Declares and bindings 74 channel.exchangeDeclare(exchange, "direct"); 75 channel.queueDeclare(queue, false, false, false, null); 76 channel.queueBind(queue, exchange, routingKey); 77 78 // Declare the event topic fanout 72 79 channel.exchangeDeclare(UID, "fanout"); 73 channel.close(); 80 81 // Declare a new consumer 82 consumer = new QueueingConsumer(channel); 83 channel.basicConsume(queue, true, consumer); 84 85 // Start this listener 86 this.start(); 87 } 88 89 @Override 90 public void run() { 91 while (!killed) { 92 try { 93 Delivery delivery = consumer.nextDelivery(); 94 remoteWrapper.notifyDelivery(delivery); 95 } catch (InterruptedException i) { 96 i.printStackTrace(); 97 } catch (ShutdownSignalException e) { 98 e.printStackTrace(); 99 } catch (ConsumerCancelledException e) { 100 e.printStackTrace(); 101 } catch (SerializerException e) { 102 e.printStackTrace(); 103 } catch (Exception e) { 104 e.printStackTrace(); 105 } 106 } 74 107 } 75 108 … … 79 112 } 80 113 114 public void kill() throws IOException { 115 interrupt(); 116 killed = true; 117 channel.close(); 118 connection.close(); 119 remoteWrapper.stopRemoteWrapper(); 120 } 121 81 122 @Override 82 123 public Response invokeMethod(String methodName, Vector<Object> args) throws Exception { 83 Response resp = null;84 85 124 Object[] arguments = new Object[args.size()]; 86 125 87 126 for (int i = 0; i < args.size(); i++) { 88 127 Object arg = args.get(i); 89 if (arg instanceof Remote) { 90 arg = RequestListener.getRequestListener().getObj(((Remote) arg).getRef()); 91 } 128 // TODO: what happens if the object is a remote object? 92 129 arguments[i] = arg; 93 130 } … … 98 135 try { 99 136 Object result = method.invoke(this, arguments); 100 // TODO see if a result is a collection and if it has some remote 101 // objects 102 if (result instanceof Remote) { 103 resp = getProxyResponse((Remote) result); 104 } else if (result instanceof Collection<?>) { 105 Collection<?> collection = (Collection<?>) result; 106 boolean containsRemote = false; 107 for (Object o : collection) { 108 if (o instanceof Remote) { 109 containsRemote = true; 110 break; 111 } 112 } 113 if (containsRemote) { 114 Collection<Response> responses = new ArrayList<Response>(); 115 for (Object o : collection) { 116 if (o instanceof Remote) { 117 responses.add(getProxyResponse((Remote) o)); 118 } else { 119 responses.add(new DefaultResponse(this.getRef(), o)); 120 } 121 } 122 String collectionType = collection.getClass().getCanonicalName(); 123 resp = new CollectionResponse(this.getRef(), collectionType, responses); 124 } else { 125 resp = new DefaultResponse(this.getRef(), result); 126 } 127 } else { 128 resp = new DefaultResponse(this.getRef(), result); 129 } 137 return new DefaultResponse(this.getRef(), result); 130 138 } catch (InvocationTargetException e) { 131 resp = new ExceptionResponse(this.getRef(), e.getTargetException()); 132 } 133 134 return resp; 139 return new ExceptionResponse(this.getRef(), e.getTargetException()); 140 } 135 141 } 136 142 … … 186 192 } 187 193 188 private Response getProxyResponse(Remote r) {189 String reference = r.getRef();190 String remoteInterface = r.getClass().getInterfaces()[0].getCanonicalName();191 return new ProxyResponse(this.getRef(), reference, remoteInterface);192 }193 194 194 @Override 195 195 public void notify(Object obj) throws Exception {
Note: See TracChangeset
for help on using the changeset viewer.