Changeset 18


Ignore:
Timestamp:
05/23/13 15:05:54 (11 years ago)
Author:
gguerrero
Message:

Logs and new event.

Location:
trunk/objectmq/src/omq
Files:
1 added
10 edited

Legend:

Unmodified
Added
Removed
  • trunk/objectmq/src/omq/client/proxy/Proxymq.java

    r15 r18  
    2020import omq.common.message.Request;
    2121import omq.common.message.Response;
     22import omq.common.util.Log;
    2223import omq.common.util.ParameterQueue;
    2324import omq.common.util.Serializer;
     
    126127                }
    127128        }
    128 
    129         private void publishAsyncRequest(Request request) throws IOException, SerializerException {
     129       
     130        private void publishMessage(Request request, String replyQueueName) throws IOException, SerializerException{
    130131                String corrId = request.getId();
    131132
    132133                // Get the environment properties
    133134                String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
    134                 String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    135135                String routingkey = this.uid;
    136136
     
    139139
    140140                // Publish the message
    141                 channel.basicPublish(exchange, routingkey, props, Serializer.serialize(request));
     141                byte[] bytesRequest = Serializer.serialize(request);                           
     142                channel.basicPublish(exchange, routingkey, props, bytesRequest);
     143                Log.saveLog("Client-Serialize", bytesRequest);         
     144        }
     145
     146        private void publishAsyncRequest(Request request) throws IOException, SerializerException {
     147                // Get the environment properties
     148                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
     149                publishMessage(request, replyQueueName);
    142150        }
    143151
     
    149157
    150158                // Get the environment properties
    151                 String exchange = env.getProperty(ParameterQueue.RPC_EXCHANGE);
    152159                String replyQueueName = env.getProperty(ParameterQueue.RPC_REPLY_QUEUE);
    153                 String routingkey = this.uid;
    154 
    155                 // Add the correlation ID and create a replyTo property
    156                 BasicProperties props = new BasicProperties.Builder().appId(uid).correlationId(corrId).replyTo(replyQueueName).build();
    157160
    158161                // Publish the message
     
    160163                while (i < retries) {
    161164                        try {
    162                                 channel.basicPublish(exchange, routingkey, props, Serializer.serialize(request));
     165                                publishMessage(request, replyQueueName);
    163166                                return getResult(corrId, timeout, type);
    164167                        } catch (TimeoutException te) {
     
    208211                        }
    209212                        resp = Serializer.deserializeResponse(results.get(corrId), type);
     213                        Log.saveLog("Client-Deserialize", results.get(corrId));
    210214                        // Remove and indicate the key exists (a hashmap can contain a null
    211215                        // object, using this we'll know whether a response has been
  • trunk/objectmq/src/omq/common/broker/Broker.java

    r15 r18  
    77import omq.client.proxy.Proxymq;
    88import omq.client.remote.response.ResponseListener;
     9import omq.common.event.Event;
    910import omq.common.event.EventDispatcher;
     11import omq.common.event.EventWrapper;
    1012import omq.common.util.Environment;
    1113import omq.common.util.OmqConnectionFactory;
     14import omq.common.util.Serializer;
    1215import omq.exception.EnvironmentException;
    1316import omq.exception.RemoteException;
     17import omq.exception.SerializerException;
    1418import omq.server.remote.request.RemoteObject;
    1519
     
    9195                }
    9296        }
     97       
     98       
     99        public static void trigger(Event event) throws IOException, SerializerException{
     100                String UID = event.getTopic();
     101                EventWrapper wrapper = new EventWrapper(event);
     102                channel.exchangeDeclare(UID, "fanout");
     103                channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));
     104        }
    93105
    94106}
  • trunk/objectmq/src/omq/common/util/ParameterQueue.java

    r11 r18  
    3030
    3131        public static String ENABLE_SSL = "revo.enable_ssl";
     32        public static String DEBUGFILE = "revo.debug_file";
     33       
    3234
    3335        /*
  • trunk/objectmq/src/omq/common/util/Serializer.java

    r14 r18  
    3737                                String className = env.getProperty(ParameterQueue.SERIALIZERNAME, "omq.common.util.Serializers.JavaImp");
    3838
     39                                if (className == null || className.isEmpty()) {
     40                                        throw new ClassNotFoundException("Class name is null or empty.");
     41                                }
     42                               
    3943                                serializer = (ISerializer) Class.forName(className).newInstance();
    4044                        } catch (Exception ex) {
  • trunk/objectmq/src/omq/server/remote/request/InvocationThread.java

    r10 r18  
    55import omq.common.message.Request;
    66import omq.common.message.Response;
     7import omq.common.util.Log;
    78import omq.common.util.Serializer;
    89
     
    3536                                // Deserialize the json
    3637                                Request request = Serializer.deserializeRequest(delivery.getBody(), obj);
     38                                Log.saveLog("Server-Deserialize", delivery.getBody());
    3739
    3840                                // Invoke the method
     
    4648                                        BasicProperties props = delivery.getProperties();
    4749
    48                                         BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
     50                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef())
     51                                                        .correlationId(props.getCorrelationId()).build();
    4952
    50                                         channel.basicPublish("", props.getReplyTo(), replyProps, Serializer.serialize(resp));
     53                                        byte[] bytesResponse = Serializer.serialize(resp);
     54                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
     55
     56                                        Log.saveLog("Server-Serialize", bytesResponse);
    5157                                }
    5258
  • trunk/objectmq/src/omq/ztest/calculator/CalculatorImpl.java

    r17 r18  
    44
    55import omq.client.annotation.AsyncMethod;
     6import omq.common.broker.Broker;
    67import omq.exception.SerializerException;
    78import omq.server.remote.request.RemoteObject;
     
    3334
    3435        public void divideByZero() throws IOException, SerializerException {
    35                 ZeroEvent ze = new ZeroEvent("my zero event");
    36                 notifyEvent(ze);
     36                ZeroEvent ze = new ZeroEvent("my zero event", "zero-event");
     37                Broker.trigger(ze);
     38                //notifyEvent(ze);
    3739        }
    3840
  • trunk/objectmq/src/omq/ztest/calculator/CalculatorTest.java

    r17 r18  
    2222
    2323                // Set host info of rabbimq (where it is)
    24                 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
     24                env.setProperty(ParameterQueue.SERVER_HOST, "10.30.239.228");
    2525                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    2626                // env.setProperty(ParameterQueue.SERIALIZERNAME,
     
    3131                // Set info about where the message will be sent
    3232                env.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");
     33                env.setProperty(ParameterQueue.DEBUGFILE, "c:\\middlewareDebug");
    3334
    3435                // Set info about the queue & the exchange where the ResponseListener
     
    7576        @Test
    7677        public void notifyEvent() throws Exception {
    77                 ZeroListener zL = new ZeroListener();
     78                ZeroListener zL = new ZeroListener("zero-event");
    7879
    7980                remoteCalc.addListener(zL);
  • trunk/objectmq/src/omq/ztest/calculator/ServerTest.java

    r17 r18  
    1616
    1717                // Get host info of rabbimq (where it is)
    18                 env.setProperty(ParameterQueue.SERVER_HOST, "127.0.0.1");
     18                env.setProperty(ParameterQueue.SERVER_HOST, "10.30.239.228");
    1919                env.setProperty(ParameterQueue.SERVER_PORT, "5672");
    2020                // env.setProperty(ParameterQueue.SERIALIZERNAME,
  • trunk/objectmq/src/omq/ztest/calculator/ZeroEvent.java

    r16 r18  
    1212        }
    1313
    14         public ZeroEvent(String corrId) {
    15                 super(corrId);
     14        public ZeroEvent(String corrId, String topic) {
     15                super(corrId, topic);
    1616        }
    1717
  • trunk/objectmq/src/omq/ztest/calculator/ZeroListener.java

    r16 r18  
    55public class ZeroListener extends EventListener<ZeroEvent> {
    66
     7        public ZeroListener(String topic){
     8                super(topic);
     9        }
     10       
    711        @Override
    812        public void notifyEvent(ZeroEvent event) {
Note: See TracChangeset for help on using the changeset viewer.