Changeset 72 for trunk/src/main/java
- Timestamp:
- 06/29/13 20:44:27 (11 years ago)
- Location:
- trunk/src/main/java/omq
- Files:
-
- 1 deleted
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/Remote.java
r44 r72 1 1 package omq; 2 2 3 import java.io.IOException;4 3 import java.io.Serializable; 5 import java.util.Collection;6 7 import omq.common.event.Event;8 import omq.common.event.EventListener;9 import omq.exception.SerializerException;10 4 11 5 /** … … 22 16 */ 23 17 public String getRef(); 24 25 public void notifyEvent(Event event) throws IOException, SerializerException;26 27 public void addListener(EventListener<?> eventListener) throws Exception;28 29 public void removeListener(EventListener<?> eventListener) throws Exception;30 31 public Collection<EventListener<?>> getListeners() throws Exception;32 18 } -
trunk/src/main/java/omq/client/proxy/Proxymq.java
r70 r72 1 1 package omq.client.proxy; 2 2 3 import java.io.IOException;4 3 import java.lang.reflect.Array; 5 4 import java.lang.reflect.InvocationHandler; 6 5 import java.lang.reflect.Method; 7 import java.util.Collection;8 6 import java.util.HashMap; 9 7 import java.util.Map; … … 16 14 import omq.client.listener.ResponseListener; 17 15 import omq.common.broker.Broker; 18 import omq.common.event.Event;19 import omq.common.event.EventDispatcher;20 import omq.common.event.EventListener;21 16 import omq.common.message.Request; 22 17 import omq.common.message.Response; … … 25 20 import omq.exception.OmqException; 26 21 import omq.exception.RetryException; 27 import omq.exception.SerializerException;28 22 import omq.exception.TimeoutException; 29 23 … … 55 49 private transient Broker broker; 56 50 private transient ResponseListener rListener; 57 private transient EventDispatcher dispatcher;58 51 private transient Serializer serializer; 59 52 private transient Properties env; 60 53 private transient Map<String, byte[]> results; 61 private transient Map<String, EventListener<?>> listeners;62 54 63 55 private static final Map<String, Class<?>> primitiveClasses = new HashMap<String, Class<?>>(); … … 92 84 this.broker = broker; 93 85 rListener = broker.getResponseListener(); 94 dispatcher = broker.getEventDispatcher();95 86 serializer = broker.getSerializer(); 96 87 … … 105 96 serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA); 106 97 107 listeners = new HashMap<String, EventListener<?>>();108 109 98 // Create a new hashmap and registry it in rListener 110 99 results = new HashMap<String, byte[]>(); … … 121 110 if (methodName.equals("getRef")) { 122 111 return getRef(); 123 } else if (methodName.equals("addListener")) {124 addListener((EventListener<?>) arguments[0]);125 return null;126 } else if (methodName.equals("removeListener")) {127 removeListener((EventListener<?>) arguments[0]);128 return null;129 } else if (methodName.equals("getListeners")) {130 return getListeners();131 112 } 132 113 } … … 306 287 } 307 288 308 @Override309 public void notifyEvent(Event event) throws IOException, SerializerException {310 }311 312 @Override313 public void addListener(EventListener<?> eventListener) throws Exception {314 if (eventListener.getTopic() == null) {315 eventListener.setTopic(uid);316 }317 listeners.put(eventListener.getTopic(), eventListener);318 dispatcher.addListener(eventListener);319 }320 321 @Override322 public void removeListener(EventListener<?> eventListener) throws Exception {323 listeners.remove(eventListener.getTopic());324 dispatcher.removeListener(eventListener);325 }326 327 @Override328 public Collection<EventListener<?>> getListeners() throws Exception {329 return listeners.values();330 }331 332 289 } -
trunk/src/main/java/omq/common/broker/Broker.java
r70 r72 13 13 import omq.client.proxy.MultiProxymq; 14 14 import omq.client.proxy.Proxymq; 15 import omq.common.event.Event;16 import omq.common.event.EventDispatcher;17 import omq.common.event.EventWrapper;18 15 import omq.common.util.OmqConnectionFactory; 19 16 import omq.common.util.ParameterQueue; … … 21 18 import omq.exception.InitBrokerException; 22 19 import omq.exception.RemoteException; 23 import omq.exception.SerializerException;24 20 import omq.server.RemoteObject; 25 21 … … 41 37 private Channel channel; 42 38 private ResponseListener responseListener; 43 private EventDispatcher eventDispatcher;44 39 private Serializer serializer; 45 40 private boolean clientStarted = false; … … 75 70 if (clientStarted) { 76 71 responseListener.kill(); 77 eventDispatcher.kill();78 72 // TODO proxies = null; ?? 79 73 } … … 211 205 responseListener.start(); 212 206 } 213 if (eventDispatcher == null) {214 eventDispatcher = new EventDispatcher(this);215 eventDispatcher.start();216 }217 }218 219 /**220 * This method sends an event with its information221 *222 * @param event223 * @throws IOException224 * @throws SerializerException225 */226 public void trigger(Event event) throws IOException, SerializerException {227 String UID = event.getTopic();228 EventWrapper wrapper = new EventWrapper(event);229 logger.debug("EventTrigger fanout exchange: " + UID + " Event topic: " + UID + " Event corrID: " + event.getCorrId());230 channel.exchangeDeclare(UID, "fanout");231 232 byte[] bytesResponse = serializer.serialize(wrapper);233 channel.basicPublish(UID, "", null, bytesResponse);234 207 } 235 208 … … 318 291 } 319 292 320 public EventDispatcher getEventDispatcher() {321 return eventDispatcher;322 }323 324 293 public Serializer getSerializer() { 325 294 return serializer; -
trunk/src/main/java/omq/common/util/Serializer.java
r62 r72 4 4 import java.util.Properties; 5 5 6 import omq.common.event.Event;7 6 import omq.common.message.Request; 8 7 import omq.common.message.Response; … … 48 47 try { 49 48 String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA); 50 49 51 50 if (className == null || className.isEmpty()) { 52 51 throw new ClassNotFoundException("Class name is null or empty."); 53 52 } 54 53 55 54 serializer = getInstance(className); 56 55 } catch (Exception ex) { … … 62 61 } 63 62 64 public ISerializer getInstance(String type) throws SerializerException { 63 public ISerializer getInstance(String type) throws SerializerException { 65 64 if (KRYO.equals(type)) { 66 65 if (kryoSerializer == null) { … … 79 78 return javaSerializer; 80 79 } 81 80 82 81 throw new SerializerException("Serializer not found."); 83 82 } … … 148 147 } 149 148 150 public Event deserializeEvent(byte[] bytes) throws SerializerException {151 ISerializer instance = getInstance();152 153 Boolean enableCompression = getEnableCompression();154 if (enableCompression) {155 try {156 byte[] unZippedBytes = Zipper.unzip(bytes);157 return instance.deserializeEvent(unZippedBytes);158 } catch (IOException e) {159 throw new SerializerException(e.getMessage(), e);160 }161 } else {162 return instance.deserializeEvent(bytes);163 }164 }165 166 // public static void removeSerializers() {167 // logger.warn("Removing serializers");168 // serializer = null;169 // kryoSerializer = null;170 // javaSerializer = null;171 // gsonSerializer = null;172 // }173 149 } -
trunk/src/main/java/omq/common/util/Serializers/GsonImp.java
r50 r72 3 3 import java.util.List; 4 4 5 import omq.common.event.Event;6 5 import omq.common.message.Request; 7 6 import omq.common.message.Response; … … 75 74 } 76 75 77 @Override78 public Event deserializeEvent(byte[] bytes) throws SerializerException {79 try {80 String json = new String(bytes);81 82 JsonParser parser = new JsonParser();83 JsonObject jsonObj = parser.parse(json).getAsJsonObject();84 85 String type = jsonObj.get("type").getAsString();86 87 JsonElement jsonElement = jsonObj.get("event");88 Event event;89 90 event = (Event) gson.fromJson(jsonElement, Class.forName(type));91 92 return event;93 } catch (Exception e) {94 throw new SerializerException("Deserialize event", e.getCause());95 }96 }97 98 76 } -
trunk/src/main/java/omq/common/util/Serializers/ISerializer.java
r44 r72 1 1 package omq.common.util.Serializers; 2 2 3 import omq.common.event.Event;4 3 import omq.common.message.Request; 5 4 import omq.common.message.Response; … … 18 17 19 18 public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException; 20 21 public Event deserializeEvent(byte[] bytes) throws SerializerException;22 19 } -
trunk/src/main/java/omq/common/util/Serializers/JavaImp.java
r44 r72 6 6 import java.io.ObjectOutputStream; 7 7 8 import omq.common.event.Event;9 import omq.common.event.EventWrapper;10 8 import omq.common.message.Request; 11 9 import omq.common.message.Response; … … 51 49 } 52 50 53 @Override54 public Event deserializeEvent(byte[] bytes) throws SerializerException {55 EventWrapper wrapper = (EventWrapper) deserializeObject(bytes);56 return wrapper.getEvent();57 }58 59 51 public Object deserializeObject(byte[] bytes) throws SerializerException { 60 52 try { -
trunk/src/main/java/omq/common/util/Serializers/KryoImp.java
r44 r72 3 3 import java.io.ByteArrayOutputStream; 4 4 5 import com.esotericsoftware.kryo.Kryo;6 import com.esotericsoftware.kryo.io.Input;7 import com.esotericsoftware.kryo.io.Output;8 9 import omq.common.event.Event;10 import omq.common.event.EventWrapper;11 5 import omq.common.message.Request; 12 6 import omq.common.message.Response; 13 7 import omq.exception.SerializerException; 14 8 import omq.server.RemoteObject; 9 10 import com.esotericsoftware.kryo.Kryo; 11 import com.esotericsoftware.kryo.io.Input; 12 import com.esotericsoftware.kryo.io.Output; 15 13 16 14 /** … … 53 51 } 54 52 55 @Override56 public Event deserializeEvent(byte[] bytes) throws SerializerException {57 EventWrapper wrapper = (EventWrapper) deserializeObject(bytes, EventWrapper.class);58 return wrapper.getEvent();59 }60 61 53 public Object deserializeObject(byte[] bytes, Class<?> type) throws SerializerException { 62 54 try { -
trunk/src/main/java/omq/server/RemoteObject.java
r67 r72 4 4 import java.lang.reflect.Method; 5 5 import java.util.ArrayList; 6 import java.util.Collection;7 6 import java.util.HashMap; 8 7 import java.util.List; … … 10 9 import java.util.Properties; 11 10 12 import org.apache.log4j.Logger;13 14 11 import omq.Remote; 15 12 import omq.common.broker.Broker; 16 import omq.common.event.Event;17 import omq.common.event.EventListener;18 import omq.common.event.EventWrapper;19 13 import omq.common.util.ParameterQueue; 20 import omq.common.util.Serializer;21 14 import omq.exception.SerializerException; 15 16 import org.apache.log4j.Logger; 22 17 23 18 import com.rabbitmq.client.Channel; … … 42 37 private Properties env; 43 38 private transient Broker broker; 44 private transient Serializer serializer;45 39 private transient RemoteWrapper remoteWrapper; 46 40 private transient Map<String, List<Class<?>>> params; … … 69 63 multiQueue = UID + System.currentTimeMillis(); 70 64 env = broker.getEnvironment(); 71 serializer = broker.getSerializer();72 65 73 66 params = new HashMap<String, List<Class<?>>>(); … … 93 86 this.broker = broker; 94 87 UID = reference; 95 serializer = broker.getSerializer();96 88 if (channel == null || !channel.isOpen()) { 97 89 channel = broker.getChannel(); … … 141 133 } 142 134 143 @Override144 public void notifyEvent(Event event) throws IOException, SerializerException {145 String corrID = java.util.UUID.randomUUID().toString();146 event.setTopic(UID);147 event.setCorrId(corrID);148 EventWrapper wrapper = new EventWrapper(event);149 channel.exchangeDeclare(UID, "fanout");150 channel.basicPublish(UID, "", null, serializer.serialize(wrapper));151 logger.debug("Sending event-> topic: " + UID + ", corrID: " + corrID);152 }153 154 135 public void kill() throws IOException { 155 136 logger.warn("Killing objectmq: " + this.getRef()); … … 265 246 } 266 247 267 @Override268 public void addListener(EventListener<?> eventListener) throws Exception {269 }270 271 @Override272 public void removeListener(EventListener<?> eventListener) throws Exception {273 }274 275 @Override276 public Collection<EventListener<?>> getListeners() throws Exception {277 return null;278 }279 280 248 }
Note: See TracChangeset
for help on using the changeset viewer.