Changeset 84 for trunk/src/main/java


Ignore:
Timestamp:
07/10/13 13:14:30 (11 years ago)
Author:
stoda
Message:

Default queues added, default exchange enabled, more control in remote queues added.
Tests verified and changed Persistent test to show how to make persistent messages.

Location:
trunk/src/main/java/omq
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/listener/ResponseListener.java

    r82 r84  
    120120
    121121                String reply_queue = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    122                 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
     122                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
     123                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "true"));
     124                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "true"));
    123125
    124126                int ttl = Integer.parseInt(env.getProperty(ParameterQueue.MESSAGE_TTL_IN_QUEUES, "-1"));
     
    128130                }
    129131
    130                 channel.queueDeclare(reply_queue, durable, false, false, args);
    131                 logger.info("ResponseListener creating queue: " + reply_queue + ", durable: " + durable + "TTL: " + (ttl > 0 ? ttl : "not set"));
     132                if (reply_queue == null) {
     133                        reply_queue = channel.queueDeclare().getQueue();
     134                        env.setProperty(ParameterQueue.RPC_REPLY_QUEUE, reply_queue);
     135                } else {
     136                        channel.queueDeclare(reply_queue, durable, exclusive, autoDelete, args);
     137                }
     138                logger.info("ResponseListener creating queue: " + reply_queue + ", durable: " + durable + ", exclusive: " + exclusive + ", autoDelete: " + autoDelete
     139                                + ", TTL: " + (ttl > 0 ? ttl : "not set"));
    132140
    133141                // Declare a new consumer
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r83 r84  
    9292                // this.channel = Broker.getChannel();
    9393                env = broker.getEnvironment();
    94                 exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
     94                exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
    9595                multiExchange = multi + uid;
    9696                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
  • trunk/src/main/java/omq/common/util/ParameterQueue.java

    r83 r84  
    99public class ParameterQueue {
    1010
    11         /**
    12          * Set the proxy's serializer method
     11        /*
     12         * Connection info
    1313         */
    14         public static String PROXY_SERIALIZER = "omq.serializer";
    1514
    1615        /**
    17          * Set whether the messages must be compressed or not
     16         * Set the clients username
    1817         */
    19         public static String ENABLE_COMPRESSION = "omq.compression";
     18        public static String USER_NAME = "omq.username";
     19
     20        /**
     21         * Set the clients password
     22         */
     23        public static String USER_PASS = "omq.pass";
    2024
    2125        /**
     
    3034
    3135        /**
    32          * Set the clients username
     36         * Set if the system will use ssl
    3337         */
    34         public static String USER_NAME = "omq.username";
     38        public static String ENABLE_SSL = "omq.enable_ssl";
    3539
    3640        /**
    37          * Set the clients password
     41         * Set how many time we have to wait to retry the connection with the server
     42         * when this goes down
    3843         */
    39         public static String USER_PASS = "omq.pass";
     44        public static String RETRY_TIME_CONNECTION = "omq.retry_connection";
     45
     46        /*
     47         * Queues info
     48         */
    4049
    4150        /**
     
    5160
    5261        /**
    53          * Set if the queues must be durable. The queues won't be lost when rabbitmq
    54          * crashes if DURABLE_QUEUES is set trues.
    55          */
    56         public static String DURABLE_QUEUES = "omq.durable_queue";
    57 
    58         /**
    59          * The MESSAGE_TTL_IN_QUEUES controls for how long a message published to
    60          * the queues can live before it is discarded. A message that has been in
    61          * the queue for longer than the configured TTL is said to be dead.
    62          *
    63          * This property must be a non-negative 32 bit integer (0 <= n <= 2^32-1),
    64          * describing the TTL period in milliseconds.
    65          */
    66         public static String MESSAGE_TTL_IN_QUEUES = "omq.message_ttl_queue";
    67 
    68         /**
    69          * Set if the system will use ssl
    70          */
    71         public static String ENABLE_SSL = "omq.enable_ssl";
    72 
    73         /**
    74          * Set how many time we have to wait to retry the connection with the server
    75          * when this goes down
    76          */
    77         public static String RETRY_TIME_CONNECTION = "omq.retry_connection";
    78 
    79         /**
    80          * Set how many threads will be created to invoke remote methods
    81          */
    82         public static String NUM_THREADS = "omq.num_threads";
    83 
    84         /**
    8562         * Set the specific name of a multi queue in a specific object
    8663         */
    8764        public static String MULTI_QUEUE_NAME = "omq.multi_queue_name";
     65
     66        /**
     67         * Set if a queue must be durable. The queue won't be lost when RabbitMQ
     68         * crashes if DURABLE_QUEUE is set true.
     69         */
     70        public static String DURABLE_QUEUE = "omq.durable_queue";
    8871
    8972        /**
     
    9982
    10083        /**
     84         * Set if a queue must be durable. The queue won't be lost when RabbitMQ
     85         * crashes if DURABLE_QUEUE is set true.
     86         */
     87        public static String DURABLE_MQUEUE = "omq.durable_mqueue";
     88
     89        /**
     90         * Set if server will delete a queue when is no longer in use
     91         */
     92        public static String AUTO_DELETE_MQUEUE = "omq.auto_mdelete";
     93
     94        /**
     95         * Set if we are declaring an exclusive queue (restricted to this
     96         * connection)
     97         */
     98        public static String EXCLUSIVE_MQUEUE = "omq.exclusive_mqueue";
     99
     100        /**
     101         * The MESSAGE_TTL_IN_QUEUES controls for how long a message published to
     102         * the queues can live before it is discarded. A message that has been in
     103         * the queue for longer than the configured TTL is said to be dead.
     104         *
     105         * This property must be a non-negative 32 bit integer (0 <= n <= 2^32-1),
     106         * describing the TTL period in milliseconds.
     107         */
     108        public static String MESSAGE_TTL_IN_QUEUES = "omq.message_ttl_queue";
     109
     110        /*
     111         * Message info
     112         */
     113
     114        /**
     115         * Set the proxy's serializer method
     116         */
     117        public static String PROXY_SERIALIZER = "omq.serializer";
     118
     119        /**
     120         * Set whether the messages must be compressed or not
     121         */
     122        public static String ENABLE_COMPRESSION = "omq.compression";
     123
     124        /**
    101125         * Set 1 to indicate the message will be nonpersistent and 2 to indicate it
    102126         * will be persistent
    103127         */
    104128        public static String DELIVERY_MODE = "omq.delivery_mode";
     129
     130        /*
     131         * ObjectMQ info
     132         */
     133
     134        /**
     135         * Set how many threads will be created to invoke remote methods
     136         */
     137        public static String NUM_THREADS = "omq.num_threads";
    105138
    106139        /**
  • trunk/src/main/java/omq/server/RemoteObject.java

    r83 r84  
    3939
    4040        private String UID;
    41         private String multiQueue;
    4241        private Properties env;
    4342        private transient Broker broker;
     43        private transient String multiQueue;
    4444        private transient RemoteWrapper remoteWrapper;
    4545        private transient Map<String, List<Class<?>>> params;
     
    7777                this.broker = broker;
    7878                this.UID = reference;
    79                 this.multiQueue = UID + System.currentTimeMillis();
    8079                this.env = env;
    8180
     
    262261         */
    263262        private void startQueues() throws Exception {
     263                // Start channel
     264                channel = broker.getNewChannel();
     265
     266                /*
     267                 * Default queue, Round Robin behaviour
     268                 */
     269
    264270                // Get info about which exchange and queue will use
    265                 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
     271                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE, "");
    266272                String queue = UID;
    267273                String routingKey = UID;
    268274
    269                 // Multi info
    270                 String multiExchange = multi + UID;
    271                 if (env.getProperty(ParameterQueue.MULTI_QUEUE_NAME) != null) {
    272                         multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
    273                 }
    274 
    275                 boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
     275                // RemoteObject default queue
     276                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUE, "false"));
    276277                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
    277278                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
    278279
    279                 // Start channel
    280                 channel = broker.getNewChannel();
    281 
    282280                // Declares and bindings
    283                 logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
     281                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
     282                        channel.exchangeDeclare(exchange, "direct");
     283                }
     284                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
     285                if (!exchange.equalsIgnoreCase("")) { // Default exchange case
     286                        channel.queueBind(queue, exchange, routingKey);
     287                }
     288                logger.info("RemoteObject: " + UID + " declared direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
    284289                                + exclusive + ", AutoDelete: " + autoDelete);
    285                 channel.exchangeDeclare(exchange, "direct");
    286                 channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
    287                 channel.queueBind(queue, exchange, routingKey);
    288 
    289                 logger.info("RemoteObject: " + UID + " declaring fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + durable
    290                                 + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete);
     290
     291                /*
     292                 * Multi queue, exclusive per each instance
     293                 */
     294
     295                // Get info about the multiQueue
     296                String multiExchange = multi + UID;
     297                multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
     298
     299                // Multi queue (exclusive queue per remoteObject)
     300                boolean multiDurable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_MQUEUE, "false"));
     301                boolean multiExclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true"));
     302                boolean multiAutoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "true"));
     303
     304                // Declares and bindings
    291305                channel.exchangeDeclare(multiExchange, "fanout");
    292                 channel.queueDeclare(multiQueue, durable, exclusive, autoDelete, null);
     306                if (multiQueue == null) {
     307                        multiQueue = channel.queueDeclare().getQueue();
     308                } else {
     309                        channel.queueDeclare(multiQueue, multiDurable, multiExclusive, multiAutoDelete, null);
     310                }
    293311                channel.queueBind(multiQueue, multiExchange, "");
     312                logger.info("RemoteObject: " + UID + " declared fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + multiDurable
     313                                + ", Exclusive: " + multiExclusive + ", AutoDelete: " + multiAutoDelete);
     314
     315                /*
     316                 * Consumer
     317                 */
    294318
    295319                // Declare a new consumer
Note: See TracChangeset for help on using the changeset viewer.