Changeset 77 for trunk/src/main/java/omq


Ignore:
Timestamp:
07/04/13 16:45:14 (11 years ago)
Author:
stoda
Message:

ParameterQueues? changed, added some properties to modify the queues

Location:
trunk/src/main/java/omq
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/client/proxy/MultiProxymq.java

    r75 r77  
    4141                replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    4242                exchange = multi + uid;
    43                 serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
     43                serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
    4444        }
    4545
  • trunk/src/main/java/omq/client/proxy/Proxymq.java

    r76 r77  
    5151        private transient Serializer serializer;
    5252        private transient Properties env;
     53        private transient Integer deliveryMode = null;
    5354        private transient Map<String, byte[]> results;
    5455
     
    9495
    9596                // set the serializer type
    96                 serializerType = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
     97                serializerType = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
     98                if (env.getProperty(ParameterQueue.DELIVERY_MODE) != null) {
     99                        deliveryMode = Integer.parseInt(env.getProperty(ParameterQueue.DELIVERY_MODE));
     100                }
    97101
    98102                // Create a new hashmap and registry it in rListener
     
    143147
    144148                // Add the correlation ID and create a replyTo property
    145                 BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType).build();
     149                BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).type(serializerType)
     150                                .deliveryMode(deliveryMode).build();
    146151
    147152                // Publish the message
    148153                byte[] bytesRequest = serializer.serialize(serializerType, request);
    149154                broker.getChannel().basicPublish(exchange, routingkey, props, bytesRequest);
    150                 logger.debug("Proxymq: " + uid + " invokes " + request.getMethod() + ", corrID" + corrId + ", exchange: " + exchange + ", replyQueue: "
    151                                 + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync());
     155                logger.debug("Proxymq: " + uid + " invokes '" + request.getMethod() + "' , corrID: " + corrId + ", exchange: " + exchange + ", replyQueue: "
     156                                + replyQueueName + ", serializerType: " + serializerType + ", multi call: " + request.isMulti() + ", async call: " + request.isAsync()
     157                                + ", delivery mode: " + deliveryMode);
    152158        }
    153159
  • trunk/src/main/java/omq/common/util/OmqConnectionFactory.java

    r51 r77  
    5656
    5757                // Get host info of rabbimq (where it is)
    58                 String host = env.getProperty(ParameterQueue.SERVER_HOST);
    59                 int port = Integer.parseInt(env.getProperty(ParameterQueue.SERVER_PORT));
     58                String host = env.getProperty(ParameterQueue.RABBIT_HOST);
     59                int port = Integer.parseInt(env.getProperty(ParameterQueue.RABBIT_PORT));
    6060
    6161                boolean ssl = Boolean.parseBoolean(env.getProperty(ParameterQueue.ENABLE_SSL));
  • trunk/src/main/java/omq/common/util/ParameterQueue.java

    r44 r77  
    1212         */
    1313
    14         public static String SERIALIZER_NAME = "omq.serializer";
     14        /**
     15         * Set the proxy's serializer method
     16         */
     17        public static String PROXY_SERIALIZER = "omq.serializer";
    1518
    1619        /**
    1720         * Set whether the messages must be compressed or not
    1821         */
    19         public static String ENABLECOMPRESSION = "omq.compression";
     22        public static String ENABLE_COMPRESSION = "omq.compression";
    2023
    2124        /**
    2225         * Set the ip where the rabbitmq server is.
    2326         */
    24         public static String SERVER_HOST = "omq.host";
     27        public static String RABBIT_HOST = "omq.host";
    2528
    2629        /**
    2730         * Set the port that rabbitmq uses.
    2831         */
    29         public static String SERVER_PORT = "omq.port";
     32        public static String RABBIT_PORT = "omq.port";
    3033
    3134        /**
     
    5154
    5255        /**
    53          * Set the clients event queue. Every client must have a different queue
    54          * name.
    55          */
    56         public static String EVENT_REPLY_QUEUE = "omq.reply_queue_event";
    57 
    58         /**
    5956         * Set if the queues must be durable. The queues won't be lost when rabbitmq
    6057         * crashes if DURABLE_QUEUES is set trues.
     
    7269        public static String MESSAGE_TTL_IN_QUEUES = "omq.message_ttl_queue";
    7370
    74         // TODO persisten messages? the messages will be saved in the disk if this
     71        // TODO persistent messages? the messages will be saved in the disk if this
    7572        // flag is set true
    7673
     74        /**
     75         * Set if the system will use ssl
     76         */
    7777        public static String ENABLE_SSL = "omq.enable_ssl";
    78         public static String DEBUGFILE = "omq.debug_file";
    7978
     79        /**
     80         * Set how many time we have to wait to retry the connection with the server
     81         * when this goes down
     82         */
    8083        public static String RETRY_TIME_CONNECTION = "omq.retry_connection";
    8184
    82         /*
    83          * Values
     85        /**
     86         * Set how many threads will be created to invoke remote methods
    8487         */
    85 
    86         // Change this!!!
    87         public static String RPC_TYPE = "direct";
    88 
    8988        public static String NUM_THREADS = "omq.num_threads";
    9089
    91         public static String REGISTRY_NAME = "REGISTRY";
     90        /**
     91         * Set the specific name of a multi queue in a specific object
     92         */
     93        public static String MULTI_QUEUE_NAME = "omq.multi_queue_name";
     94
     95        /**
     96         * Set if server will delete a queue when is no longer in use
     97         */
     98        public static String AUTO_DELETE_QUEUE = "omq.auto_delete";
     99
     100        /**
     101         * Set if we are declaring an exclusive queue (restricted to this
     102         * connection)
     103         */
     104        public static String EXCLUSIVE_QUEUE = "omq.exclusive_queue";
     105
     106        /**
     107         * Set 1 to indicate the message will be nonpersistent and 2 to indicate it
     108         * will be persistent
     109         */
     110        public static String DELIVERY_MODE = "omq.delivery_mode";
    92111
    93112        /**
  • trunk/src/main/java/omq/common/util/Serializer.java

    r72 r77  
    4040
    4141        private Boolean getEnableCompression() {
    42                 return Boolean.valueOf(env.getProperty(ParameterQueue.ENABLECOMPRESSION, "false"));
     42                return Boolean.valueOf(env.getProperty(ParameterQueue.ENABLE_COMPRESSION, "false"));
    4343        }
    4444
     
    4646                if (serializer == null) {
    4747                        try {
    48                                 String className = env.getProperty(ParameterQueue.SERIALIZER_NAME, Serializer.JAVA);
     48                                String className = env.getProperty(ParameterQueue.PROXY_SERIALIZER, Serializer.JAVA);
    4949
    5050                                if (className == null || className.isEmpty()) {
     
    9898        }
    9999
    100         // TODO: remove this function and think about the event serialization
    101100        public byte[] serialize(Object obj) throws SerializerException {
    102101                ISerializer instance = getInstance();
  • trunk/src/main/java/omq/server/RemoteObject.java

    r75 r77  
    218218                String queue = UID;
    219219                String routingKey = UID;
     220
    220221                // Multi info
    221222                String multiExchange = multi + UID;
     223                if (env.getProperty(ParameterQueue.MULTI_QUEUE_NAME) != null) {
     224                        multiQueue = env.getProperty(ParameterQueue.MULTI_QUEUE_NAME);
     225                }
    222226
    223227                boolean durable = Boolean.parseBoolean(env.getProperty(ParameterQueue.DURABLE_QUEUES, "false"));
     228                boolean exclusive = Boolean.parseBoolean(env.getProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false"));
     229                boolean autoDelete = Boolean.parseBoolean(env.getProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false"));
    224230
    225231                // Start channel
     
    227233
    228234                // Declares and bindings
    229                 logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable);
     235                logger.info("RemoteObject: " + UID + " declaring direct exchange: " + exchange + ", Queue: " + queue + ", Durable: " + durable + ", Exclusive: "
     236                                + exclusive + ", AutoDelete: " + autoDelete);
    230237                channel.exchangeDeclare(exchange, "direct");
    231                 channel.queueDeclare(queue, durable, false, false, null);
     238                channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
    232239                channel.queueBind(queue, exchange, routingKey);
    233240
    234                 logger.info("RemoteObject: " + UID + " declaring fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + durable);
     241                logger.info("RemoteObject: " + UID + " declaring fanout exchange: " + multiExchange + ", Queue: " + multiQueue + ", Durable: " + durable
     242                                + ", Exclusive: " + exclusive + ", AutoDelete: " + autoDelete);
    235243                channel.exchangeDeclare(multiExchange, "fanout");
    236                 channel.queueDeclare(multiQueue, durable, false, false, null);
     244                channel.queueDeclare(multiQueue, durable, exclusive, autoDelete, null);
    237245                channel.queueBind(multiQueue, multiExchange, "");
    238246
Note: See TracChangeset for help on using the changeset viewer.