Changeset 83 for trunk/src/main/java/omq/common
- Timestamp:
- 07/08/13 13:29:24 (11 years ago)
- Location:
- trunk/src/main/java/omq/common
- Files:
-
- 11 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/common/broker/Broker.java
r74 r83 16 16 import omq.common.util.ParameterQueue; 17 17 import omq.common.util.Serializer; 18 import omq.exception.AlreadyBoundException; 18 19 import omq.exception.InitBrokerException; 19 20 import omq.exception.RemoteException; … … 30 31 import com.rabbitmq.client.ShutdownSignalException; 31 32 33 /** 34 * A "broker" allows a new connection to a RabbitMQ server. Under this 35 * connection it can have binded object and proxies. 36 * 37 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> 38 * 39 */ 32 40 public class Broker { 33 41 … … 65 73 } 66 74 75 /** 76 * This method stops the broker's connection and all the threads created 77 * 78 * @throws Exception 79 */ 67 80 public void stopBroker() throws Exception { 68 81 logger.warn("Stopping broker"); … … 84 97 environment = null; 85 98 remoteObjs = null; 86 // Serializer.removeSerializers();87 99 } 88 100 … … 95 107 } 96 108 109 /** 110 * This method close the broker's connection 111 * 112 * @throws IOException 113 */ 97 114 public void closeConnection() throws IOException { 98 115 logger.warn("Clossing connection"); … … 103 120 104 121 /** 122 * Return the broker's channel 105 123 * 106 124 * @return Broker's channel … … 121 139 } 122 140 141 /** 142 * Returns the remote object for specified reference. 143 * 144 * @param reference 145 * - Binding name 146 * @param contract 147 * - Remote Interface 148 * @return newProxy 149 * @throws RemoteException 150 */ 123 151 @SuppressWarnings("unchecked") 124 152 public synchronized <T extends Remote> T lookup(String reference, Class<T> contract) throws RemoteException { … … 126 154 127 155 if (!clientStarted) { 128 initClient(environment); 129 clientStarted = true; 156 initClient(); 130 157 } 131 158 … … 144 171 } 145 172 173 /** 174 * Returns the remote object for specified reference. This function returns 175 * an special type of proxy, every method invoked will be multi and 176 * asynchronous. 177 * 178 * @param reference 179 * - Binding name 180 * @param contract 181 * - Remote Interface 182 * @return newProxy 183 * @throws RemoteException 184 */ 146 185 @SuppressWarnings("unchecked") 147 186 public synchronized <T extends Remote> T lookupMulti(String reference, Class<T> contract) throws RemoteException { … … 161 200 } 162 201 163 public void bind(String reference, RemoteObject remote) throws RemoteException { 202 /** 203 * Binds the reference to the specified remote object. This function uses 204 * the broker's environment 205 * 206 * @param reference 207 * - Binding name 208 * @param remote 209 * - RemoteObject to bind 210 * @throws RemoteException 211 * If the remote operation failed 212 * @throws AlreadyBoundException 213 * If name is already bound. 214 */ 215 public void bind(String reference, RemoteObject remote) throws RemoteException, AlreadyBoundException { 164 216 bind(reference, remote, environment); 165 217 } 166 218 167 public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException { 219 /** 220 * Binds the reference to the specified remote object. This function uses 221 * the broker's environment 222 * 223 * @param reference 224 * - Binding name 225 * @param remote 226 * - RemoteObject to bind 227 * @param env 228 * - RemoteObject environment. You can set how many threads will 229 * be listen to the reference, the multiqueue name and the 230 * properties of the object queue and multiqueue 231 * @throws RemoteException 232 * If the remote operation failed 233 * @throws AlreadyBoundException 234 * If name is already bound. 235 */ 236 public void bind(String reference, RemoteObject remote, Properties env) throws RemoteException, AlreadyBoundException { 237 if (remoteObjs.containsKey(reference)) { 238 throw new AlreadyBoundException(reference); 239 } 240 // Try to start the remtoeObject listeners 168 241 try { 169 242 remote.startRemoteObject(reference, this, env); … … 174 247 } 175 248 249 /** 250 * Unbinds a remoteObject from its reference and kills all the threads 251 * created. 252 * 253 * @param reference 254 * - Binding name 255 * @throws RemoteException 256 * If the remote operation failed 257 * @throws IOException 258 * If there are problems while killing the threads 259 */ 176 260 public void unbind(String reference) throws RemoteException, IOException { 177 261 if (remoteObjs.containsKey(reference)) { … … 184 268 } 185 269 186 public void rebind(String name, Remote obj) throws RemoteException { 187 188 } 189 190 /** 191 * This method ensures the client will have only one ResponseListener and 192 * only one EventDispatcher. Both with the same environment. 193 * 194 * @param environment 195 * @throws Exception 196 */ 197 private synchronized void initClient(Properties environment) throws Exception { 270 /** 271 * This method ensures the client will have only one ResponseListener. 272 * 273 * @throws Exception 274 */ 275 private synchronized void initClient() throws Exception { 198 276 if (responseListener == null) { 199 277 responseListener = new ResponseListener(this); 200 278 responseListener.start(); 279 clientStarted = true; 201 280 } 202 281 } -
trunk/src/main/java/omq/common/message/Request.java
r75 r83 3 3 import java.io.Serializable; 4 4 5 /** 6 * Serializable request information. This class is used to send the information 7 * to the server. It has information about which method is wanted to invoke, its 8 * parameters, its correlation id and if a response is needed -asynchronous 9 * method-. 10 * 11 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> 12 * 13 */ 5 14 public class Request implements Serializable { 6 15 … … 20 29 private transient int retries; 21 30 31 // This constructor is used by kryo 22 32 public Request() { 23 33 } … … 38 48 } 39 49 50 /** 51 * This method creates a new synchronous request 52 * 53 * @param id 54 * - correlation id of this invocation 55 * @param method 56 * - method name wanted to call 57 * @param params 58 * - parameters of this method 59 * @return - new SyncRequest 60 */ 40 61 public static Request newSyncRequest(String id, String method, Object[] params) { 41 62 return new Request(id, method, false, params); 42 63 } 43 64 65 /** 66 * This method creates a new synchronous request 67 * 68 * @param id 69 * - correlation id of this invocation 70 * @param method 71 * - method name wanted to call 72 * @param params 73 * - parameters of this method 74 * @param retries 75 * - How many retries will be done 76 * @param timeout 77 * - Timeout for every retry 78 * @param multi 79 * - If the method is multi 80 * @param wait 81 * - If the method is multi how many responses will be listened 82 * @return - new SyncRequest 83 */ 44 84 public static Request newSyncRequest(String id, String method, Object[] params, int retries, long timeout, boolean multi, int wait) { 45 85 Request req = new Request(id, method, false, params, multi); … … 50 90 } 51 91 92 /** 93 * This method creates a new asynchronous request 94 * 95 * @param id 96 * - correlation id of this invocation 97 * @param method 98 * - method name wanted to call 99 * @param params 100 * - parameters of this method 101 * @param multi 102 * - If the method is multi 103 * @return new AsyncRequest 104 */ 52 105 public static Request newAsyncRequest(String id, String method, Object[] params, boolean multi) { 53 106 return new Request(id, method, true, params, multi); -
trunk/src/main/java/omq/common/message/Response.java
r44 r83 5 5 import omq.exception.OmqException; 6 6 7 /** 8 * Serializable response information. This class is used to send the information 9 * to the client proxy. It has information about which remoteObject has invoked 10 * the method and its correlation id. This class also has the result of the 11 * invoke if everything has gone fine in the server or an error otherwise. 12 * 13 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> 14 * 15 */ 7 16 public class Response implements Serializable { 8 17 … … 17 26 private String idOmq; 18 27 28 // Used by kryo 19 29 public Response() { 20 30 } 21 31 32 /** 33 * Creates a new Response object to be serialized 34 * 35 * @param id 36 * - correlation id of the invoke 37 * @param idOmq 38 * - objectmq's identifier -bind reference- 39 * @param result 40 * - result of the invocation 41 * @param error 42 * - error thrown by the invocation 43 */ 22 44 public Response(String id, String idOmq, Object result, OmqException error) { 23 45 this.id = id; -
trunk/src/main/java/omq/common/util/OmqConnectionFactory.java
r77 r83 13 13 14 14 /** 15 * This class creates RabbitMQ connections 15 16 * 16 17 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> … … 23 24 private static int connectionTimeout = 2 * 1000; 24 25 26 /** 27 * If this class is wanted to use as a singleton class this is the init 28 * function 29 * 30 * @param env 31 * - environment that sets the properties needed by RabbitMQ 32 * @throws KeyManagementException 33 * @throws NoSuchAlgorithmException 34 * @throws IOException 35 */ 25 36 public static void init(Properties env) throws KeyManagementException, NoSuchAlgorithmException, IOException { 26 37 if (connection == null) { … … 29 40 } 30 41 42 /** 43 * This function returns a working connection. 44 * 45 * @param env 46 * - used if it's necessary to create a new connection 47 * @return workingConnection 48 * @throws Exception 49 */ 31 50 public static Connection getNewWorkingConnection(Properties env) throws Exception { 32 51 Connection connection = null; … … 50 69 } 51 70 71 /** 72 * This function creates a new rabbitmq connection using the properties set 73 * in env 74 * 75 * @param env 76 * - Properties needed to create a new connection: username, 77 * password, rabbit_host, rabbit_port, enable_ssl (optional) 78 * @return new Connection 79 * @throws IOException 80 * @throws KeyManagementException 81 * @throws NoSuchAlgorithmException 82 */ 52 83 public static Connection getNewConnection(Properties env) throws IOException, KeyManagementException, NoSuchAlgorithmException { 53 84 // Get login info of rabbitmq … … 59 90 int port = Integer.parseInt(env.getProperty(ParameterQueue.RABBIT_PORT)); 60 91 61 boolean ssl = Boolean.parseBoolean(env.getProperty(ParameterQueue.ENABLE_SSL ));92 boolean ssl = Boolean.parseBoolean(env.getProperty(ParameterQueue.ENABLE_SSL, "false")); 62 93 63 94 // Start a new connection and channel … … 78 109 } 79 110 111 /** 112 * This method creates a new channel if the singleton pattern is used 113 * 114 * @return new Channel 115 * @throws IOException 116 */ 80 117 public static Channel getNewChannel() throws IOException { 81 118 Channel channel = connection.createChannel(); -
trunk/src/main/java/omq/common/util/ParameterQueue.java
r77 r83 2 2 3 3 /** 4 * This class is used to create new environments. 4 5 * 5 6 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> … … 7 8 */ 8 9 public class ParameterQueue { 9 10 /*11 * Properties environment12 */13 10 14 11 /** … … 69 66 public static String MESSAGE_TTL_IN_QUEUES = "omq.message_ttl_queue"; 70 67 71 // TODO persistent messages? the messages will be saved in the disk if this72 // flag is set true73 74 68 /** 75 69 * Set if the system will use ssl … … 111 105 112 106 /** 113 * Time in milis 107 * Time in milis by default is set in a minute 114 108 */ 115 109 public static long DEFAULT_TIMEOUT = 1 * 1000 * 60; -
trunk/src/main/java/omq/common/util/Serializer.java
r77 r83 15 15 /** 16 16 * 17 * Serializer enables to serialize the requests and the responses of the 18 * remoteObjects. This class is used to have the same serializer object a not to 19 * create new instances every time they are needed. 20 * 17 21 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> 18 22 * 19 23 */ 20 24 public class Serializer { 21 // private static final Logger logger =22 // Logger.getLogger(Serializer.class.getName());23 25 public static final String KRYO = "kryo"; 24 26 public static final String JAVA = "java"; -
trunk/src/main/java/omq/common/util/Serializers/GsonImp.java
r75 r83 15 15 import com.google.gson.JsonParser; 16 16 17 /** 18 * Json serialize implementation. It uses the Gson libraries. 19 * 20 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> 21 * 22 */ 17 23 public class GsonImp implements ISerializer { 18 24 private final Gson gson = new Gson(); -
trunk/src/main/java/omq/common/util/Serializers/ISerializer.java
r72 r83 7 7 8 8 /** 9 * An ISerializer object can serialize any kind of objects and deserialize 10 * Requests and Responses. 9 11 * 10 12 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> … … 12 14 */ 13 15 public interface ISerializer { 16 /** 17 * Serialize 18 * 19 * @param obj 20 * - object to serialize 21 * @return objectSerialized 22 * @throws SerializerException 23 * - If the serialization failed 24 */ 14 25 public byte[] serialize(Object obj) throws SerializerException; 15 26 27 /** 28 * Deserialize a Request 29 * 30 * @param bytes 31 * - serialized request 32 * @param obj 33 * - remoteObject which is receiving requests 34 * @return request 35 * @throws SerializerException 36 * - If the serialization failed 37 */ 16 38 public Request deserializeRequest(byte[] bytes, RemoteObject obj) throws SerializerException; 17 39 40 /** 41 * Deserialize a Response 42 * 43 * @param bytes 44 * serialized response 45 * @param type 46 * - return type expected 47 * @return response 48 * @throws SerializerException 49 * - If the serialization failed 50 */ 18 51 public Response deserializeResponse(byte[] bytes, Class<?> type) throws SerializerException; 19 52 } -
trunk/src/main/java/omq/common/util/Serializers/JavaImp.java
r72 r83 12 12 13 13 /** 14 * Java serialize implementation. It uses the default java serialization. 14 15 * 15 16 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> -
trunk/src/main/java/omq/common/util/Serializers/KryoImp.java
r72 r83 13 13 14 14 /** 15 * Kryo serializerimplementation. It uses the Kryo libraries. 15 16 * 16 17 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> -
trunk/src/main/java/omq/common/util/Zipper.java
r44 r83 7 7 import java.util.zip.GZIPOutputStream; 8 8 9 /** 10 * This class enables the compression of the information sent through the 11 * rabbitmq server. 12 * 13 * @author Sergi Toda <sergi.toda@estudiants.urv.cat> 14 * 15 */ 9 16 public class Zipper { 10 17 … … 16 23 zos = new GZIPOutputStream(baos); 17 24 zos.write(b); 18 } finally {19 if (zos != null){25 } finally { 26 if (zos != null) { 20 27 zos.close(); 21 28 } 22 29 23 30 baos.close(); 24 31 } 25 32 26 33 return baos.toByteArray(); 27 34 } … … 34 41 try { 35 42 zis = new GZIPInputStream(bais); 36 43 37 44 byte[] tmpBuffer = new byte[256]; 38 45 int n; … … 40 47 baos.write(tmpBuffer, 0, n); 41 48 } 42 } finally { 43 if (zis != null){49 } finally { 50 if (zis != null) { 44 51 zis.close(); 45 52 } 46 53 47 54 bais.close(); 48 55 baos.close(); 49 } 50 56 } 57 51 58 return baos.toByteArray(); 52 } 59 } 53 60 }
Note: See TracChangeset
for help on using the changeset viewer.