- Timestamp:
- 05/20/13 15:35:46 (11 years ago)
- Location:
- trunk/objectmq/src/omq
- Files:
-
- 4 added
- 10 edited
- 1 moved
Legend:
- Unmodified
- Added
- Removed
-
trunk/objectmq/src/omq/Remote.java
r9 r14 1 1 package omq; 2 2 3 import java.io.IOException; 3 4 import java.io.Serializable; 5 6 import omq.common.event.Event; 7 import omq.exception.SerializerException; 4 8 5 9 /** … … 16 20 */ 17 21 public String getRef(); 22 23 public void notifyEvent(Event event) throws IOException, SerializerException; 18 24 } -
trunk/objectmq/src/omq/client/proxy/Proxymq.java
r10 r14 14 14 import omq.client.annotation.SyncMethod; 15 15 import omq.client.remote.response.ResponseListener; 16 import omq.common.event.Event; 16 17 import omq.common.message.Request; 17 18 import omq.common.message.Response; … … 267 268 } 268 269 270 @Override 271 public void notifyEvent(Event event) throws IOException, SerializerException { 272 273 } 274 269 275 } -
trunk/objectmq/src/omq/common/broker/Broker.java
r11 r14 1 1 package omq.common.broker; 2 2 3 import java.io.IOException; 3 4 import java.util.Properties; 4 5 … … 6 7 import omq.client.proxy.Proxymq; 7 8 import omq.client.remote.response.ResponseListener; 8 import omq.common.remote.OmqConnectionFactory;9 9 import omq.common.util.Environment; 10 import omq.common.util.OmqConnectionFactory; 10 11 import omq.exception.EnvironmentException; 11 12 import omq.exception.RemoteException; … … 24 25 } catch (EnvironmentException ex) { // environment not set. 25 26 Environment.setEnvironment(env); 26 connection = OmqConnectionFactory.get Connection(env);27 connection = OmqConnectionFactory.getNewConnection(env); 27 28 channel = connection.createChannel(); 28 29 } … … 36 37 public static Channel getChannel() throws Exception { 37 38 return channel; 39 } 40 41 public static Channel getNewChannel() throws IOException { 42 return connection.createChannel(); 38 43 } 39 44 -
trunk/objectmq/src/omq/common/remote/RemoteListener.java
r9 r14 4 4 import java.io.IOException; 5 5 import java.util.Properties; 6 7 import omq.common.util.OmqConnectionFactory; 6 8 7 9 import com.rabbitmq.client.Channel; … … 14 16 * 15 17 */ 18 //TODO aquesta classe es pot eliminar 16 19 public abstract class RemoteListener extends Thread { 17 20 private static String defaultXml = "eventListener.xml"; … … 37 40 38 41 private void startConnection(Properties env) throws Exception { 39 connection = OmqConnectionFactory.get Connection(env);42 connection = OmqConnectionFactory.getNewConnection(env); 40 43 channel = connection.createChannel(); 41 44 } -
trunk/objectmq/src/omq/common/util/OmqConnectionFactory.java
r13 r14 1 package omq.common. remote;1 package omq.common.util; 2 2 3 3 import java.io.IOException; … … 6 6 import java.util.Properties; 7 7 8 import omq.common.util.ParameterQueue; 9 10 8 import com.rabbitmq.client.Channel; 11 9 import com.rabbitmq.client.Connection; 12 10 import com.rabbitmq.client.ConnectionFactory; … … 18 16 */ 19 17 public class OmqConnectionFactory { 20 public static Connection getConnection(Properties env) throws IOException, KeyManagementException, NoSuchAlgorithmException { 18 private static Connection connection; 19 20 public static void init(Properties env) throws KeyManagementException, NoSuchAlgorithmException, IOException { 21 if (connection == null) { 22 connection = getNewConnection(env); 23 } 24 } 25 26 public static Connection getNewConnection(Properties env) throws IOException, KeyManagementException, NoSuchAlgorithmException { 21 27 // Get login info of rabbitmq 22 28 String username = env.getProperty(ParameterQueue.USER_NAME); … … 40 46 return factory.newConnection(); 41 47 } 48 49 public static Channel getNewChannel() throws IOException { 50 return connection.createChannel(); 51 } 42 52 } -
trunk/objectmq/src/omq/common/util/Serializer.java
r11 r14 4 4 import java.util.Properties; 5 5 6 import omq.common.event.Event; 6 7 import omq.common.message.Request; 7 8 import omq.common.message.Response; … … 19 20 public static ISerializer serializer; 20 21 21 private static Boolean getEnableCompression() {22 private static Boolean getEnableCompression() { 22 23 Boolean enableCompression = false; 23 24 try { 24 25 Properties env = Environment.getEnvironment(); 25 26 enableCompression = Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false")); 26 } catch (EnvironmentException e) { } 27 27 } catch (EnvironmentException e) { 28 } 29 28 30 return enableCompression; 29 31 } 30 31 public static ISerializer getInstance() throws SerializerException { 32 33 public static ISerializer getInstance() throws SerializerException { 32 34 if (serializer == null) { 33 try {34 Properties env = Environment.getEnvironment(); 35 try { 36 Properties env = Environment.getEnvironment(); 35 37 String className = env.getProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.JavaImp"); 36 38 37 39 serializer = (ISerializer) Class.forName(className).newInstance(); 38 } catch (Exception ex) {40 } catch (Exception ex) { 39 41 throw new SerializerException(ex.getMessage(), ex); 40 42 } 41 43 } 42 44 43 45 return serializer; 44 46 } … … 46 48 public static byte[] serialize(Object obj) throws SerializerException { 47 49 ISerializer instance = getInstance(); 48 49 Boolean enableCompression = getEnableCompression(); 50 if (enableCompression){50 51 Boolean enableCompression = getEnableCompression(); 52 if (enableCompression) { 51 53 byte[] objSerialized = instance.serialize(obj); 52 54 try { … … 54 56 } catch (IOException e) { 55 57 throw new SerializerException(e.getMessage(), e); 56 } 57 } else {58 } 59 } else { 58 60 return instance.serialize(obj); 59 } 61 } 60 62 } 61 63 62 64 public static Request deserializeRequest(byte[] bytes, RemoteObject obj) throws SerializerException { 63 65 ISerializer instance = getInstance(); 64 66 65 67 Boolean enableCompression = getEnableCompression(); 66 if (enableCompression){68 if (enableCompression) { 67 69 try { 68 70 byte[] unZippedBytes = Zipper.unzip(bytes); … … 71 73 throw new SerializerException(e.getMessage(), e); 72 74 } 73 } else {75 } else { 74 76 return instance.deserializeRequest(bytes, obj); 75 } 77 } 76 78 } 77 79 78 80 public static Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException { 79 81 ISerializer instance = getInstance(); 80 82 81 83 Boolean enableCompression = getEnableCompression(); 82 if (enableCompression){84 if (enableCompression) { 83 85 try { 84 86 byte[] unZippedBytes = Zipper.unzip(bytes); … … 87 89 throw new SerializerException(e.getMessage(), e); 88 90 } 89 } else {91 } else { 90 92 return instance.deserializeResponse(bytes, type); 91 } 93 } 94 } 95 96 public static Event deserializeEvent(byte[] bytes) throws SerializerException { 97 ISerializer instance = getInstance(); 98 99 Boolean enableCompression = getEnableCompression(); 100 if (enableCompression) { 101 try { 102 byte[] unZippedBytes = Zipper.unzip(bytes); 103 return instance.deserializeEvent(unZippedBytes); 104 } catch (IOException e) { 105 throw new SerializerException(e.getMessage(), e); 106 } 107 } else { 108 return instance.deserializeEvent(bytes); 109 } 92 110 } 93 111 } -
trunk/objectmq/src/omq/common/util/Serializers/GsonImp.java
r10 r14 3 3 import java.util.List; 4 4 5 import omq.common.event.Event; 5 6 import omq.common.message.Request; 6 7 import omq.common.message.Response; … … 71 72 } 72 73 74 @Override 75 public Event deserializeEvent(byte[] unZippedBytes) throws SerializerException { 76 // TODO deserializeEvent class<?> ¿? 77 return null; 78 } 79 80 81 73 82 } -
trunk/objectmq/src/omq/common/util/Serializers/ISerializer.java
r9 r14 1 1 package omq.common.util.Serializers; 2 2 3 import omq.common.event.Event; 3 4 import omq.common.message.Request; 4 5 import omq.common.message.Response; … … 17 18 18 19 public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException; 20 21 public Event deserializeEvent(byte[] bytes) throws SerializerException; 19 22 } -
trunk/objectmq/src/omq/common/util/Serializers/JavaImp.java
r9 r14 6 6 import java.io.ObjectOutputStream; 7 7 8 import omq.common.event.Event; 8 9 import omq.common.message.Request; 9 10 import omq.common.message.Response; … … 43 44 @Override 44 45 public Request deserializeRequest(byte[] bytes, RemoteObject obj) throws SerializerException { 46 return (Request) deserliazeObject(bytes); 47 } 48 49 @Override 50 public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException { 51 return (Response) deserliazeObject(bytes); 52 } 53 54 @Override 55 public Event deserializeEvent(byte[] bytes) throws SerializerException { 56 return (Event) deserliazeObject(bytes); 57 } 58 59 private Object deserliazeObject(byte[] bytes) throws SerializerException { 45 60 try { 46 61 ByteArrayInputStream input = new ByteArrayInputStream(bytes); 47 62 ObjectInputStream objInput = new ObjectInputStream(input); 48 63 49 Object request= objInput.readObject();64 Object obj = objInput.readObject(); 50 65 51 66 objInput.close(); 52 67 input.close(); 53 68 54 return (Request) request; 55 } catch (Exception e) { 56 throw new SerializerException("Deserialize -> " + e.getMessage(), e); 57 } 58 } 59 60 @Override 61 public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException { 62 try { 63 ByteArrayInputStream input = new ByteArrayInputStream(bytes); 64 ObjectInputStream objInput = new ObjectInputStream(input); 65 66 Object response = objInput.readObject(); 67 68 objInput.close(); 69 input.close(); 70 71 return (Response) response; 69 return obj; 72 70 } catch (Exception e) { 73 71 throw new SerializerException("Deserialize -> " + e.getMessage(), e); -
trunk/objectmq/src/omq/common/util/Serializers/KryoImp.java
r9 r14 7 7 import com.esotericsoftware.kryo.io.Output; 8 8 9 import omq.common.event.Event; 9 10 import omq.common.message.Request; 10 11 import omq.common.message.Response; … … 43 44 @Override 44 45 public Request deserializeRequest(byte[] bytes, RemoteObject obj) throws SerializerException { 45 try { 46 Input input = new Input(bytes); 47 Request request = kryo.readObject(input, Request.class); 48 49 input.close(); 50 return request; 51 } catch (Exception e) { 52 throw new SerializerException("Deserialize -> " + e.getMessage(), e); 53 } 46 return (Request) deserializeObject(bytes, Request.class); 54 47 } 55 48 56 49 @Override 57 50 public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException { 51 return (Response) deserializeObject(bytes, Response.class); 52 } 53 54 @Override 55 public Event deserializeEvent(byte[] bytes) throws SerializerException { 56 return (Event) deserializeObject(bytes, Event.class); 57 } 58 59 private Object deserializeObject(byte[] bytes, Class<?> type) throws SerializerException { 58 60 try { 59 61 Input input = new Input(bytes); 60 Response response = kryo.readObject(input, Response.class);62 Object obj = kryo.readObject(input, type); 61 63 62 64 input.close(); 63 return response;65 return obj; 64 66 } catch (Exception e) { 65 67 throw new SerializerException("Deserialize -> " + e.getMessage(), e); -
trunk/objectmq/src/omq/server/remote/request/RemoteObject.java
r10 r14 11 11 import omq.Remote; 12 12 import omq.common.broker.Broker; 13 import omq.common.event.Event; 13 14 import omq.common.util.ParameterQueue; 15 import omq.common.util.Serializer; 14 16 import omq.exception.SerializerException; 15 17 16 18 import com.rabbitmq.client.Channel; 17 import com.rabbitmq.client.Connection;18 19 import com.rabbitmq.client.ConsumerCancelledException; 19 20 import com.rabbitmq.client.QueueingConsumer; … … 33 34 private transient RemoteWrapper remoteWrapper; 34 35 private transient Map<String, List<Class<?>>> params; 35 private transient Connection connection;36 36 private transient Channel channel; 37 37 private transient QueueingConsumer consumer; … … 51 51 52 52 public RemoteObject() { 53 } 54 55 public void start(String reference, Properties env) throws Exception { 56 this.UID = reference; 57 53 58 params = new HashMap<String, List<Class<?>>>(); 54 59 for (Method m : this.getClass().getMethods()) { … … 59 64 params.put(m.getName(), list); 60 65 } 61 }62 63 public void start(String reference, Properties env) throws Exception {64 this.UID = reference;65 66 66 67 // Get num threads to use … … 73 74 String routingKey = UID; 74 75 75 // Start connection and channel 76 connection = Broker.getConnection(); 77 channel = connection.createChannel(); 76 // Start channel 77 channel = Broker.getNewChannel(); 78 78 79 79 // Declares and bindings … … 118 118 } 119 119 120 @Override 121 public void notifyEvent(Event event) throws IOException, SerializerException { 122 event.setTopic(UID); 123 channel.exchangeDeclare(UID, "fanout"); 124 channel.basicPublish(UID, "", null, Serializer.serialize(event)); 125 } 126 120 127 public void kill() throws IOException { 121 128 interrupt(); 122 129 killed = true; 123 130 channel.close(); 124 connection.close();125 131 remoteWrapper.stopRemoteWrapper(); 126 132 }
Note: See TracChangeset
for help on using the changeset viewer.