- Timestamp:
- 05/24/13 16:23:16 (12 years ago)
- Location:
- trunk/objectmq/src/omq
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/objectmq/src/omq/common/broker/Broker.java
r22 r24 23 23 import com.rabbitmq.client.Connection; 24 24 import com.rabbitmq.client.QueueingConsumer; 25 import com.rabbitmq.client.ShutdownListener; 26 import com.rabbitmq.client.ShutdownSignalException; 25 27 import com.rabbitmq.client.QueueingConsumer.Delivery; 26 28 … … 36 38 Environment.setEnvironment(env); 37 39 connection = OmqConnectionFactory.getNewConnection(env); 40 connection.addShutdownListener(new ShutdownListener() { 41 @Override 42 public void shutdownCompleted(ShutdownSignalException cause) { 43 if (connection.isOpen()) { 44 try { 45 connection.close(); 46 } catch (IOException e) { 47 e.printStackTrace(); 48 } 49 } 50 try { 51 Properties env = Environment.getEnvironment(); 52 connection = OmqConnectionFactory.getNewWorkingConnection(env); 53 channel = connection.createChannel(); 54 } catch (Exception e) { 55 e.printStackTrace(); 56 } 57 } 58 }); 38 59 channel = connection.createChannel(); 39 60 try { … … 84 105 try { 85 106 Properties environment = Environment.getEnvironment(); 86 remote.start (reference, environment);107 remote.startRemoteObject(reference, environment); 87 108 } catch (Exception e) { 88 109 throw new RemoteException(e); … … 119 140 String message = "ping"; 120 141 121 String exchange = env.getProperty(ParameterQueue.USER_NAME) ;122 String queueName = exchange + "ping";142 String exchange = env.getProperty(ParameterQueue.USER_NAME) + "ping"; 143 String queueName = exchange; 123 144 String routingKey = "routingKey"; 124 145 … … 140 161 141 162 if (!message.equalsIgnoreCase(new String(delivery.getBody()))) { 142 throw new Exception();163 throw new IOException("Ping-pong initialitzation has failed"); 143 164 } 144 165 } 166 145 167 } -
trunk/objectmq/src/omq/common/util/OmqConnectionFactory.java
r22 r24 23 23 connection = getNewConnection(env); 24 24 } 25 } 26 27 public static Connection getNewWorkingConnection(Properties env) throws Exception { 28 Connection connection = null; 29 boolean working = false; 30 31 while (!working) { 32 try { 33 connection = getNewConnection(env); 34 working = true; 35 } catch (Exception e) { 36 e.printStackTrace(); 37 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); 38 Thread.sleep(milis); 39 } 40 } 41 42 return connection; 25 43 } 26 44 -
trunk/objectmq/src/omq/common/util/ParameterQueue.java
r19 r24 14 14 public static String SERIALIZERNAME = "revo.serializer"; 15 15 public static String ENABLECOMPRESSION = "revo.compression"; 16 16 17 17 public static String SERVER_HOST = "revo.host"; 18 18 public static String SERVER_PORT = "revo.port"; … … 31 31 public static String ENABLE_SSL = "revo.enable_ssl"; 32 32 public static String DEBUGFILE = "revo.debug_file"; 33 33 34 public static String RETRY_TIME_CONNECTION = "omq.retry_connection"; 34 35 35 36 /* -
trunk/objectmq/src/omq/server/remote/request/RemoteObject.java
r20 r24 35 35 36 36 private String UID; 37 private Properties env; 37 38 private transient RemoteWrapper remoteWrapper; 38 39 private transient Map<String, List<Class<?>>> params; … … 56 57 } 57 58 58 public void start (String reference, Properties env) throws Exception {59 public void startRemoteObject(String reference, Properties env) throws Exception { 59 60 this.UID = reference; 61 this.env = env; 60 62 61 63 params = new HashMap<String, List<Class<?>>>(); … … 72 74 remoteWrapper = new RemoteWrapper(this, numThreads); 73 75 74 // Get info about which exchange and queue will use 75 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); 76 String queue = UID; 77 String routingKey = UID; 78 79 // Start channel 80 channel = Broker.getNewChannel(); 81 82 // Declares and bindings 83 System.out.println("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue); 84 channel.exchangeDeclare(exchange, "direct"); 85 channel.queueDeclare(queue, false, false, false, null); 86 channel.queueBind(queue, exchange, routingKey); 87 88 // Declare the event topic fanout 89 System.out.println("RemoteObject: " + UID + " declaring fanout exchange: " + UID); 90 channel.exchangeDeclare(UID, "fanout"); 91 92 // Declare a new consumer 93 consumer = new QueueingConsumer(channel); 94 channel.basicConsume(queue, true, consumer); 76 startQueues(); 95 77 96 78 // Start this listener … … 109 91 } catch (ShutdownSignalException e) { 110 92 e.printStackTrace(); 93 try { 94 if (channel.isOpen()) { 95 channel.close(); 96 } 97 startQueues(); 98 } catch (Exception e1) { 99 try { 100 long milis = Long.parseLong(env.getProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000")); 101 Thread.sleep(milis); 102 } catch (InterruptedException e2) { 103 // TODO Auto-generated catch block 104 e2.printStackTrace(); 105 } 106 e1.printStackTrace(); 107 } 111 108 } catch (ConsumerCancelledException e) { 112 109 e.printStackTrace(); … … 209 206 } 210 207 208 private void startQueues() throws Exception { 209 // Get info about which exchange and queue will use 210 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE); 211 String queue = UID; 212 String routingKey = UID; 213 214 // Start channel 215 channel = Broker.getNewChannel(); 216 217 // Declares and bindings 218 System.out.println("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue); 219 channel.exchangeDeclare(exchange, "direct"); 220 channel.queueDeclare(queue, false, false, false, null); 221 channel.queueBind(queue, exchange, routingKey); 222 223 // Declare the event topic fanout 224 System.out.println("RemoteObject: " + UID + " declaring fanout exchange: " + UID); 225 channel.exchangeDeclare(UID, "fanout"); 226 227 // Declare a new consumer 228 consumer = new QueueingConsumer(channel); 229 channel.basicConsume(queue, true, consumer); 230 } 231 211 232 @Override 212 233 public void addListener(EventListener eventListener) throws Exception { -
trunk/objectmq/src/omq/ztest/calculator/ClientTest.java
r19 r24 22 22 23 23 // Set host info of rabbimq (where it is) 24 env.setProperty(ParameterQueue.SERVER_HOST, "1 0.30.239.228");24 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1"); 25 25 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 26 26 // env.setProperty(ParameterQueue.SERIALIZERNAME, -
trunk/objectmq/src/omq/ztest/calculator/ServerTest.java
r18 r24 16 16 17 17 // Get host info of rabbimq (where it is) 18 env.setProperty(ParameterQueue.SERVER_HOST, "1 0.30.239.228");18 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1"); 19 19 env.setProperty(ParameterQueue.SERVER_PORT, "5672"); 20 20 // env.setProperty(ParameterQueue.SERIALIZERNAME, … … 25 25 // Set info about where the message will be sent 26 26 env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange"); 27 env.setProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000"); 27 28 28 29 calc = new CalculatorImpl();
Note: See TracChangeset
for help on using the changeset viewer.