Changeset 53 for trunk/src/main/java/omq/server
- Timestamp:
- 06/20/13 16:57:39 (11 years ago)
- Location:
- trunk/src/main/java/omq/server
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/main/java/omq/server/InvocationThread.java
r49 r53 23 23 private static final Logger logger = Logger.getLogger(InvocationThread.class.getName()); 24 24 private RemoteObject obj; 25 private transient Serializer serializer; 25 26 private BlockingQueue<Delivery> deliveryQueue; 26 27 private boolean killed = false; 27 28 28 public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue ) {29 public InvocationThread(RemoteObject obj, BlockingQueue<Delivery> deliveryQueue, Serializer serializer) { 29 30 this.obj = obj; 30 31 this.deliveryQueue = deliveryQueue; 32 this.serializer = serializer; 31 33 } 32 34 … … 41 43 42 44 // Deserialize the json 43 Request request = Serializer.deserializeRequest(serializerType, delivery.getBody(), obj);45 Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj); 44 46 // Log.saveLog("Server-Deserialize", delivery.getBody()); 45 47 … … 73 75 BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()).build(); 74 76 75 byte[] bytesResponse = Serializer.serialize(serializerType, resp);77 byte[] bytesResponse = serializer.serialize(serializerType, resp); 76 78 channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); 77 79 -
trunk/src/main/java/omq/server/RemoteObject.java
r49 r53 39 39 private String UID; 40 40 private Properties env; 41 private transient Broker broker; 42 private transient Serializer serializer; 41 43 private transient RemoteWrapper remoteWrapper; 42 44 private transient Map<String, List<Class<?>>> params; … … 60 62 } 61 63 62 public void startRemoteObject(String reference, Properties env) throws Exception { 63 this.UID = reference; 64 this.env = env; 64 public void startRemoteObject(String reference, Broker broker) throws Exception { 65 this.broker = broker; 66 UID = reference; 67 env = broker.getEnvironment(); 68 serializer = broker.getSerializer(); 65 69 66 70 params = new HashMap<String, List<Class<?>>>(); … … 75 79 // Get num threads to use 76 80 int numThreads = Integer.parseInt(env.getProperty(ParameterQueue.NUM_THREADS, "1")); 77 remoteWrapper = new RemoteWrapper(this, numThreads );81 remoteWrapper = new RemoteWrapper(this, numThreads, broker.getSerializer()); 78 82 79 83 startQueues(); … … 130 134 EventWrapper wrapper = new EventWrapper(event); 131 135 channel.exchangeDeclare(UID, "fanout"); 132 channel.basicPublish(UID, "", null, Serializer.serialize(wrapper));136 channel.basicPublish(UID, "", null, serializer.serialize(wrapper)); 133 137 } 134 138 … … 221 225 222 226 // Start channel 223 channel = Broker.getNewChannel();227 channel = broker.getNewChannel(); 224 228 225 229 // Declares and bindings -
trunk/src/main/java/omq/server/RemoteWrapper.java
r49 r53 4 4 import java.util.concurrent.BlockingQueue; 5 5 import java.util.concurrent.LinkedBlockingDeque; 6 7 import omq.common.util.Serializer; 6 8 7 9 import org.apache.log4j.Logger; … … 23 25 private BlockingQueue<Delivery> deliveryQueue; 24 26 25 public RemoteWrapper(RemoteObject obj, int numThreads ) {27 public RemoteWrapper(RemoteObject obj, int numThreads, Serializer serializer) { 26 28 this.obj = obj; 27 29 this.numThreads = numThreads; … … 32 34 33 35 for (int i = 0; i < numThreads; i++) { 34 InvocationThread thread = new InvocationThread(obj, deliveryQueue );36 InvocationThread thread = new InvocationThread(obj, deliveryQueue, serializer); 35 37 invocationList.add(thread); 36 38 thread.start();
Note: See TracChangeset
for help on using the changeset viewer.