1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.service;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
30 import javax.servlet.http.HttpServletRequest;
31 import javax.servlet.http.HttpServletResponse;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import org.json.JSONObject;
37 import org.apache.commons.io.IOUtils;
38 import org.apache.commons.lang.StringUtils;
39 import org.apache.http.HttpStatus;
41 import com.att.eelf.configuration.EELFLogger;
42 import com.att.eelf.configuration.EELFManager;
43 import org.springframework.beans.factory.annotation.Autowired;
44 import org.springframework.beans.factory.annotation.Qualifier;
45 import org.springframework.stereotype.Component;
47 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
48 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
49 import org.onap.dmaap.dmf.mr.utils.Utils;
50 import com.att.nsa.configs.ConfigDbException;
51 import org.onap.dmaap.mmagent.*;
52 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
53 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
54 import com.google.gson.Gson;
55 import com.google.gson.JsonSyntaxException;
57 import edu.emory.mathcs.backport.java.util.Arrays;
59 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
60 import org.onap.dmaap.dmf.mr.CambriaApiException;
61 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
63 import org.json.JSONArray;
64 import org.json.JSONException;
65 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
66 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
67 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
68 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
69 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
70 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
71 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
72 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
73 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
74 import org.onap.dmaap.dmf.mr.service.MMService;
77 * Rest Service class for Mirror Maker proxy Rest Services
79 * @author <a href="mailto:"></a>
85 public class MMRestService {
// Logger used by every method of this class.
87 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class);
// Error texts placed into the ErrorResponse objects thrown by the endpoints below.
88 private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission.";
89 private static final String NO_USER_PERMISSION = "No Mirror Maker User permission.";
90 private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission.";
91 private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric";
92 private static final String INVALID_IPPORT = "This is not a valid IP:Port";
// Property keys and JSON member names.
// NOTE(review): the endpoints visible in this part of the file spell several of these
// values out as string literals instead of using the constants - confirm which are still used.
93 private static final String MIRROR_MAKERADMIN = "msgRtr.mirrormakeradmin.aaf";
94 private static final String MIRROR_MAKERUSER = "msgRtr.mirrormakeruser.aaf";
95 private static final String UTF_8 = "UTF-8";
96 private static final String MESSAGE = "message";
97 private static final String LISTMIRRORMAKER = "listMirrorMaker";
98 private static final String ERROR = "error";
99 private static final String NAMESPACE = "namespace";
// Topic, consumer group and consumer id used for the publish/subscribe round trip;
// assigned by loadProperty() from the msgRtr.mirrormaker.* properties.
101 private String topic;
103 private String consumergroup;
104 private String consumerid;
// Collaborators handed to getDmaapContext() and used by the endpoints.
107 @Qualifier("configurationReader")
108 private ConfigurationReader configReader;
111 private HttpServletRequest request;
114 private HttpServletResponse response;
117 private MMService mirrorService;
120 private DMaaPErrorMessages errorMessages;
// Bad-request error template: the endpoints overwrite its message with setErrorMessage()
// and throw it wrapped in a CambriaApiException. `topic` has no initializer above, so it
// is still null when this initializer runs.
// NOTE(review): this one instance is mutated on every failing request - confirm the
// resource object is not shared between concurrent requests.
122 private ErrorResponse errResJson = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
123 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "", null, Utils.getFormattedDate(new Date()), topic,
124 null, null, "mirrorMakerAgent", null);
// Authenticator consulted by checkMirrorMakerPermission().
126 private DMaaPAAFAuthenticator dmaapAAFauthenticator = new DMaaPAAFAuthenticatorImpl();
129 * This method is used for taking Configuration Object,HttpServletRequest
130 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
133 * @return DMaaPContext object from where user can get Configuration
134 * Object,HttpServlet Object
137 private DMaaPContext getDmaapContext() {
138 DMaaPContext dmaapContext = new DMaaPContext();
139 dmaapContext.setRequest(request);
140 dmaapContext.setResponse(response);
141 dmaapContext.setConfigReader(configReader);
142 dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
/**
 * Handles a CreateMirrorMaker request.
 *
 * The caller must hold the permission configured under "msgRtr.mirrormakeradmin.aaf";
 * the SC_UNAUTHORIZED ErrorResponse built at the end of this method is presumably the
 * branch taken when that check fails. The body is read as UTF-8, cleaned with
 * removeExtraChar() and parsed by Gson into a CreateMirrorMaker. The request is rejected
 * (errResJson wrapped in a CambriaApiException) when the JSON is malformed, when it has no
 * createMirrorMaker member, when the name is blank or not alphanumeric, and - per the
 * "already exists" message - presumably when validateMMExists() reports the name as
 * existing. Otherwise the request is sent through callPubSub() and the resulting JSON
 * is written with DMaaPResponseBuilder.respondOk().
 *
 * @param msg request body stream
 * @throws Exception CambriaApiException for the rejections above; anything thrown by the
 *         called services also propagates
 */
148 @Produces("application/json")
150 public void callCreateMirrorMaker(InputStream msg) throws Exception {
152 DMaaPContext ctx = getDmaapContext();
153 if (checkMirrorMakerPermission(ctx,
154 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
// Candidate messageID, used further down only when the request does not carry one.
158 String randomStr = getRandomNum();
160 InputStream inStream = null;
161 Gson gson = new Gson();
162 CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
163 LOGGER.info("Starting Create MirrorMaker");
165 input = IOUtils.toString(msg, "UTF-8");
167 if (input != null && input.length() > 0) {
168 input = removeExtraChar(input);
171 // Check if the request has CreateMirrorMaker
173 createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class);
175 } catch (JsonSyntaxException ex) {
177 errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage());
178 LOGGER.info(errResJson.toString());
179 throw new CambriaApiException(errResJson);
182 // send error message if it is not a CreateMirrorMaker request.
183 if (createMirrorMaker.getCreateMirrorMaker() == null) {
185 errResJson.setErrorMessage("This is not a CreateMirrorMaker request. Please try again.");
186 LOGGER.info(errResJson.toString());
187 throw new CambriaApiException(errResJson);
189 createMirrorMaker.validateJSON();
192 String name = createMirrorMaker.getCreateMirrorMaker().getName();
194 // if empty, blank name is entered
195 if (StringUtils.isBlank(name)) {
197 errResJson.setErrorMessage("Name can not be empty or blank.");
198 LOGGER.info(errResJson.toString());
199 throw new CambriaApiException(errResJson);
202 // Check if the name contains only Alpha Numeric
203 else if (!isAlphaNumeric(name)) {
205 errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT);
206 LOGGER.info(errResJson.toString());
207 throw new CambriaApiException(errResJson);
// Fall back to the generated messageID when the request supplied none.
211 if (null == createMirrorMaker.getMessageID() || createMirrorMaker.getMessageID().isEmpty()) {
212 createMirrorMaker.setMessageID(randomStr);
// Re-serialise the (possibly amended) request; this stream is what callPubSub() receives.
214 inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), "UTF-8");
215 JSONObject existMirrorMaker = validateMMExists(ctx, name);
216 if (!(boolean) existMirrorMaker.get("exists")) {
217 JSONObject finalJsonObj = callPubSub(createMirrorMaker.getMessageID(), ctx, inStream, name,
219 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
222 errResJson.setErrorMessage("MirrorMaker " + name + " already exists");
223 LOGGER.info(errResJson.toString());
224 throw new CambriaApiException(errResJson);
229 } catch (IOException e) {
234 // Send error response if user does not provide Authorization
236 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
237 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
238 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
239 ctx.getRequest().getRemoteHost());
240 LOGGER.info(errRes.toString());
241 throw new CambriaApiException(errRes);
/**
 * Handles a ListAllMirrorMaker request.
 *
 * The caller must hold the permission configured under "msgRtr.mirrormakeradmin.aaf";
 * the SC_UNAUTHORIZED ErrorResponse at the end is presumably the branch taken when that
 * check fails. The body is read as UTF-8, cleaned with removeExtraChar() and parsed as an
 * org.json JSONObject; a JSONException is answered with errResJson. The request is
 * accepted only when it has a "listAllMirrorMaker" member that is an empty object: a
 * messageID is added when absent or null, the JSON is forwarded through callPubSub() and
 * the result is written with respondOk(). The SC_SERVICE_UNAVAILABLE response
 * ("This is not a ListAllMirrorMaker request") is presumably the alternative branch.
 *
 * @param msg request body stream
 * @throws Exception CambriaApiException for the rejections above; anything thrown by the
 *         called services also propagates
 */
247 @Produces("application/json")
249 public void callListAllMirrorMaker(InputStream msg) throws Exception {
251 DMaaPContext ctx = getDmaapContext();
253 if (checkMirrorMakerPermission(ctx,
254 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
259 Gson gson = new Gson();
262 input = IOUtils.toString(msg, "UTF-8");
264 if (input != null && input.length() > 0) {
265 input = removeExtraChar(input);
// Candidate messageID, used only when the request does not carry one.
268 String randomStr = getRandomNum();
269 JSONObject jsonOb = null;
272 jsonOb = new JSONObject(input);
274 } catch (JSONException ex) {
275 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
276 LOGGER.info(errResJson.toString());
277 throw new CambriaApiException(errResJson);
280 // Check if request has listAllMirrorMaker and
281 // listAllMirrorMaker is empty
282 if (jsonOb.has("listAllMirrorMaker") && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0) {
284 if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) {
285 jsonOb.put("messageID", randomStr);
288 InputStream inStream = null;
// Parsed only to obtain mirrormaker.name for the callPubSub() call below.
289 MirrorMaker mirrormaker = gson.fromJson(input, MirrorMaker.class);
292 inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
294 } catch (IOException ioe) {
298 JSONObject responseJson = callPubSub(jsonOb.getString("messageID"), ctx, inStream, mirrormaker.name,
300 DMaaPResponseBuilder.respondOk(ctx, responseJson);
304 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
305 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
306 "This is not a ListAllMirrorMaker request. Please try again.", null,
307 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
308 ctx.getRequest().getRemoteHost());
309 LOGGER.info(errRes.toString());
310 throw new CambriaApiException(errRes);
313 } catch (IOException ioe) {
320 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
321 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
322 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
323 ctx.getRequest().getRemoteHost());
324 LOGGER.info(errRes.toString());
325 throw new CambriaApiException(errRes);
/**
 * Handles an UpdateMirrorMaker request.
 *
 * The caller must hold the permission configured under "msgRtr.mirrormakeradmin.aaf";
 * the SC_UNAUTHORIZED ErrorResponse at the end is presumably the branch taken when that
 * check fails. The body is read as UTF-8, cleaned with removeExtraChar(), parsed by Gson
 * into an UpdateMirrorMaker and, in parallel, as an org.json JSONObject so the members
 * actually present in the request can be inspected. The request is rejected with
 * errResJson when the JSON is malformed, when there is no updateMirrorMaker member, or
 * when the name is blank or not alphanumeric. When validateMMExists() reports the
 * MirrorMaker as existing, numStreams and enablelogCheck are copied from the existing
 * entry if the request omitted them, the request is forwarded through callPubSub() and
 * the result is written with respondOk(). The "does not exist" error is presumably the
 * alternative branch.
 *
 * @param msg request body stream
 * @throws Exception CambriaApiException for the rejections above; anything thrown by the
 *         called services also propagates
 */
330 @Produces("application/json")
332 public void callUpdateMirrorMaker(InputStream msg) throws Exception {
333 DMaaPContext ctx = getDmaapContext();
334 if (checkMirrorMakerPermission(ctx,
335 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
// Candidate messageID, used only when the request does not carry one.
339 String randomStr = getRandomNum();
341 InputStream inStream = null;
342 Gson gson = new Gson();
343 UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
344 JSONObject jsonOb, jsonObInput = null;
347 input = IOUtils.toString(msg, "UTF-8");
349 if (input != null && input.length() > 0) {
350 input = removeExtraChar(input);
353 // Check if the request has UpdateMirrorMaker
355 updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
// NOTE(review): JSONObject.get() throws JSONException when "updateMirrorMaker" is absent,
// and the catch below only names JsonSyntaxException - so the friendlier
// "not a UpdateMirrorMaker request" check further down may never be reached. Confirm.
356 jsonOb = new JSONObject(input);
357 jsonObInput = (JSONObject) jsonOb.get("updateMirrorMaker");
359 } catch (JsonSyntaxException ex) {
361 errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage());
362 LOGGER.info(errResJson.toString());
363 throw new CambriaApiException(errResJson);
366 // send error message if it is not a UpdateMirrorMaker request.
367 if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
369 errResJson.setErrorMessage("This is not a UpdateMirrorMaker request. Please try again.");
370 LOGGER.info(errResJson.toString());
371 throw new CambriaApiException(errResJson);
373 updateMirrorMaker.validateJSON(jsonObInput);
376 String name = updateMirrorMaker.getUpdateMirrorMaker().getName();
377 // if empty, blank name is entered
378 if (StringUtils.isBlank(name)) {
380 errResJson.setErrorMessage("Name can not be empty or blank.");
381 LOGGER.info(errResJson.toString());
382 throw new CambriaApiException(errResJson);
385 // Check if the name contains only Alpha Numeric
386 else if (!isAlphaNumeric(name)) {
387 errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT);
388 LOGGER.info(errResJson.toString());
389 throw new CambriaApiException(errResJson);
393 // Set a random number as messageID, convert Json Object to
394 // InputStream and finally call publisher and subscriber
397 if (null == updateMirrorMaker.getMessageID() || updateMirrorMaker.getMessageID().isEmpty()) {
398 updateMirrorMaker.setMessageID(randomStr);
401 JSONObject existMirrorMaker = validateMMExists(ctx, name);
403 if ((boolean) existMirrorMaker.get("exists")) {
404 JSONObject existMM = (JSONObject) existMirrorMaker.get("listMirrorMaker");
// Members missing from the request keep the values of the existing MirrorMaker.
406 if (!jsonObInput.has("numStreams")) {
407 updateMirrorMaker.getUpdateMirrorMaker().setNumStreams(existMM.getInt("numStreams"));
409 if (!jsonObInput.has("enablelogCheck")) {
410 updateMirrorMaker.getUpdateMirrorMaker()
411 .setEnablelogCheck(existMM.getBoolean("enablelogCheck"));
413 inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), "UTF-8");
414 JSONObject finalJsonObj = callPubSub(updateMirrorMaker.getMessageID(), ctx, inStream, name,
416 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
419 errResJson.setErrorMessage("MirrorMaker " + name + " does not exist");
420 LOGGER.info(errResJson.toString());
421 throw new CambriaApiException(errResJson);
427 } catch (IOException e) {
432 // Send error response if user does not provide Authorization
434 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
435 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
436 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
437 ctx.getRequest().getRemoteHost());
438 LOGGER.info(errRes.toString());
439 throw new CambriaApiException(errRes);
/**
 * Handles a DeleteMirrorMaker request.
 *
 * The caller must hold the permission configured under "msgRtr.mirrormakeradmin.aaf";
 * the SC_UNAUTHORIZED ErrorResponse at the end is presumably the branch taken when that
 * check fails. The body is read as UTF-8, cleaned with removeExtraChar() and parsed both
 * as an org.json JSONObject and, via Gson, as a MirrorMaker. The request is accepted only
 * when "deleteMirrorMaker" is an object with exactly one member, "name", whose value is
 * non-blank and alphanumeric. A messageID is added when absent or null. When
 * validateMMExists() reports the name as existing, the JSON is forwarded through
 * callPubSub() and the result is written with respondOk(); the "does not exist" and
 * "not a DeleteMirrorMaker request" errors are presumably the alternative branches.
 *
 * @param msg request body stream
 * @throws JSONException declared by the signature
 * @throws Exception CambriaApiException for the rejections above; anything thrown by the
 *         called services also propagates
 */
444 @Produces("application/json")
446 public void callDeleteMirrorMaker(InputStream msg) throws JSONException, Exception {
447 DMaaPContext ctx = getDmaapContext();
449 if (checkMirrorMakerPermission(ctx,
450 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
455 Gson gson = new Gson();
456 MirrorMaker mirrormaker = new MirrorMaker();
459 input = IOUtils.toString(msg, "UTF-8");
461 if (input != null && input.length() > 0) {
462 input = removeExtraChar(input);
// Candidate messageID, used only when the request does not carry one.
465 String randomStr = getRandomNum();
466 JSONObject jsonOb = null;
469 jsonOb = new JSONObject(input);
470 mirrormaker = gson.fromJson(input, MirrorMaker.class);
472 } catch (JSONException ex) {
474 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
475 LOGGER.info(errResJson.toString());
476 throw new CambriaApiException(errResJson);
479 // Check if request has DeleteMirrorMaker and
480 // DeleteMirrorMaker has MirrorMaker object with name variable
481 // and check if the name contain only alpha numeric
483 if (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
484 && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
485 && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
486 && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))) {
488 if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) {
489 jsonOb.put("messageID", randomStr);
492 InputStream inStream = null;
495 inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
// NOTE(review): printStackTrace() bypasses LOGGER, and processing continues with
// inStream still null after this failure - confirm that is intended.
497 } catch (IOException ioe) {
498 ioe.printStackTrace();
500 JSONObject deleteMM = (JSONObject) jsonOb.getJSONObject("deleteMirrorMaker");
502 JSONObject existMirrorMaker = validateMMExists(ctx, deleteMM.getString("name"));
504 if ((boolean) existMirrorMaker.get("exists")) {
506 JSONObject finalJsonObj = callPubSub(jsonOb.getString("messageID"), ctx, inStream,
507 mirrormaker.name, false);
508 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
511 errResJson.setErrorMessage("MirrorMaker " + deleteMM.getString("name") + " does not exist");
512 LOGGER.info(errResJson.toString());
513 throw new CambriaApiException(errResJson);
519 errResJson.setErrorMessage("This is not a DeleteMirrorMaker request. Please try again.");
520 LOGGER.info(errResJson.toString());
521 throw new CambriaApiException(errResJson);
525 } catch (IOException ioe) {
532 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
533 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
534 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
535 ctx.getRequest().getRemoteHost());
536 LOGGER.info(errRes.toString());
537 throw new CambriaApiException(errRes);
541 public boolean isListMirrorMaker(String msg, String messageID) {
542 String topicmsg = msg;
543 topicmsg = removeExtraChar(topicmsg);
544 JSONObject jObj = new JSONObject();
545 JSONArray jArray = null;
546 boolean exist = false;
548 if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) {
549 jArray = new JSONArray(topicmsg);
551 for (int i = 0; i < jArray.length(); i++) {
552 jObj = jArray.getJSONObject(i);
554 if (jObj.has("messageID") && jObj.get("messageID").equals(messageID) && jObj.has("listMirrorMaker")) {
563 private void loadProperty() {
565 this.timeout = Integer.parseInt(
566 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim());
567 this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim();
568 this.consumergroup = AJSCPropertiesMap
569 .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim();
570 this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid")
574 private String removeExtraChar(String message) {
575 String str = message;
576 str = checkJsonFormate(str);
578 if (str != null && str.length() > 0) {
579 str = str.replace("\\", "");
580 str = str.replace("\"{", "{");
581 str = str.replace("}\"", "}");
586 private String getRandomNum() {
587 long random = Math.round(Math.random() * 89999) + 10000;
588 String strLong = Long.toString(random);
592 private boolean isAlphaNumeric(String name) {
593 String pattern = "^[a-zA-Z0-9]*$";
594 if (name.matches(pattern)) {
600 private String checkJsonFormate(String jsonStr) {
602 String json = jsonStr;
603 if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) {
609 private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) {
611 boolean hasPermission = false;
613 if (dmaapAAFauthenticator.aafAuthentication(ctx.getRequest(), permission)) {
614 hasPermission = true;
616 return hasPermission;
/**
 * Publishes a request on the MirrorMaker topic and waits for the matching
 * listMirrorMaker reply.
 *
 * The method subscribes once, pushes {@code inStream} with mirrorService.pushEvents(),
 * then keeps re-subscribing until isListMirrorMaker() recognises a reply for
 * {@code randomstr} or {@code timeout} milliseconds have elapsed. A recognised reply is
 * cleaned with removeExtraChar() and parsed as a JSON array; the "listMirrorMaker"
 * array of its first element is then either handled as a whole (listAll) or searched
 * for the entry whose "name" equals {@code name}. The SC_SERVICE_UNAVAILABLE
 * ErrorResponse is presumably thrown when no reply was recognised.
 *
 * @param randomstr messageID that identifies the awaited reply
 * @param ctx       request context passed to the publish/subscribe calls
 * @param inStream  serialised request to publish
 * @param name      MirrorMaker name to pick from the reply; ignored when null or empty
 * @param listAll   selects the whole-list handling instead of the per-name lookup
 * @return JSON object built from the reply
 */
619 public JSONObject callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream, String name, boolean listAll)
622 JSONObject jsonObj = new JSONObject();
623 JSONObject finalJsonObj = new JSONObject();
624 JSONArray jsonArray = null;
// Subscribe once before publishing, then publish the request.
626 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
627 mirrorService.pushEvents(ctx, topic, inStream, null, null);
628 long startTime = System.currentTimeMillis();
// Poll until the reply carrying our messageID shows up or the timeout expires.
630 while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
631 && ((System.currentTimeMillis() - startTime) < timeout)) {
632 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
636 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
637 && isListMirrorMaker(msgFrmSubscribe, randomstr)) {
638 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
// NOTE(review): isListMirrorMaker() accepts a match at any array index, but only
// element 0 is read here - confirm the reply is always the first element.
640 jsonArray = new JSONArray(msgFrmSubscribe);
641 jsonObj = jsonArray.getJSONObject(0);
642 if (jsonObj.has("listMirrorMaker")) {
643 jsonArray = (JSONArray) jsonObj.get("listMirrorMaker");
644 if (true == listAll) {
647 for (int i = 0; i < jsonArray.length(); i++) {
648 jsonObj = jsonArray.getJSONObject(i);
649 if (null != name && !name.isEmpty()) {
650 if (jsonObj.getString("name").equals(name)) {
651 finalJsonObj.put("listMirrorMaker", jsonObj);
655 finalJsonObj.put("listMirrorMaker", jsonObj);
665 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
666 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
667 "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null,
668 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
669 ctx.getRequest().getRemoteHost());
670 LOGGER.info(errRes.toString());
671 throw new CambriaApiException(errRes);
675 } catch (Exception e) {
682 private void sendErrResponse(DMaaPContext ctx, String errMsg) {
683 JSONObject err = new JSONObject();
684 err.append(ERROR, errMsg);
687 DMaaPResponseBuilder.respondOk(ctx, err);
688 LOGGER.error(errMsg);
690 } catch (JSONException | IOException e) {
691 LOGGER.error("Error at sendErrResponse method:" + errMsg + "Exception name:" + e);
/**
 * Returns the whitelist topics of one MirrorMaker that belong to a given namespace.
 *
 * The caller must hold the permission configured under "msgRtr.mirrormakeruser.aaf" and,
 * in addition, the permission formed from the "msgRtr.mirrormakeruser.aaf.create"
 * property + namespace + "|create"; the two SC_UNAUTHORIZED ErrorResponses are presumably
 * the branches taken when those checks fail. The request must be a JSON object with
 * exactly two members: a non-blank alphanumeric "name" and a non-blank "namespace". A
 * listAllMirrorMaker request is sent through callPubSub(); the "whitelist" of the entry
 * whose name matches is split on commas, filtered with getNamespace() against the
 * requested namespace, and written back as {"name": ..., "whitelist": "a,b,..."}.
 *
 * @param msg request body stream
 * @throws Exception CambriaApiException for rejected requests; anything thrown by the
 *         called services also propagates
 */
695 @SuppressWarnings("unchecked")
697 @Produces("application/json")
698 @Path("/listallwhitelist")
699 public void listWhiteList(InputStream msg) throws Exception {
701 DMaaPContext ctx = getDmaapContext();
702 if (checkMirrorMakerPermission(ctx,
703 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
709 input = IOUtils.toString(msg, "UTF-8");
711 if (input != null && input.length() > 0) {
712 input = removeExtraChar(input);
715 // Check if it is correct Json object
716 JSONObject jsonOb = null;
719 jsonOb = new JSONObject(input);
721 } catch (JSONException ex) {
723 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
724 LOGGER.info(errResJson.toString());
725 throw new CambriaApiException(errResJson);
729 // Check if the request has name and name contains only alpha
731 // and check if the request has namespace and namespace contains
732 // only alpha numeric
733 if (jsonOb.length() == 2 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
734 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
735 && !StringUtils.isBlank(jsonOb.getString("namespace"))) {
737 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
738 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
740 // Check if the user have create permission for the
742 if (checkMirrorMakerPermission(ctx, permission)) {
744 JSONObject listAll = new JSONObject();
745 JSONObject emptyObject = new JSONObject();
747 // Create a listAllMirrorMaker Json object
749 listAll.put("listAllMirrorMaker", emptyObject);
751 } catch (JSONException e) {
753 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
754 LOGGER.info(errResJson.toString());
755 throw new CambriaApiException(errResJson);
758 // set a random number as messageID
759 String randomStr = getRandomNum();
760 listAll.put("messageID", randomStr);
761 InputStream inStream = null;
763 // convert listAll Json object to InputStream object
765 inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
// NOTE(review): printStackTrace() bypasses LOGGER, and processing continues with
// inStream still null after this failure - confirm that is intended.
767 } catch (IOException ioe) {
768 ioe.printStackTrace();
770 JSONObject listMirrorMaker = new JSONObject();
771 listMirrorMaker = callPubSub(randomStr, ctx, inStream, null, true);
773 String whitelist = null;
774 JSONArray listMMArray = new JSONArray();
775 if (listMirrorMaker.has("listMirrorMaker")) {
776 listMMArray = (JSONArray) listMirrorMaker.get("listMirrorMaker");
// Pick the whitelist of the MirrorMaker whose name matches the request.
777 for (int i = 0; i < listMMArray.length(); i++) {
779 JSONObject mm = new JSONObject();
780 mm = listMMArray.getJSONObject(i);
781 String name = mm.getString("name");
783 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
784 whitelist = mm.getString("whitelist");
789 if (!StringUtils.isBlank(whitelist)) {
791 List<String> topicList = new ArrayList<String>();
792 List<String> finalTopicList = new ArrayList<String>();
793 topicList = Arrays.asList(whitelist.split(","));
// Keep only the topics that belong to the requested namespace. The loop variable
// `topic` shadows the field of the same name inside this loop.
795 for (String topic : topicList) {
796 if (topic != null && !topic.equals("null")
797 && getNamespace(topic).equals(jsonOb.getString("namespace"))) {
799 finalTopicList.add(topic);
803 String topicNames = "";
805 if (finalTopicList.size() > 0) {
806 topicNames = StringUtils.join(finalTopicList, ",");
809 JSONObject listAllWhiteList = new JSONObject();
810 listAllWhiteList.put("name", jsonOb.getString("name"));
811 listAllWhiteList.put("whitelist", topicNames);
813 DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList);
818 errResJson.setErrorMessage(
819 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
820 LOGGER.info(errResJson.toString());
821 throw new CambriaApiException(errResJson);
826 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
827 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION,
828 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
829 ctx.getRequest().getRemoteHost());
830 LOGGER.info(errRes.toString());
831 throw new CambriaApiException(errRes);
836 errResJson.setErrorMessage("This is not a ListAllWhitelist request. Please try again.");
837 LOGGER.info(errResJson.toString());
838 throw new CambriaApiException(errResJson);
841 } catch (IOException e) {
846 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
847 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null,
848 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
849 ctx.getRequest().getRemoteHost());
850 LOGGER.info(errRes.toString());
851 throw new CambriaApiException(errRes);
/**
 * Adds one topic to the whitelist of a MirrorMaker.
 *
 * The caller must hold the permission configured under "msgRtr.mirrormakeruser.aaf" and,
 * in addition, the permission formed from the "msgRtr.mirrormakeruser.aaf.create"
 * property + namespace + "|create"; the two SC_UNAUTHORIZED ErrorResponses are presumably
 * the branches taken when those checks fail. The request must be a JSON object with
 * exactly three members: a non-blank alphanumeric "name", a non-blank "namespace" and a
 * non-blank "whitelistTopicName" whose part after the last '.' is alphanumeric. The
 * current MirrorMaker list is fetched with a listAllMirrorMaker publish/subscribe round
 * trip; the new topic is appended to the existing whitelist and sent on with
 * callPubSubForWhitelist() when it is not already listed and getNamespace() of the topic
 * equals the requested namespace. Otherwise an SC_INTERNAL_SERVER_ERROR ErrorResponse
 * naming the reason is thrown.
 *
 * @param msg request body stream
 * @throws Exception CambriaApiException for rejected requests; anything thrown by the
 *         called services also propagates
 */
855 @SuppressWarnings("unchecked")
857 @Produces("application/json")
858 @Path("/createwhitelist")
859 public void createWhiteList(InputStream msg) throws Exception {
861 DMaaPContext ctx = getDmaapContext();
862 if (checkMirrorMakerPermission(ctx,
863 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
869 input = IOUtils.toString(msg, "UTF-8");
871 if (input != null && input.length() > 0) {
872 input = removeExtraChar(input);
875 // Check if it is correct Json object
876 JSONObject jsonOb = null;
879 jsonOb = new JSONObject(input);
881 } catch (JSONException ex) {
882 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
883 LOGGER.info(errResJson.toString());
884 throw new CambriaApiException(errResJson);
887 // Check if the request has name and name contains only alpha
889 // check if the request has namespace and
890 // check if the request has whitelistTopicName
891 // check if the topic name contains only alpha numeric
892 if (jsonOb.length() == 3 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
893 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
894 && !StringUtils.isBlank(jsonOb.getString("namespace")) && jsonOb.has("whitelistTopicName")
895 && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
896 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
897 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1, jsonOb
898 .getString("whitelistTopicName").length()))) {
900 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
901 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
903 // Check if the user have create permission for the
905 if (checkMirrorMakerPermission(ctx, permission)) {
907 JSONObject listAll = new JSONObject();
908 JSONObject emptyObject = new JSONObject();
910 // Create a listAllMirrorMaker Json object
912 listAll.put("listAllMirrorMaker", emptyObject);
914 } catch (JSONException e) {
916 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
917 LOGGER.info(errResJson.toString());
918 throw new CambriaApiException(errResJson);
921 // set a random number as messageID
922 String randomStr = getRandomNum();
923 listAll.put("messageID", randomStr);
924 InputStream inStream = null;
926 // convert listAll Json object to InputStream object
928 inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
// NOTE(review): printStackTrace() bypasses LOGGER, and processing continues with
// inStream still null after this failure - confirm that is intended.
930 } catch (IOException ioe) {
931 ioe.printStackTrace();
933 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
934 // call listAllMirrorMaker
935 mirrorService.pushEvents(ctx, topic, inStream, null, null);
937 // subscribe for listMirrorMaker
938 long startTime = System.currentTimeMillis();
// Poll until the reply carrying our messageID shows up or the timeout expires.
940 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
941 && (System.currentTimeMillis() - startTime) < timeout) {
942 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
945 JSONArray listMirrorMaker = null;
947 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
948 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
950 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
951 String whitelist = null;
// Pick the whitelist of the MirrorMaker whose name matches the request.
953 for (int i = 0; i < listMirrorMaker.length(); i++) {
954 JSONObject mm = new JSONObject();
955 mm = listMirrorMaker.getJSONObject(i);
956 String name = mm.getString("name");
958 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
959 whitelist = mm.getString("whitelist");
964 List<String> topicList = new ArrayList<String>();
965 List<String> finalTopicList = new ArrayList<String>();
967 if (whitelist != null) {
968 topicList = Arrays.asList(whitelist.split(","));
// Carry over the existing topics, skipping blank entries.
971 for (String st : topicList) {
972 if (!StringUtils.isBlank(st)) {
973 finalTopicList.add(st);
977 String newTopic = jsonOb.getString("whitelistTopicName");
979 if (!topicList.contains(newTopic)
980 && getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
982 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
983 MirrorMaker mirrorMaker = new MirrorMaker();
984 mirrorMaker.setName(jsonOb.getString("name"));
985 finalTopicList.add(newTopic);
986 String newWhitelist = "";
988 if (finalTopicList.size() > 0) {
989 newWhitelist = StringUtils.join(finalTopicList, ",");
992 mirrorMaker.setWhitelist(newWhitelist);
994 String newRandom = getRandomNum();
995 updateWhiteList.setMessageID(newRandom);
996 updateWhiteList.setUpdateWhiteList(mirrorMaker);
// NOTE(review): the result of this toJson() call is discarded; the same serialisation
// is repeated two lines below - it looks redundant, confirm before removing.
999 g.toJson(updateWhiteList);
1000 InputStream inputStream = null;
1001 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
1002 // callPubSub(newRandom, ctx, inputStream);
1003 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb);
1005 } else if (topicList.contains(newTopic)) {
1006 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR,
1007 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "The topic already exist.",
1008 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1009 ctx.getRequest().getRemoteHost());
1010 LOGGER.info(errRes.toString());
1011 throw new CambriaApiException(errRes);
1013 } else if (!getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
1014 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR,
1015 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(),
1016 "The namespace of the topic does not match with the namespace you provided.",
1017 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1018 ctx.getRequest().getRemoteHost());
1019 LOGGER.info(errRes.toString());
1020 throw new CambriaApiException(errRes);
1024 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1025 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
1026 "listWhiteList is not available, please make sure MirrorMakerAgent is running",
1027 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1028 ctx.getRequest().getRemoteHost());
1029 LOGGER.info(errRes.toString());
1030 throw new CambriaApiException(errRes);
1034 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1035 DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_CREATE_PERMISSION,
1036 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1037 ctx.getRequest().getRemoteHost());
1038 LOGGER.info(errRes.toString());
1039 throw new CambriaApiException(errRes);
1044 errResJson.setErrorMessage("This is not a createWhitelist request. Please try again.");
1045 LOGGER.info(errResJson.toString());
1046 throw new CambriaApiException(errResJson);
1049 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1050 | TopicExistsException | missingReqdSetting | UnavailableException e) {
1055 // Send error response if user does not provide Authorization
1057 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1058 DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_PERMISSION, null,
1059 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1060 ctx.getRequest().getRemoteHost());
1061 LOGGER.info(errRes.toString());
1062 throw new CambriaApiException(errRes);
/**
 * Handles {@code /deletewhitelist}: removes one topic from the whitelist of a named
 * MirrorMaker. The endpoint is declared to produce {@code application/json}.
 * <p>
 * Steps performed by the statements visible in this method:
 * <ol>
 * <li>Checks the caller against the permission configured under
 * {@code msgRtr.mirrormakeruser.aaf}.</li>
 * <li>Reads the body as UTF-8, passes it through {@code removeExtraChar} and parses it as JSON.
 * The object must have exactly three keys, {@code name}, {@code namespace} and
 * {@code whitelistTopicName}; {@code name} and the part of {@code whitelistTopicName} after
 * its last '.' must satisfy {@code isAlphaNumeric}.</li>
 * <li>Checks a second permission string built from the
 * {@code msgRtr.mirrormakeruser.aaf.create} property, the request namespace and
 * {@code "|create"}.</li>
 * <li>Publishes a {@code listAllMirrorMaker} request tagged with a random {@code messageID}
 * and polls the subscription until {@code isListMirrorMaker} accepts a reply or
 * {@code timeout} milliseconds have passed.</li>
 * <li>Reads the whitelist of the MirrorMaker whose name matches the request, tests whether the
 * topic is listed, and sends the whitelist produced by {@code removeTopic} through
 * {@code callPubSubForWhitelist}.</li>
 * </ol>
 * NOTE(review): the {@code try}/{@code else}/closing-brace lines that tie these statements
 * together are not visible in this view; the branch descriptions below are hedged accordingly.
 *
 * @param msg request body, expected to contain the JSON object described above
 * @throws Exception error conditions visible here are raised as {@code CambriaApiException};
 *         the multi-catch near the end also names IOException, ConfigDbException,
 *         AccessDeniedException, TopicExistsException, missingReqdSetting and
 *         UnavailableException
 */
1067 @SuppressWarnings("unchecked")
1069 @Produces("application/json")
1070 @Path("/deletewhitelist")
1071 public void deleteWhiteList(InputStream msg) throws Exception {
1073 DMaaPContext ctx = getDmaapContext();
// First gate: the general MirrorMaker permission read from the msgRtr properties.
1074 if (checkMirrorMakerPermission(ctx,
1075 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
1078 String input = null;
// Read the whole request body; removeExtraChar is applied only to a non-empty body.
1081 input = IOUtils.toString(msg, "UTF-8");
1083 if (input != null && input.length() > 0) {
1084 input = removeExtraChar(input);
1087 // Check if it is correct Json object
1088 JSONObject jsonOb = null;
1091 jsonOb = new JSONObject(input);
1093 } catch (JSONException ex) {
// Unparseable body: reuse the shared errResJson object with the "incorrect JSON" message.
1095 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
1096 LOGGER.info(errResJson.toString());
1097 throw new CambriaApiException(errResJson);
1101 // Check if the request has name and name contains only alpha
1103 // check if the request has namespace and
1104 // check if the request has whitelistTopicName
// jsonOb.length() == 3 additionally rejects requests that carry any extra key.
// Only the segment after the last '.' of whitelistTopicName is tested with isAlphaNumeric.
1105 if (jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name"))
1106 && jsonOb.has("namespace") && jsonOb.has("whitelistTopicName")
1107 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
1108 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
1109 jsonOb.getString("whitelistTopicName").length()))) {
// Second gate: permission string = configured prefix + request namespace + "|create".
1111 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
1112 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
1114 // Check if the user have create permission for the
1116 if (checkMirrorMakerPermission(ctx, permission)) {
1118 JSONObject listAll = new JSONObject();
1119 JSONObject emptyObject = new JSONObject();
1121 // Create a listAllMirrorMaker Json object
1123 listAll.put("listAllMirrorMaker", emptyObject);
1125 } catch (JSONException e) {
1127 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
1128 LOGGER.info(errResJson.toString());
1129 throw new CambriaApiException(errResJson);
1132 // set a random number as messageID
1133 String randomStr = getRandomNum();
1134 listAll.put("messageID", randomStr);
1135 InputStream inStream = null;
1137 // convert listAll Json object to InputStream object
1139 inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
1141 } catch (IOException ioe) {
// NOTE(review): the IOException is only printed; inStream would stay null and still be
// handed to pushEvents below.
1142 ioe.printStackTrace();
1144 // call listAllMirrorMaker
1145 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1147 // subscribe for listMirrorMaker
1148 long startTime = System.currentTimeMillis();
1149 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Keep re-subscribing until a reply for randomStr is recognised or timeout (ms) is exceeded.
1151 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1152 && (System.currentTimeMillis() - startTime) < timeout) {
1153 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1156 JSONObject jsonObj = new JSONObject();
1157 JSONArray jsonArray = null;
1158 JSONArray listMirrorMaker = null;
// The check is repeated because the loop above may also have ended on timeout.
1160 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1161 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1162 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1163 jsonArray = new JSONArray(msgFrmSubscribe);
// Find the array element that answers this request (same messageID, has listMirrorMaker).
1165 for (int i = 0; i < jsonArray.length(); i++) {
1166 jsonObj = jsonArray.getJSONObject(i);
1168 if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr)
1169 && jsonObj.has("listMirrorMaker")) {
1170 listMirrorMaker = jsonObj.getJSONArray("listMirrorMaker");
1174 String whitelist = null;
// NOTE(review): listMirrorMaker is still null here if no element matched above; the
// length() call would then throw NullPointerException - confirm isListMirrorMaker rules this out.
1175 for (int i = 0; i < listMirrorMaker.length(); i++) {
1177 JSONObject mm = new JSONObject();
1178 mm = listMirrorMaker.getJSONObject(i);
1179 String name = mm.getString("name");
// Take the whitelist of the MirrorMaker whose name equals the requested name.
1181 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
1182 whitelist = mm.getString("whitelist");
1187 List<String> topicList = new ArrayList<String>();
// A missing whitelist leaves topicList empty; otherwise it is the comma-separated entries.
1189 if (whitelist != null) {
1190 topicList = Arrays.asList(whitelist.split(","));
// NOTE(review): this local flag has the same name as the removeTopic(...) method called
// below; where the flag is set or tested is not visible in this view.
1192 boolean removeTopic = false;
1193 String topicToRemove = jsonOb.getString("whitelistTopicName");
// NOTE(review): the lines between this test and the error below are not visible; presumably
// the "topic does not exist" error belongs to the branch where the topic is NOT listed - confirm.
1195 if (topicList.contains(topicToRemove)) {
1198 errResJson.setErrorMessage(errorMessages.getTopicNotExist());
1199 LOGGER.info(errResJson.toString());
1200 throw new CambriaApiException(errResJson);
// Build the updateWhiteList message for the named MirrorMaker.
1204 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
1205 MirrorMaker mirrorMaker = new MirrorMaker();
1207 mirrorMaker.setName(jsonOb.getString("name"));
// The whitelist is set only when something remains after removal; removeTopic is
// evaluated twice with identical arguments.
1209 if (StringUtils.isNotBlank((removeTopic(whitelist, topicToRemove)))) {
1210 mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
// A fresh messageID is generated for the update request.
1213 String newRandom = getRandomNum();
1215 updateWhiteList.setMessageID(newRandom);
1216 updateWhiteList.setUpdateWhiteList(mirrorMaker);
1218 Gson g = new Gson();
// NOTE(review): the result of this toJson call is discarded; the same call is repeated two lines down.
1219 g.toJson(updateWhiteList);
1221 InputStream inputStream = null;
1222 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
// Publishes the update and writes the HTTP response (see callPubSubForWhitelist).
1223 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb);
1224 // mmAgentUtil.getNamespace(topicToRemove));
// 503 response; presumably the branch taken when no listMirrorMaker reply arrived in time - confirm.
1229 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1230 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
1231 "listWhiteList is not available, please make sure MirrorMakerAgent is running",
1232 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1233 ctx.getRequest().getRemoteHost());
1234 LOGGER.info(errRes.toString());
1235 throw new CambriaApiException(errRes);
// 401 response with NO_USER_CREATE_PERMISSION; presumably the branch for a failed
// namespace permission check - confirm.
1239 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1240 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION,
1241 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1242 ctx.getRequest().getRemoteHost());
1243 LOGGER.info(errRes.toString());
1244 throw new CambriaApiException(errRes);
// Presumably the branch for a request that failed the three-key validation - confirm.
// NOTE(review): the message says "DeleteAllWhitelist" although the endpoint is /deletewhitelist.
1249 errResJson.setErrorMessage("This is not a DeleteAllWhitelist request. Please try again.");
1250 LOGGER.info(errResJson.toString());
1251 throw new CambriaApiException(errResJson);
// NOTE(review): the body of this multi-catch is not visible in this view.
1255 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1256 | TopicExistsException | missingReqdSetting | UnavailableException e) {
1261 // Send error response if user does not provide Authorization
1263 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1264 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null,
1265 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1266 ctx.getRequest().getRemoteHost());
1267 LOGGER.info(errRes.toString());
1268 throw new CambriaApiException(errRes);
1273 private String getNamespace(String topic) {
1274 return topic.substring(0, topic.lastIndexOf("."));
1277 private String removeTopic(String whitelist, String topicToRemove) {
1278 List<String> topicList = new ArrayList<>();
1279 List<String> newTopicList = new ArrayList<>();
1281 if (whitelist.contains(",")) {
1282 topicList = Arrays.asList(whitelist.split(","));
1286 if (topicList.contains(topicToRemove)) {
1287 for (String topic : topicList) {
1288 if (!topic.equals(topicToRemove)) {
1289 newTopicList.add(topic);
1294 String newWhitelist = StringUtils.join(newTopicList, ",");
1296 return newWhitelist;
/**
 * Publishes an already serialised whitelist update and answers the HTTP request with the
 * entries reported for the affected MirrorMaker.
 * <p>
 * The visible statements subscribe once, push {@code inStream} to {@code topic}, then poll
 * the subscription until {@code isListMirrorMaker} accepts a message for {@code randomStr}
 * or {@code timeout} milliseconds have passed. From the accepted message the
 * {@code listMirrorMaker} array of the element whose {@code messageID} equals
 * {@code randomStr} is taken, the elements whose {@code name} equals the {@code name} in
 * {@code jsonOb} are kept, and the result is written with
 * {@code DMaaPResponseBuilder.respondOk} under the key {@code listMirrorMaker}.
 * <p>
 * NOTE(review): lines between the signature and the first statement, and the line opening
 * the {@code try}, are not visible in this view.
 *
 * @param randomStr messageID used to recognise the reply to this request
 * @param ctx request context passed to the pub/sub calls and used to write the response
 * @param inStream payload handed to {@code mirrorService.pushEvents}
 * @param jsonOb original request object; its {@code namespace} and {@code name} are read
 */
1299 public void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, JSONObject jsonOb) {
// NOTE(review): namespace is read but not used by any statement visible in this method.
1303 String namespace = jsonOb.getString("namespace");
1304 String mmName = jsonOb.getString("name");
// Subscribe before publishing, then publish the update request.
1306 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1307 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1308 long startTime = System.currentTimeMillis();
// Poll until a reply for randomStr is recognised or timeout (ms) is exceeded.
1310 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1311 && (System.currentTimeMillis() - startTime) < timeout) {
1312 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1315 JSONObject jsonObj = new JSONObject();
1316 JSONArray jsonArray = null;
1317 JSONArray jsonArrayNamespace = null;
// Re-check the last message because the loop may have ended on timeout.
1319 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1320 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1321 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1322 jsonArray = new JSONArray(msgFrmSubscribe);
// Pick the listMirrorMaker array from the element carrying this request's messageID.
1324 for (int i = 0; i < jsonArray.length(); i++) {
1325 jsonObj = jsonArray.getJSONObject(i);
1327 if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr)
1328 && jsonObj.has("listMirrorMaker")) {
1329 jsonArrayNamespace = jsonObj.getJSONArray("listMirrorMaker");
1332 JSONObject finalJasonObj = new JSONObject();
1333 JSONArray finalJsonArray = new JSONArray();
// NOTE(review): jsonArrayNamespace is still null if no element matched above; length()
// would then throw NullPointerException - confirm isListMirrorMaker rules this out.
1335 for (int i = 0; i < jsonArrayNamespace.length(); i++) {
1337 JSONObject mmObj = new JSONObject();
1338 mmObj = jsonArrayNamespace.getJSONObject(i);
// Keep only the MirrorMaker entries whose name matches the request.
1339 if (mmObj.has("name") && mmName.equals(mmObj.getString("name"))) {
1341 finalJsonArray.put(mmObj);
1345 finalJasonObj.put("listMirrorMaker", finalJsonArray);
1347 DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
// 503 status combined with the RESOURCE_NOT_FOUND response code; presumably the branch
// taken when no matching reply arrived before the timeout - confirm.
1351 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1352 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
1353 "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null,
1354 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1355 ctx.getRequest().getRemoteHost());
1356 LOGGER.info(errRes.toString());
1357 throw new CambriaApiException(errRes);
// NOTE(review): this handler only prints the stack trace. If it encloses the throw above
// (the try line is not visible here), the CambriaApiException never reaches the caller -
// consistent with the signature declaring no checked exceptions - confirm.
1360 } catch (Exception e) {
1361 e.printStackTrace();
1366 public JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
1367 JSONObject jsonObj = new JSONObject();
1368 JSONArray jsonArray = new JSONArray();
1369 JSONArray listMirrorMaker = new JSONArray();
1371 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1372 jsonArray = new JSONArray(msgFrmSubscribe);
1373 jsonObj = jsonArray.getJSONObject(0);
1375 for (int i = 0; i < jsonArray.length(); i++) {
1376 jsonObj = jsonArray.getJSONObject(i);
1378 if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr) && jsonObj.has("listMirrorMaker")) {
1379 listMirrorMaker = jsonObj.getJSONArray("listMirrorMaker");
1383 return listMirrorMaker;
1387 public JSONObject validateMMExists(DMaaPContext ctx, String name) throws Exception {
1388 // Create a listAllMirrorMaker Json object
1389 JSONObject listAll = new JSONObject();
1391 listAll.put("listAllMirrorMaker", new JSONObject());
1393 } catch (JSONException e) {
1395 e.printStackTrace();
1398 // set a random number as messageID
1399 String randomStr = getRandomNum();
1400 listAll.put("messageID", randomStr);
1401 InputStream inStream = null;
1403 // convert listAll Json object to InputStream object
1405 inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
1407 } catch (IOException ioe) {
1408 ioe.printStackTrace();
1410 JSONObject listMirrorMaker = new JSONObject();
1411 listMirrorMaker = callPubSub(randomStr, ctx, inStream, name, false);
1412 if (null != listMirrorMaker && listMirrorMaker.length() > 0) {
1413 listMirrorMaker.put("exists", true);
1414 return listMirrorMaker;
1417 listMirrorMaker.put("exists", false);
1418 return listMirrorMaker;