Ignore:
Timestamp:
07/10/13 13:14:30 (11 years ago)
Author:
stoda
Message:

Default queues added, default exchange enabled, more control in remote queues added.
Tests verified and changed Persistent test to show how to make persistent messages.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/server/RemoteObject.java

    r83 r84  
    3939
    4040        private String UID;
    41         private String multiQueue;
    4241        private Properties env;
    4342        private transient Broker broker;
     43        private transient String multiQueue;
    4444        private transient RemoteWrapper remoteWrapper;
    4545        private transient Map<String, List<Class<?>>> params;
     
    7777                this.broker = broker;
    7878                this.UID = reference;
    79                 this.multiQueue = UID + System.currentTimeMillis();
    8079                this.env = env;
    8180
     
    262261         */
    263262        private void startQueues() throws Exception {
     263                // Start channel
     264                channel = broker.getNewChannel();
     265
     266                /*
     267                 * Default queue, Round Robin behaviour
     268                 */
     269
    264270                // Get info about which exchange and queue will use
    265                 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
     271                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
    266272                String queue = UID;
    267273                String routingKey = UID;
    268274
    269                 // Multi info
    270                 String multiExchange = multi + UID;
    271                 if (env.getProperty(ParameterQueue.MULTI_QUEUE_NAME) != null) {
    272                         multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
    273                 }
    274 
    275                 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
     275                // RemoteObject default queue
     276                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
    276277                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
    277278                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
    278279
    279                 // Start channel
    280                 channel = broker.getNewChannel();
    281 
    282280                // Declares and bindings
    283                 logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
     281                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
     282                        channel.exchangeDeclare(exchange, "direct");
     283                }
     284                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
     285                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
     286                        channel.queueBind(queue, exchange, routingKey);
     287                }
     288                logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
    284289                                + exclusive + ", AutoDelete: " + autoDelete);
    285                 channel.exchangeDeclare(exchange, "direct");
    286                 channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
    287                 channel.queueBind(queue, exchange, routingKey);
    288 
    289                 logger.info("RemoteObject: " + UID + " declaring fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + durable
    290                                 + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete);
     290
     291                /*
     292                 * Multi queue, exclusive per each instance
     293                 */
     294
     295                // Get info about the multiQueue
     296                String multiExchange = multi + UID;
     297                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
     298
     299                // Multi queue (exclusive queue per remoteObject)
     300                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
     301                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
     302                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
     303
     304                // Declares and bindings
    291305                channel.exchangeDeclare(multiExchange, "fanout");
    292                 channel.queueDeclare(multiQueue, durable, exclusive, autoDelete, null);
     306                if (multiQueue == null) {
     307                        multiQueue = channel.queueDeclare().getQueue();
     308                } else {
     309                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
     310                }
    293311                channel.queueBind(multiQueue, multiExchange, "");
     312                logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
     313                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
     314
     315                /*
     316                 * Consumer
     317                 */
    294318
    295319                // Declare a new consumer
Note: See TracChangeset for help on using the changeset viewer.