Ignore:
Timestamp:
06/20/13 16:57:39 (11 years ago)
Author:
stoda
Message:

Non static broker
TODO: change all test to see whether the new broker configuration works

Location:
trunk/src/main/java/omq/server
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/main/java/omq/server/InvocationThread.java

    r49 r53  
    2323        private static final Logger logger = Logger.getLogger(InvocationThread.class.getName());
    2424        private RemoteObject obj;
     25        private transient Serializer serializer;
    2526        private BlockingQueue<Delivery> deliveryQueue;
    2627        private boolean killed = false;
    2728
    28         public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue) {
     29        public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue, Serializer serializer) {
    2930                this.obj = obj;
    3031                this.deliveryQueue = deliveryQueue;
     32                this.serializer = serializer;
    3133        }
    3234
     
    4143
    4244                                // Deserialize the json
    43                                 Request request = Serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
     45                                Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj);
    4446                                // Log.saveLog("Server-Deserialize", delivery.getBody());
    4547
     
    7375                                        BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build();
    7476
    75                                         byte[] bytesResponse = Serializer.serialize(serializerType, resp);
     77                                        byte[] bytesResponse = serializer.serialize(serializerType, resp);
    7678                                        channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse);
    7779
  • trunk/src/main/java/omq/server/RemoteObject.java

    r49 r53  
    3939        private String UID;
    4040        private Properties env;
     41        private transient Broker broker;
     42        private transient Serializer serializer;
    4143        private transient RemoteWrapper remoteWrapper;
    4244        private transient Map<String, List<Class<?>>> params;
     
    6062        }
    6163
    62         public void startRemoteObject(String reference, Properties env) throws Exception {
    63                 this.UID = reference;
    64                 this.env = env;
     64        public void startRemoteObject(String reference, Broker broker) throws Exception {
     65                this.broker = broker;
     66                UID = reference;
     67                env = broker.getEnvironment();
     68                serializer = broker.getSerializer();
    6569
    6670                params = new HashMap<String, List<Class<?>>>();
     
    7579                // Get num threads to use
    7680                int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1"));
    77                 remoteWrapper = new RemoteWrapper(this, numThreads);
     81                remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer());
    7882
    7983                startQueues();
     
    130134                EventWrapper wrapper = new EventWrapper(event);
    131135                channel.exchangeDeclare(UID, "fanout");
    132                 channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));
     136                channel.basicPublish(UID, "", null, serializer.serialize(wrapper));
    133137        }
    134138
     
    221225
    222226                // Start channel
    223                 channel = Broker.getNewChannel();
     227                channel = broker.getNewChannel();
    224228
    225229                // Declares and bindings
  • trunk/src/main/java/omq/server/RemoteWrapper.java

    r49 r53  
    44import java.util.concurrent.BlockingQueue;
    55import java.util.concurrent.LinkedBlockingDeque;
     6
     7import omq.common.util.Serializer;
    68
    79import org.apache.log4j.Logger;
     
    2325        private BlockingQueue<Delivery> deliveryQueue;
    2426
    25         public RemoteWrapper(RemoteObject obj, int numThreads) {
     27        public RemoteWrapper(RemoteObject obj, int numThreads, Serializer serializer) {
    2628                this.obj = obj;
    2729                this.numThreads = numThreads;
     
    3234
    3335                for (int i = 0; i < numThreads; i++) {
    34                         InvocationThread thread = new InvocationThread(obj, deliveryQueue);
     36                        InvocationThread thread = new InvocationThread(obj, deliveryQueue, serializer);
    3537                        invocationList.add(thread);
    3638                        thread.start();
Note: See TracChangeset for help on using the changeset viewer.