[108] | 1 | package omq.server; |
---|
| 2 | |
---|
| 3 | import java.io.IOException; |
---|
| 4 | import java.lang.reflect.InvocationTargetException; |
---|
| 5 | import java.util.Properties; |
---|
| 6 | |
---|
| 7 | import omq.common.broker.Broker; |
---|
| 8 | import omq.common.message.Request; |
---|
| 9 | import omq.common.message.Response; |
---|
| 10 | import omq.common.util.Serializer; |
---|
| 11 | import omq.exception.OmqException; |
---|
| 12 | |
---|
| 13 | import org.apache.log4j.Logger; |
---|
| 14 | |
---|
| 15 | import com.rabbitmq.client.AMQP.BasicProperties; |
---|
| 16 | import com.rabbitmq.client.Channel; |
---|
| 17 | import com.rabbitmq.client.QueueingConsumer; |
---|
| 18 | import com.rabbitmq.client.QueueingConsumer.Delivery; |
---|
| 19 | |
---|
| 20 | public abstract class AInvocationThread extends Thread { |
---|
| 21 | |
---|
| 22 | private static final Logger logger = Logger.getLogger(AInvocationThread.class.getName()); |
---|
| 23 | |
---|
| 24 | // RemoteObject |
---|
| 25 | protected RemoteObject obj; |
---|
| 26 | protected String reference; |
---|
| 27 | protected String UID; |
---|
| 28 | protected Properties env; |
---|
| 29 | |
---|
| 30 | protected RemoteThreadPool pool; |
---|
| 31 | |
---|
| 32 | // Broker |
---|
| 33 | protected Broker broker; |
---|
| 34 | protected Serializer serializer; |
---|
| 35 | |
---|
| 36 | // Consumer |
---|
| 37 | protected Channel channel; |
---|
| 38 | protected QueueingConsumer consumer; |
---|
| 39 | protected boolean killed = false; |
---|
| 40 | |
---|
| 41 | public AInvocationThread(RemoteObject obj) throws Exception { |
---|
| 42 | this.obj = obj; |
---|
| 43 | this.UID = obj.getUID(); |
---|
| 44 | this.reference = obj.getRef(); |
---|
| 45 | this.env = obj.getEnv(); |
---|
| 46 | this.broker = obj.getBroker(); |
---|
| 47 | this.pool = obj.getPool(); |
---|
| 48 | this.serializer = broker.getSerializer(); |
---|
| 49 | } |
---|
| 50 | |
---|
| 51 | @Override |
---|
| 52 | public synchronized void start() { |
---|
| 53 | try { |
---|
| 54 | startQueues(); |
---|
| 55 | super.start(); |
---|
| 56 | } catch (Exception e) { |
---|
| 57 | logger.error("Cannot start a remoteObject", e); |
---|
| 58 | } |
---|
| 59 | |
---|
| 60 | } |
---|
| 61 | |
---|
| 62 | /** |
---|
| 63 | * This method starts the queues using the information got in the |
---|
| 64 | * environment. |
---|
| 65 | * |
---|
| 66 | * @throws Exception |
---|
| 67 | */ |
---|
| 68 | protected abstract void startQueues() throws Exception; |
---|
| 69 | |
---|
| 70 | protected void executeTask(Delivery delivery) throws Exception{ |
---|
| 71 | String serializerType = delivery.getProperties().getType(); |
---|
| 72 | |
---|
| 73 | // Deserialize the request |
---|
| 74 | Request request = serializer.deserializeRequest(serializerType, delivery.getBody(), obj); |
---|
| 75 | String methodName = request.getMethod(); |
---|
| 76 | String requestID = request.getId(); |
---|
| 77 | |
---|
| 78 | logger.debug("Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID + ", serializerType: " |
---|
| 79 | + serializerType); |
---|
| 80 | |
---|
| 81 | // Invoke the method |
---|
| 82 | Object result = null; |
---|
| 83 | OmqException error = null; |
---|
| 84 | try { |
---|
| 85 | result = obj.invokeMethod(request.getMethod(), request.getParams()); |
---|
| 86 | } catch (InvocationTargetException e) { |
---|
| 87 | Throwable throwable = e.getTargetException(); |
---|
| 88 | logger.error("Object: " + obj.getRef() + " at method: " + methodName + ", corrID" + requestID, throwable); |
---|
| 89 | error = new OmqException(throwable.getClass().getCanonicalName(), throwable.getMessage()); |
---|
| 90 | } catch (NoSuchMethodException e) { |
---|
| 91 | logger.error("Object: " + obj.getRef() + " cannot find method: " + methodName); |
---|
| 92 | error = new OmqException(e.getClass().getCanonicalName(), e.getMessage()); |
---|
| 93 | } |
---|
| 94 | |
---|
| 95 | // Reply if it's necessary |
---|
| 96 | if (!request.isAsync()) { |
---|
| 97 | Response resp = new Response(request.getId(), obj.getRef(), result, error); |
---|
| 98 | |
---|
| 99 | BasicProperties props = delivery.getProperties(); |
---|
| 100 | |
---|
| 101 | BasicProperties replyProps = new BasicProperties.Builder().appId(obj.getRef()).correlationId(props.getCorrelationId()) |
---|
| 102 | .build(); |
---|
| 103 | |
---|
| 104 | byte[] bytesResponse = serializer.serialize(serializerType, resp); |
---|
| 105 | channel.basicPublish("", props.getReplyTo(), replyProps, bytesResponse); |
---|
| 106 | logger.debug("Publish sync response -> Object: " + obj.getRef() + ", method: " + methodName + " corrID: " + requestID |
---|
| 107 | + " replyTo: " + props.getReplyTo()); |
---|
| 108 | } |
---|
| 109 | |
---|
| 110 | channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); |
---|
| 111 | } |
---|
| 112 | |
---|
| 113 | public void kill() throws IOException { |
---|
| 114 | logger.info("Killing objectmq: " + reference + " thread id"); |
---|
| 115 | killed = true; |
---|
| 116 | interrupt(); |
---|
| 117 | channel.close(); |
---|
| 118 | } |
---|
| 119 | |
---|
| 120 | public RemoteObject getObj() { |
---|
| 121 | return obj; |
---|
| 122 | } |
---|
| 123 | |
---|
| 124 | public void setObj(RemoteObject obj) { |
---|
| 125 | this.obj = obj; |
---|
| 126 | } |
---|
| 127 | } |
---|