1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
23 package org.onap.dmaap.mr.client.impl;
25 import static org.junit.Assert.assertEquals;
28 import java.io.FileNotFoundException;
29 import java.io.FileOutputStream;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.Properties;
33 import java.util.concurrent.TimeUnit;
35 import org.json.JSONObject;
36 import org.junit.Assert;
37 import org.junit.Before;
38 import org.junit.Test;
40 import org.onap.dmaap.mr.client.MRClientFactory;
41 import org.onap.dmaap.mr.client.MRPublisher.message;
42 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
43 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
45 public class MRSimplerBatchPublisherTest {
// Prepares the temporary producer configuration file that the tests in this class
// hand to MRClientFactory.
//
// Visible steps: create a Properties object, reference the classpath resource
// dme2/producer.properties, set DME2preferredRouterFilePath to the resolved location
// of dme2/preferredRoute.txt, optionally set "contenttype", and store everything as
// producer_tmp.properties in the same directory as the route file.
//
// contentType: value written under the "contenttype" key; when null this method
//              does not set that key.
// Throws Exception: declared broadly to cover the resource lookups and file I/O below.
//
// NOTE(review): this extract appears to have lost short lines (the embedded numbering
// skips 51, 53, 55 and 61); closing braces are not visible — confirm against the
// original file before editing.
49 public void setUp(String contentType) throws Exception {
50 Properties properties = new Properties();
// NOTE(review): the call that receives this stream is not visible in the extract —
// presumably properties.load(...); verify. Nothing visible closes the stream.
52 MRSimplerBatchPublisherTest.class.getClassLoader().getResourceAsStream("dme2/producer.properties"));
54 String routeFilePath = "dme2/preferredRoute.txt";
// Used below only for getParent(), i.e. to pick the directory for the generated file.
56 File file = new File(MRSimplerBatchPublisherTest.class.getClassLoader().getResource(routeFilePath).getFile());
57 properties.put("DME2preferredRouterFilePath",
58 MRSimplerBatchPublisherTest.class.getClassLoader().getResource(routeFilePath).getFile());
59 if (contentType != null) {
60 properties.put("contenttype", contentType);
// outFile is not declared inside this method; presumably a field of the test class — confirm.
62 outFile = new File(file.getParent() + "/producer_tmp.properties");
// NOTE(review): the FileOutputStream created here is never closed in the visible code.
63 properties.store(new FileOutputStream(outFile), "");
// Creates a batching publisher from outFile's path, sends one message whose payload is
// an empty JSON object, closes the publisher with arguments (1, TimeUnit.SECONDS) and
// asserts that the list returned by close() holds exactly one element.
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 67 to 71); presumably a setUp(...) call was lost from this
// extract — confirm against the original file.
67 public void testSend() throws Exception {
71 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
72 .createBatchingPublisher(outFile.getPath());
74 // publish some messages
75 final JSONObject msg1 = new JSONObject();
76 pub.send("MyPartitionKey", msg1.toString());
// "stuck" is presumably the set of messages still undelivered at close — TODO confirm.
78 final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
79 Assert.assertEquals(1, stuck.size());
// Creates a batching publisher with the two-argument factory overload (second argument
// true), queues one empty-JSON message, installs a fresh MRPublisherResponse, calls
// sendBatchWithResponse() and asserts that the returned response reports 1 from
// getPendingMsgs().
// NOTE(review): the meaning of the boolean factory argument is not visible here.
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 84 to 88); presumably a setUp(...) call was lost — confirm.
84 public void testSendBatchWithResponse() throws Exception {
88 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
89 .createBatchingPublisher(outFile.getPath(), true);
91 // publish some messages
92 final JSONObject msg1 = new JSONObject();
93 pub.send("MyPartitionKey", msg1.toString());
94 MRPublisherResponse pubResponse = new MRPublisherResponse();
95 pub.setPubResponse(pubResponse);
97 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
98 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
// Sends one empty-JSON message through a publisher built with the single-argument
// factory call, closes it with (1, TimeUnit.SECONDS) and asserts that close() returns
// a list of size 1.
// NOTE(review): the method name suggests a text content type, but no content type is
// set in the visible lines (embedded numbering jumps from 103 to 107); presumably a
// setUp(...) call with that content type was lost from this extract — confirm.
103 public void testSendBatchWithResponseConText() throws Exception {
107 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
108 .createBatchingPublisher(outFile.getPath());
110 // publish some messages
111 final JSONObject msg1 = new JSONObject();
112 pub.send("MyPartitionKey", msg1.toString());
114 final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
115 Assert.assertEquals(1, stuck.size());
// Regenerates the producer properties with "contenttype" set to
// "application/cambria-zip", builds a batching publisher from that file, sends one
// empty-JSON message, closes with (1, TimeUnit.SECONDS) and asserts that close()
// returns a list of size 1.
// NOTE(review): the closing brace of this method is not visible in the extract.
120 public void testSendBatchWithResponseContCambria() throws Exception {
// Must run before the factory call: setUp is what assigns outFile.
122 setUp("application/cambria-zip");
124 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
125 .createBatchingPublisher(outFile.getPath());
127 // publish some messages
128 final JSONObject msg1 = new JSONObject();
129 pub.send("MyPartitionKey", msg1.toString());
131 final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
132 Assert.assertEquals(1, stuck.size());
// Same send-then-close scenario as the other close() tests, but with the publisher's
// protocol flag set to ProtocolTypeConstants.AUTH_KEY before the message is sent.
// Asserts that close(1, TimeUnit.SECONDS) returns a list of size 1.
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 137 to 141); presumably a setUp(...) call was lost — confirm.
137 public void testSendBatchWithResponseProtKey() throws Exception {
141 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
142 .createBatchingPublisher(outFile.getPath());
143 pub.setProtocolFlag(ProtocolTypeConstants.AUTH_KEY.getValue());
144 // publish some messages
145 final JSONObject msg1 = new JSONObject();
146 pub.send("MyPartitionKey", msg1.toString());
148 final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
149 Assert.assertEquals(1, stuck.size());
// Send-then-close scenario with the publisher's protocol flag set to
// ProtocolTypeConstants.AAF_AUTH before sending. Asserts that
// close(1, TimeUnit.SECONDS) returns a list of size 1.
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 154 to 158); presumably a setUp(...) call was lost — confirm.
154 public void testSendBatchWithResponseProtAaf() throws Exception {
158 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
159 .createBatchingPublisher(outFile.getPath());
160 pub.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
161 // publish some messages
162 final JSONObject msg1 = new JSONObject();
163 pub.send("MyPartitionKey", msg1.toString());
165 final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
166 Assert.assertEquals(1, stuck.size());
// Send-then-close scenario with the publisher's protocol flag set to
// ProtocolTypeConstants.HTTPNOAUTH before sending. Asserts that
// close(1, TimeUnit.SECONDS) returns a list of size 1.
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 171 to 175); presumably a setUp(...) call was lost — confirm.
171 public void testSendBatchWithResponseProtNoAuth() throws Exception {
175 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
176 .createBatchingPublisher(outFile.getPath());
177 pub.setProtocolFlag(ProtocolTypeConstants.HTTPNOAUTH.getValue());
178 // publish some messages
179 final JSONObject msg1 = new JSONObject();
180 pub.send("MyPartitionKey", msg1.toString());
182 final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
183 Assert.assertEquals(1, stuck.size());
// Builds a publisher with the two-argument factory overload (second argument true),
// sends the literal string "payload" instead of a JSON document, installs a fresh
// MRPublisherResponse, calls sendBatchWithResponse() and asserts that
// getPendingMsgs() on the result equals 1.
// NOTE(review): the method name suggests a text content type, but none is set in the
// visible lines (embedded numbering jumps from 188 to 192); presumably a setUp(...)
// call was lost from this extract — confirm.
188 public void testSendBatchWithResponsecontypeText() throws Exception {
192 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
193 .createBatchingPublisher(outFile.getPath(), true);
195 // publish some messages
// NOTE(review): msg1 is never read in the visible code; the send below uses a literal.
196 final JSONObject msg1 = new JSONObject();
197 pub.send("MyPartitionKey", "payload");
198 MRPublisherResponse pubResponse = new MRPublisherResponse();
199 pub.setPubResponse(pubResponse);
201 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
202 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
// Regenerates the producer properties with "contenttype" set to
// "application/cambria-zip", builds a publisher with the two-argument factory overload
// (second argument true), sends the literal string "payload", installs a fresh
// MRPublisherResponse, calls sendBatchWithResponse() and asserts that
// getPendingMsgs() on the result equals 1.
// NOTE(review): the closing brace of this method is not visible in the extract.
207 public void testSendBatchWithResponsecontypeCambria() throws Exception {
// Must run before the factory call: setUp is what assigns outFile.
209 setUp("application/cambria-zip");
211 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
212 .createBatchingPublisher(outFile.getPath(), true);
214 // publish some messages
// NOTE(review): msg1 is never read in the visible code; the send below uses a literal.
215 final JSONObject msg1 = new JSONObject();
216 pub.send("MyPartitionKey", "payload");
217 MRPublisherResponse pubResponse = new MRPublisherResponse();
218 pub.setPubResponse(pubResponse);
220 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
221 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
// sendBatchWithResponse() scenario with the protocol flag set to
// ProtocolTypeConstants.AUTH_KEY: queues one empty-JSON message, installs a fresh
// MRPublisherResponse, sends the batch and asserts that getPendingMsgs() on the
// returned response equals 1.
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 226 to 230); presumably a setUp(...) call was lost — confirm.
226 public void testSendBatchWithResponsePrAuthKey() throws Exception {
230 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
231 .createBatchingPublisher(outFile.getPath(), true);
232 pub.setProtocolFlag(ProtocolTypeConstants.AUTH_KEY.getValue());
234 // publish some messages
235 final JSONObject msg1 = new JSONObject();
236 pub.send("MyPartitionKey", msg1.toString());
237 MRPublisherResponse pubResponse = new MRPublisherResponse();
238 pub.setPubResponse(pubResponse);
240 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
241 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
// sendBatchWithResponse() scenario with the protocol flag set to
// ProtocolTypeConstants.AAF_AUTH: queues one empty-JSON message, installs a fresh
// MRPublisherResponse, sends the batch and asserts that getPendingMsgs() on the
// returned response equals 1.
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 246 to 250); presumably a setUp(...) call was lost — confirm.
246 public void testSendBatchWithResponsePrAaf() throws Exception {
250 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
251 .createBatchingPublisher(outFile.getPath(), true);
252 pub.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
254 // publish some messages
255 final JSONObject msg1 = new JSONObject();
256 pub.send("MyPartitionKey", msg1.toString());
257 MRPublisherResponse pubResponse = new MRPublisherResponse();
258 pub.setPubResponse(pubResponse);
260 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
261 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
// sendBatchWithResponse() scenario with the protocol flag set to
// ProtocolTypeConstants.HTTPNOAUTH: queues one empty-JSON message, installs a fresh
// MRPublisherResponse, sends the batch and asserts that getPendingMsgs() on the
// returned response equals 1.
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 266 to 270); presumably a setUp(...) call was lost — confirm.
266 public void testSendBatchWithResponsePrNoauth() throws Exception {
270 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
271 .createBatchingPublisher(outFile.getPath(), true);
272 pub.setProtocolFlag(ProtocolTypeConstants.HTTPNOAUTH.getValue());
274 // publish some messages
275 final JSONObject msg1 = new JSONObject();
276 pub.send("MyPartitionKey", msg1.toString());
277 MRPublisherResponse pubResponse = new MRPublisherResponse();
278 pub.setPubResponse(pubResponse);
280 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
281 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
// Feeds createMRPublisherResponse a JSON reply containing "message" and "status"
// ("200") together with a fresh MRPublisherResponse, and asserts that the resulting
// response code is "200".
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 286 to 288); presumably a setUp(...) call was lost — confirm.
286 public void createPublisherResponse() throws Exception{
288 MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
289 .createBatchingPublisher(outFile.getPath(), true);
291 MRPublisherResponse response=pub.createMRPublisherResponse("{\"message\": \"published the message\", \"status\": \"200\"}", new MRPublisherResponse());
292 assertEquals("200", response.getResponseCode());
// Feeds createMRPublisherResponse a well-formed JSON reply whose keys are
// "fakemessage" and "fakestatus" rather than "message" and "status", and asserts that
// the resulting response code is still "200".
// NOTE(review): presumably the publisher falls back to "200" when the expected keys
// are absent; that logic is not visible here — verify in MRSimplerBatchPublisher.
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 297 to 299); presumably a setUp(...) call was lost — confirm.
297 public void createPublisherResponseSucc() throws Exception{
299 MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
300 .createBatchingPublisher(outFile.getPath(), true);
302 MRPublisherResponse response=pub.createMRPublisherResponse("{\"fakemessage\": \"published the message\", \"fakestatus\": \"200\"}", new MRPublisherResponse());
303 assertEquals("200", response.getResponseCode());
// Feeds createMRPublisherResponse an empty string as the reply and asserts that the
// resulting response code is "400".
// NOTE(review): no statement assigning outFile is visible before its use (embedded
// numbering jumps from 308 to 310); presumably a setUp(...) call was lost — confirm.
308 public void createPublisherResponseError() throws Exception{
310 MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
311 .createBatchingPublisher(outFile.getPath(), true);
313 MRPublisherResponse response=pub.createMRPublisherResponse("", new MRPublisherResponse());
314 assertEquals("400", response.getResponseCode());