1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2021 Orange.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 *******************************************************************************/
25 package org.onap.dmaap.mr.client.impl;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.MRPublisher.Message;
import org.onap.dmaap.mr.client.ProtocolType;
import org.onap.dmaap.mr.client.response.MRPublisherResponse;

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

43 public class MRSimplerBatchPublisherTest {
47 public void setUp(String contentType) throws Exception {
48 Properties properties = new Properties();
50 MRSimplerBatchPublisherTest.class.getClassLoader().getResourceAsStream("dme2/producer.properties"));
52 String routeFilePath = "dme2/preferredRoute.txt";
54 File file = new File(MRSimplerBatchPublisherTest.class.getClassLoader().getResource(routeFilePath).getFile());
55 properties.put("DME2preferredRouterFilePath",
56 MRSimplerBatchPublisherTest.class.getClassLoader().getResource(routeFilePath).getFile());
57 if (contentType != null) {
58 properties.put("contenttype", contentType);
60 outFile = new File(file.getParent() + "/producer_tmp.properties");
61 properties.store(new FileOutputStream(outFile), "");
65 public void testSend() throws Exception {
69 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
70 .createBatchingPublisher(outFile.getPath());
72 // publish some messages
73 final JSONObject msg1 = new JSONObject();
74 pub.send("MyPartitionKey", msg1.toString());
76 final List<Message> stuck = pub.close(1, TimeUnit.SECONDS);
77 Assert.assertEquals(1, stuck.size());
82 public void testSendBatchWithResponse() throws Exception {
86 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
87 .createBatchingPublisher(outFile.getPath(), true);
89 // publish some messages
90 final JSONObject msg1 = new JSONObject();
91 pub.send("MyPartitionKey", msg1.toString());
92 MRPublisherResponse pubResponse = new MRPublisherResponse();
93 pub.setPubResponse(pubResponse);
95 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
96 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
101 public void testSendBatchWithResponseConText() throws Exception {
105 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
106 .createBatchingPublisher(outFile.getPath());
108 // publish some messages
109 final JSONObject msg1 = new JSONObject();
110 pub.send("MyPartitionKey", msg1.toString());
112 final List<Message> stuck = pub.close(1, TimeUnit.SECONDS);
113 Assert.assertEquals(1, stuck.size());
118 public void testSendBatchWithResponseContCambria() throws Exception {
120 setUp("application/cambria-zip");
122 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
123 .createBatchingPublisher(outFile.getPath());
125 // publish some messages
126 final JSONObject msg1 = new JSONObject();
127 pub.send("MyPartitionKey", msg1.toString());
129 final List<Message> stuck = pub.close(1, TimeUnit.SECONDS);
130 Assert.assertEquals(1, stuck.size());
135 public void testSendBatchWithResponseProtKey() throws Exception {
139 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
140 .createBatchingPublisher(outFile.getPath());
141 pub.setProtocolFlag(ProtocolType.AUTH_KEY.getValue());
142 // publish some messages
143 final JSONObject msg1 = new JSONObject();
144 pub.send("MyPartitionKey", msg1.toString());
146 final List<Message> stuck = pub.close(1, TimeUnit.SECONDS);
147 Assert.assertEquals(1, stuck.size());
152 public void testSendBatchWithResponseProtAaf() throws Exception {
156 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
157 .createBatchingPublisher(outFile.getPath());
158 pub.setProtocolFlag(ProtocolType.AAF_AUTH.getValue());
159 // publish some messages
160 final JSONObject msg1 = new JSONObject();
161 pub.send("MyPartitionKey", msg1.toString());
163 final List<Message> stuck = pub.close(1, TimeUnit.SECONDS);
164 Assert.assertEquals(1, stuck.size());
169 public void testSendBatchWithResponseProtNoAuth() throws Exception {
173 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
174 .createBatchingPublisher(outFile.getPath());
175 pub.setProtocolFlag(ProtocolType.HTTPNOAUTH.getValue());
176 // publish some messages
177 final JSONObject msg1 = new JSONObject();
178 pub.send("MyPartitionKey", msg1.toString());
180 final List<Message> stuck = pub.close(1, TimeUnit.SECONDS);
181 Assert.assertEquals(1, stuck.size());
186 public void testSendBatchWithResponsecontypeText() throws Exception {
190 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
191 .createBatchingPublisher(outFile.getPath(), true);
193 // publish some messages
194 final JSONObject msg1 = new JSONObject();
195 pub.send("MyPartitionKey", "payload");
196 MRPublisherResponse pubResponse = new MRPublisherResponse();
197 pub.setPubResponse(pubResponse);
199 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
200 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
205 public void testSendBatchWithResponsecontypeCambria() throws Exception {
207 setUp("application/cambria-zip");
209 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
210 .createBatchingPublisher(outFile.getPath(), true);
212 // publish some messages
213 final JSONObject msg1 = new JSONObject();
214 pub.send("MyPartitionKey", "payload");
215 MRPublisherResponse pubResponse = new MRPublisherResponse();
216 pub.setPubResponse(pubResponse);
218 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
219 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
224 public void testSendBatchWithResponsePrAuthKey() throws Exception {
228 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
229 .createBatchingPublisher(outFile.getPath(), true);
230 pub.setProtocolFlag(ProtocolType.AUTH_KEY.getValue());
232 // publish some messages
233 final JSONObject msg1 = new JSONObject();
234 pub.send("MyPartitionKey", msg1.toString());
235 MRPublisherResponse pubResponse = new MRPublisherResponse();
236 pub.setPubResponse(pubResponse);
238 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
239 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
244 public void testSendBatchWithResponsePrAaf() throws Exception {
248 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
249 .createBatchingPublisher(outFile.getPath(), true);
250 pub.setProtocolFlag(ProtocolType.AAF_AUTH.getValue());
252 // publish some messages
253 final JSONObject msg1 = new JSONObject();
254 pub.send("MyPartitionKey", msg1.toString());
255 MRPublisherResponse pubResponse = new MRPublisherResponse();
256 pub.setPubResponse(pubResponse);
258 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
259 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
264 public void testSendBatchWithResponsePrNoauth() throws Exception {
268 final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
269 .createBatchingPublisher(outFile.getPath(), true);
270 pub.setProtocolFlag(ProtocolType.HTTPNOAUTH.getValue());
272 // publish some messages
273 final JSONObject msg1 = new JSONObject();
274 pub.send("MyPartitionKey", msg1.toString());
275 MRPublisherResponse pubResponse = new MRPublisherResponse();
276 pub.setPubResponse(pubResponse);
278 MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
279 Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
284 public void createPublisherResponse() throws Exception {
286 MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
287 .createBatchingPublisher(outFile.getPath(), true);
289 MRPublisherResponse response = pub.createMRPublisherResponse("{\"message\": \"published the message\", \"status\": \"200\"}", new MRPublisherResponse());
290 assertEquals("200", response.getResponseCode());
295 public void createPublisherResponseSucc() throws Exception {
297 MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
298 .createBatchingPublisher(outFile.getPath(), true);
300 MRPublisherResponse response = pub.createMRPublisherResponse("{\"fakemessage\": \"published the message\", \"fakestatus\": \"200\"}", new MRPublisherResponse());
301 assertEquals("200", response.getResponseCode());
306 public void createPublisherResponseError() throws Exception {
308 MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
309 .createBatchingPublisher(outFile.getPath(), true);
311 MRPublisherResponse response = pub.createMRPublisherResponse("", new MRPublisherResponse());
312 assertEquals("400", response.getResponseCode());