1 | ''' |
---|
2 | Created on 03/07/2013 |
---|
3 | |
---|
4 | @author: sergi |
---|
5 | ''' |
---|
6 | import pika |
---|
7 | import uuid |
---|
8 | import json |
---|
9 | |
---|
10 | class Client(object): |
---|
11 | ''' |
---|
12 | classdocs |
---|
13 | ''' |
---|
14 | |
---|
15 | |
---|
16 | def __init__(self, env): |
---|
17 | ''' |
---|
18 | Constructor |
---|
19 | ''' |
---|
20 | self.rpc_exchange = env.get('exchange', 'rpc_exchange') |
---|
21 | self.reply_queue = env.get('reply_queue', 'reply_queue') |
---|
22 | |
---|
23 | self.host = env.get('host', 'localhost') |
---|
24 | self.port = env.get('port', 5672) |
---|
25 | self.ssl = env.get('ssl', False) |
---|
26 | |
---|
27 | self.credentials = pika.PlainCredentials(env.get('user', 'guest'), env.get('pass', 'guest')) |
---|
28 | |
---|
29 | self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port, credentials=self.credentials, ssl=self.ssl)) |
---|
30 | |
---|
31 | self.channel = self.connection.channel() |
---|
32 | |
---|
33 | self.callback_queue = self.channel.queue_declare(queue=self.reply_queue) |
---|
34 | |
---|
35 | self.channel.basic_consume(self.on_response, no_ack=True, queue=self.reply_queue) |
---|
36 | |
---|
37 | def async_call(self, uid, method, params): |
---|
38 | """ Async call this function will invoke the method 'method' with the params 'params' in the object binded with 'uid'""" |
---|
39 | self.__async(uid, method, params, False) |
---|
40 | |
---|
41 | def multi_async_call(self, uid, method, params): |
---|
42 | self.__async(uid, method, params, True) |
---|
43 | |
---|
44 | def __async(self, uid, method, params, multi): |
---|
45 | if multi: |
---|
46 | exch = "multi#" + uid |
---|
47 | else: |
---|
48 | exch = self.rpc_exchange |
---|
49 | |
---|
50 | corr_id = str(uuid.uuid4()) |
---|
51 | request = json.dumps({"method": method, "params": params, "id": corr_id, "async":"true"}) |
---|
52 | |
---|
53 | props = pika.BasicProperties(app_id=uid, correlation_id=corr_id, reply_to="", type='gson') |
---|
54 | |
---|
55 | self.channel.basic_publish(exchange=exch, routing_key=uid, properties=props, body=request) |
---|
56 | |
---|
57 | print "UID: " + uid + ", exchange: " + exch + " ,corrId = " + corr_id + " , json: " + request |
---|
58 | |
---|
59 | def sync_call(self, uid, method, params): |
---|
60 | self.response = None |
---|
61 | self.corr_id = str(uuid.uuid4()) |
---|
62 | request = json.dumps({"method": method, "params": params, "id": self.corr_id, "async":"false"}) |
---|
63 | |
---|
64 | props = pika.BasicProperties(app_id=uid, correlation_id=self.corr_id, reply_to="reply_queue", type='gson') |
---|
65 | |
---|
66 | self.channel.basic_publish(exchange=self.rpc_exchange, routing_key=uid, properties=props, body=request) |
---|
67 | |
---|
68 | print "UID: " + uid + ", exchange: " + self.rpc_exchange + " ,corrId = " + self.corr_id + " , json: " + request |
---|
69 | |
---|
70 | return self.__get_response() |
---|
71 | |
---|
72 | def multi_sync_call(self, uid, method, params, wait): |
---|
73 | responses = [] |
---|
74 | self.response = None |
---|
75 | self.corr_id = str(uuid.uuid4()) |
---|
76 | rpc_exchange = "multi#" + uid |
---|
77 | request = json.dumps({"method": method, "params": params, "id": self.corr_id, "async":"false"}) |
---|
78 | |
---|
79 | props = pika.BasicProperties(app_id=uid, correlation_id=self.corr_id, reply_to="reply_queue", type='gson') |
---|
80 | |
---|
81 | self.channel.basic_publish(exchange=rpc_exchange, routing_key=uid, properties=props, body=request) |
---|
82 | |
---|
83 | print "UID: " + uid + ", exchange: " + rpc_exchange + " ,corrId = " + self.corr_id + " , json: " + request |
---|
84 | i = 0 |
---|
85 | while i < wait: |
---|
86 | responses.append(self.__get_response()) |
---|
87 | self.response = None |
---|
88 | i = i + 1 |
---|
89 | return responses |
---|
90 | |
---|
91 | def __get_response(self): |
---|
92 | while self.response is None: |
---|
93 | self.connection.process_data_events() |
---|
94 | return self.response |
---|
95 | |
---|
96 | def on_response(self, ch, method, props, body): |
---|
97 | if self.corr_id == props.correlation_id: |
---|
98 | result = json.loads(body) |
---|
99 | self.response = result["result"] |
---|
100 | |
---|
101 | |
---|