source: branches/supervisor/src/test/java/omq/test/persistence/PersistentTest.java @ 109

Last change on this file since 109 was 109, checked in by stoda, 11 years ago

PersistentTest? works slower now since now there is a pool of threads. Using a little sleep it seems to work.

File size: 4.2 KB
Line 
1package omq.test.persistence;
2
3import static org.junit.Assert.assertEquals;
4
5import java.util.Arrays;
6import java.util.Collection;
7import java.util.Properties;
8
9import omq.common.broker.Broker;
10import omq.common.util.ParameterQueue;
11import omq.common.util.Serializer;
12
13import org.junit.Test;
14import org.junit.runner.RunWith;
15import org.junit.runners.Parameterized;
16import org.junit.runners.Parameterized.Parameters;
17
18/**
19 *
20 * @author Sergi Toda <sergi.toda@estudiants.urv.cat>
21 *
22 */
23@RunWith(value = Parameterized.class)
24public class PersistentTest {
25        private static String MESSAGE = "message";
26        private String type;
27        private Properties clientProps;
28        private Properties serverProps;
29        private Properties msgImplProps;
30
31        public PersistentTest(String type) {
32                this.type = type;
33
34                /*
35                 * Server Properties
36                 */
37                serverProps = new Properties();
38                serverProps.setProperty(ParameterQueue.USER_NAME, "guest");
39                serverProps.setProperty(ParameterQueue.USER_PASS, "guest");
40
41                // Set host info of rabbtimq (where it is)
42                serverProps.setProperty(ParameterQueue.RABBIT_HOST, "127.0.0.1");
43                serverProps.setProperty(ParameterQueue.RABBIT_PORT, "5672");
44
45                /*
46                 * MessageImpl Properties
47                 */
48                msgImplProps = new Properties();
49                msgImplProps.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");
50                msgImplProps.setProperty(ParameterQueue.DURABLE_QUEUE, "true");
51                msgImplProps.setProperty(ParameterQueue.EXCLUSIVE_QUEUE, "false");
52                msgImplProps.setProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false");
53                msgImplProps.setProperty(ParameterQueue.MULTI_QUEUE_NAME, "multiMessageQueue");
54                msgImplProps.setProperty(ParameterQueue.DURABLE_MQUEUE, "true");
55                msgImplProps.setProperty(ParameterQueue.EXCLUSIVE_MQUEUE, "true");
56                msgImplProps.setProperty(ParameterQueue.AUTO_DELETE_MQUEUE, "false");
57                msgImplProps.setProperty(ParameterQueue.DELIVERY_MODE, "1");
58                msgImplProps.setProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000");
59
60                /*
61                 * Client Properties
62                 */
63
64                clientProps = new Properties();
65                clientProps.setProperty(ParameterQueue.USER_NAME, "guest");
66                clientProps.setProperty(ParameterQueue.USER_PASS, "guest");
67
68                // Set host info of rabbimq (where it is)
69                clientProps.setProperty(ParameterQueue.RABBIT_HOST, "127.0.0.1");
70                clientProps.setProperty(ParameterQueue.RABBIT_PORT, "5672");
71                clientProps.setProperty(ParameterQueue.DURABLE_QUEUE, "true");
72                clientProps.setProperty(ParameterQueue.PROXY_SERIALIZER, type);
73                clientProps.setProperty(ParameterQueue.RPC_EXCHANGE, "rpc_exchange");
74                clientProps.setProperty(ParameterQueue.RPC_REPLY_QUEUE, "persistent_message_reply_queue");
75                clientProps.setProperty(ParameterQueue.DURABLE_QUEUE, "true");
76                clientProps.setProperty(ParameterQueue.EXCLUSIVE_QUEUE, "true");
77                clientProps.setProperty(ParameterQueue.AUTO_DELETE_QUEUE, "false");
78                clientProps.setProperty(ParameterQueue.DELIVERY_MODE, "2");
79                clientProps.setProperty(ParameterQueue.RETRY_TIME_CONNECTION, "2000");
80                // TODO msgProps -> rpc_exchange, serializer type and delivery mode
81        }
82
83        @Parameters
84        public static Collection<Object[]> data() {
85                Object[][] data = new Object[][] { { Serializer.JAVA }, { Serializer.GSON }, { Serializer.KRYO } };
86                return Arrays.asList(data);
87        }
88
89        @Test
90        public void test() throws Exception {
91                System.out.println("Type = " + type);
92                String expected = "message";
93                String actual = null;
94
95                // Ensure the queues exist
96                Broker serverBroker = new Broker(serverProps);
97                MessageImpl msgImpl = new MessageImpl();
98                serverBroker.bind(MESSAGE, msgImpl, msgImplProps);
99
100                // Stop the serverBroker
101                serverBroker.stopBroker();
102
103                Broker clientBroker = new Broker(clientProps);
104                Message iMsg = clientBroker.lookup(MESSAGE, Message.class);
105                iMsg.setMessage(expected);
106
107                // Restart the rabbitmq
108                String password = "unpc";
109                String[] command = { "/bin/bash", "-c", "echo " + password + " | sudo -S service rabbitmq-server restart" };
110
111                Runtime runtime = Runtime.getRuntime();
112                runtime.exec(command);
113
114                Thread.sleep(15000);
115
116                // Restart the server and listen to the new messages
117                serverBroker = new Broker(serverProps);
118                msgImpl = new MessageImpl();
119                serverBroker.bind(MESSAGE, msgImpl, msgImplProps);
120
121                Thread.sleep(5000);
122               
123                actual = iMsg.getMessage();
124
125                // Stop both brokers
126                serverBroker.stopBroker();
127                clientBroker.stopBroker();
128
129                assertEquals(expected, actual);
130        }
131}
Note: See TracBrowser for help on using the repository browser.