[80] | 1 | ''' |
---|
| 2 | Created on 03/07/2013 |
---|
| 3 | |
---|
| 4 | @author: sergi |
---|
| 5 | ''' |
---|
| 6 | import pika |
---|
| 7 | import uuid |
---|
| 8 | import json |
---|
| 9 | |
---|
| 10 | class Client(object): |
---|
| 11 | ''' |
---|
| 12 | classdocs |
---|
| 13 | ''' |
---|
| 14 | |
---|
| 15 | |
---|
| 16 | def __init__(self, env): |
---|
| 17 | ''' |
---|
| 18 | Constructor |
---|
| 19 | ''' |
---|
| 20 | self.rpc_exchange = env.get('exchange', 'rpc_exchange') |
---|
| 21 | self.reply_queue = env.get('reply_queue', 'reply_queue') |
---|
| 22 | |
---|
| 23 | self.host = env.get('host', 'localhost') |
---|
| 24 | self.port = env.get('port', 5672) |
---|
| 25 | self.ssl = env.get('ssl', False) |
---|
| 26 | |
---|
| 27 | self.credentials = pika.PlainCredentials(env.get('user', 'guest'), env.get('pass', 'guest')) |
---|
| 28 | |
---|
| 29 | self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port, credentials=self.credentials, ssl=self.ssl)) |
---|
| 30 | |
---|
| 31 | self.channel = self.connection.channel() |
---|
| 32 | |
---|
| 33 | self.callback_queue = self.channel.queue_declare(queue=self.reply_queue) |
---|
| 34 | |
---|
| 35 | self.channel.basic_consume(self.on_response, no_ack=True, queue=self.reply_queue) |
---|
| 36 | |
---|
| 37 | def async_call(self, uid, method, params): |
---|
| 38 | """ Async call this function will invoke the method 'method' with the params 'params' in the object binded with 'uid'""" |
---|
| 39 | self.__async(uid, method, params, False) |
---|
| 40 | |
---|
| 41 | def multi_async_call(self, uid, method, params): |
---|
| 42 | self.__async(uid, method, params, True) |
---|
| 43 | |
---|
| 44 | def __async(self, uid, method, params, multi): |
---|
| 45 | if multi: |
---|
| 46 | exch = "multi#" + uid |
---|
| 47 | else: |
---|
| 48 | exch = self.rpc_exchange |
---|
| 49 | |
---|
| 50 | corr_id = str(uuid.uuid4()) |
---|
| 51 | request = json.dumps({"method": method, "params": params, "id": corr_id, "async":"true"}) |
---|
| 52 | |
---|
| 53 | props = pika.BasicProperties(app_id=uid, correlation_id=corr_id, reply_to="", type='gson') |
---|
| 54 | |
---|
| 55 | self.channel.basic_publish(exchange=exch, routing_key=uid, properties=props, body=request) |
---|
| 56 | |
---|
| 57 | print "UID: " + uid + ", exchange: " + exch + " ,corrId = " + corr_id + " , json: " + request |
---|
| 58 | |
---|
| 59 | def sync_call(self, uid, method, params): |
---|
| 60 | self.response = None |
---|
| 61 | self.corr_id = str(uuid.uuid4()) |
---|
| 62 | request = json.dumps({"method": method, "params": params, "id": self.corr_id, "async":"false"}) |
---|
| 63 | |
---|
| 64 | props = pika.BasicProperties(app_id=uid, correlation_id=self.corr_id, reply_to="reply_queue", type='gson') |
---|
| 65 | |
---|
| 66 | self.channel.basic_publish(exchange=self.rpc_exchange, routing_key=uid, properties=props, body=request) |
---|
| 67 | |
---|
| 68 | print "UID: " + uid + ", exchange: " + self.rpc_exchange + " ,corrId = " + self.corr_id + " , json: " + request |
---|
| 69 | |
---|
| 70 | return self.__get_response() |
---|
| 71 | |
---|
| 72 | def multi_sync_call(self, uid, method, params, wait): |
---|
| 73 | responses = [] |
---|
| 74 | self.response = None |
---|
| 75 | self.corr_id = str(uuid.uuid4()) |
---|
| 76 | rpc_exchange = "multi#" + uid |
---|
| 77 | request = json.dumps({"method": method, "params": params, "id": self.corr_id, "async":"false"}) |
---|
| 78 | |
---|
| 79 | props = pika.BasicProperties(app_id=uid, correlation_id=self.corr_id, reply_to="reply_queue", type='gson') |
---|
| 80 | |
---|
| 81 | self.channel.basic_publish(exchange=rpc_exchange, routing_key=uid, properties=props, body=request) |
---|
| 82 | |
---|
| 83 | print "UID: " + uid + ", exchange: " + rpc_exchange + " ,corrId = " + self.corr_id + " , json: " + request |
---|
| 84 | i = 0 |
---|
| 85 | while i < wait: |
---|
| 86 | responses.append(self.__get_response()) |
---|
| 87 | self.response = None |
---|
| 88 | i = i + 1 |
---|
| 89 | return responses |
---|
| 90 | |
---|
| 91 | def __get_response(self): |
---|
| 92 | while self.response is None: |
---|
| 93 | self.connection.process_data_events() |
---|
| 94 | return self.response |
---|
| 95 | |
---|
| 96 | def on_response(self, ch, method, props, body): |
---|
| 97 | if self.corr_id == props.correlation_id: |
---|
| 98 | result = json.loads(body) |
---|
| 99 | self.response = result["result"] |
---|
| 100 | |
---|
| 101 | |
---|