1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.service;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
30 import javax.servlet.http.HttpServletRequest;
31 import javax.servlet.http.HttpServletResponse;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import org.json.JSONObject;
37 import org.apache.commons.io.IOUtils;
38 import org.apache.commons.lang.StringUtils;
39 import org.apache.http.HttpStatus;
41 import com.att.eelf.configuration.EELFLogger;
42 import com.att.eelf.configuration.EELFManager;
43 import org.springframework.beans.factory.annotation.Autowired;
44 import org.springframework.beans.factory.annotation.Qualifier;
45 import org.springframework.stereotype.Component;
47 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
48 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
49 import org.onap.dmaap.dmf.mr.utils.Utils;
50 import com.att.nsa.configs.ConfigDbException;
51 import org.onap.dmaap.mmagent.*;
52 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
53 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
54 import com.google.gson.Gson;
55 import com.google.gson.JsonSyntaxException;
57 import edu.emory.mathcs.backport.java.util.Arrays;
59 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
60 import org.onap.dmaap.dmf.mr.CambriaApiException;
61 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
63 import org.json.JSONArray;
64 import org.json.JSONException;
65 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
66 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
67 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
68 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
69 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
70 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
71 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
72 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
73 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
74 import org.onap.dmaap.dmf.mr.service.MMService;
77 * Rest Service class for Mirror Maker proxy Rest Services
79 * @author <a href="mailto:"></a>
// REST service class for the Mirror Maker proxy endpoints.
// NOTE(review): the numbers at the start of each line skip values, so some original
// lines (for example any annotations on the class and on the fields below) are not
// visible in this listing.
85 public class MMRestService {
87 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class);
// Messages placed in the error responses thrown by the handlers in this class.
88 private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission.";
89 private static final String NO_USER_PERMISSION = "No Mirror Maker User permission.";
90 private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission.";
91 private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric";
92 private static final String INVALID_IPPORT = "This is not a valid IP:Port";
// NOTE(review): the handlers visible in this listing pass the literals
// "msgRtr.mirrormakeradmin.aaf", "msgRtr.mirrormakeruser.aaf" and "UTF-8" directly
// rather than the three constants below.
93 private static final String MIRROR_MAKERADMIN = "msgRtr.mirrormakeradmin.aaf";
94 private static final String MIRROR_MAKERUSER = "msgRtr.mirrormakeruser.aaf";
95 private static final String UTF_8 = "UTF-8";
96 private static final String MESSAGE = "message";
97 private static final String LISTMIRRORMAKER = "listMirrorMaker";
98 private static final String ERROR = "error";
99 private static final String NAMESPACE = "namespace";
// Topic, consumer group and consumer id used for the publish/subscribe round trip;
// they have no initializer and are assigned in loadProperty().
101 private String topic;
103 private String consumergroup;
104 private String consumerid;
107 @Qualifier("configurationReader")
108 private ConfigurationReader configReader;
111 private HttpServletRequest request;
114 private HttpServletResponse response;
117 private MMService mirrorService;
120 private DMaaPErrorMessages errorMessages;
// Reusable 400 / INCORRECT_JSON error object. It is created together with the instance,
// so the timestamp is the construction time and `topic` is still null when it is read
// here. The handlers overwrite its message with setErrorMessage(...) before throwing it.
// NOTE(review): this is shared mutable state; if one instance serves concurrent
// requests the message can be overwritten by another request -- confirm the bean scope.
122 private ErrorResponse errResJson = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
123 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "", null, Utils.getFormattedDate(new Date()), topic,
124 null, null, "mirrorMakerAgent", null);
// Authenticator consulted by checkMirrorMakerPermission().
126 private DMaaPAAFAuthenticator dmaapAAFauthenticator = new DMaaPAAFAuthenticatorImpl();
129 * This method is used for taking Configuration Object,HttpServletRequest
130 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
133 * @return DMaaPContext object from where user can get Configuration
134 * Object,HttpServlet Object
137 private DMaaPContext getDmaapContext() {
138 DMaaPContext dmaapContext = new DMaaPContext();
139 dmaapContext.setRequest(request);
140 dmaapContext.setResponse(response);
141 dmaapContext.setConfigReader(configReader);
142 dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
/**
 * Handles a "create Mirror Maker" request.
 *
 * Flow visible in this listing:
 * - the caller must hold the permission configured under "msgRtr.mirrormakeradmin.aaf";
 *   otherwise a CambriaApiException carrying SC_UNAUTHORIZED / NO_ADMIN_PERMISSION is thrown;
 * - the body is read as UTF-8, cleaned with removeExtraChar() and parsed with Gson into
 *   a CreateMirrorMaker;
 * - a CambriaApiException built from errResJson is thrown for a JSON syntax error, a missing
 *   createMirrorMaker element, a blank name, a non-alphanumeric name, or a name that
 *   validateMMExists() reports as already existing;
 * - when the request has no messageID, the value from getRandomNum() is used;
 * - otherwise the request is sent through callPubSub() and the result is written with
 *   DMaaPResponseBuilder.respondOk().
 *
 * NOTE(review): several lines of this method (declaration of `input`, try/else lines,
 * closing braces, the trailing callPubSub arguments, the IOException handler body) are
 * absent from this listing.
 *
 * @param msg request body
 * @throws Exception declared by the signature; CambriaApiException is thrown explicitly
 */
148 @Produces("application/json")
150 public void callCreateMirrorMaker(InputStream msg) throws Exception {
152 DMaaPContext ctx = getDmaapContext();
153 if (checkMirrorMakerPermission(ctx,
154 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
158 String randomStr = getRandomNum();
160 InputStream inStream = null;
161 Gson gson = new Gson();
162 CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
163 LOGGER.info("Starting Create MirrorMaker");
165 input = IOUtils.toString(msg, "UTF-8");
167 if (input != null && input.length() > 0) {
168 input = removeExtraChar(input);
171 // Check if the request has CreateMirrorMaker
173 createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class);
175 } catch (JsonSyntaxException ex) {
177 errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage());
178 LOGGER.info(errResJson.toString());
179 throw new CambriaApiException(errResJson);
182 // send error message if it is not a CreateMirrorMaker request.
183 if (createMirrorMaker.getCreateMirrorMaker() == null) {
185 errResJson.setErrorMessage("This is not a CreateMirrorMaker request. Please try again.");
186 LOGGER.info(errResJson.toString());
187 throw new CambriaApiException(errResJson);
189 createMirrorMaker.validateJSON();
192 String name = createMirrorMaker.getCreateMirrorMaker().getName();
194 // if empty, blank name is entered
195 if (StringUtils.isBlank(name)) {
197 errResJson.setErrorMessage("Name can not be empty or blank.");
198 LOGGER.info(errResJson.toString());
199 throw new CambriaApiException(errResJson);
202 // Check if the name contains only Alpha Numeric
203 else if (!isAlphaNumeric(name)) {
205 errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT);
206 LOGGER.info(errResJson.toString());
207 throw new CambriaApiException(errResJson);
// Fall back to a generated messageID when the request did not carry one.
211 if (null == createMirrorMaker.getMessageID() || createMirrorMaker.getMessageID().isEmpty()) {
212 createMirrorMaker.setMessageID(randomStr);
214 inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), "UTF-8");
215 JSONObject existMirrorMaker = validateMMExists(ctx, name);
216 if (!(boolean) existMirrorMaker.get("exists")) {
217 JSONObject finalJsonObj = callPubSub(createMirrorMaker.getMessageID(), ctx, inStream, name,
219 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
222 errResJson.setErrorMessage("MirrorMaker " + name + " already exists");
223 LOGGER.info(errResJson.toString());
224 throw new CambriaApiException(errResJson);
229 } catch (IOException e) {
234 // Send error response if user does not provide Authorization
236 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
237 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
238 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
239 ctx.getRequest().getRemoteHost());
240 LOGGER.info(errRes.toString());
241 throw new CambriaApiException(errRes);
/**
 * Handles a "list all Mirror Makers" request.
 *
 * Flow visible in this listing:
 * - requires the permission configured under "msgRtr.mirrormakeradmin.aaf"; otherwise a
 *   CambriaApiException with SC_UNAUTHORIZED / NO_ADMIN_PERMISSION is thrown;
 * - the body is read as UTF-8, cleaned with removeExtraChar() and parsed into a JSONObject;
 *   a JSONException is reported through errResJson;
 * - the request is accepted only when it has a "listAllMirrorMaker" member that is an
 *   empty object; a missing or null "messageID" is replaced by getRandomNum();
 * - the request is forwarded with callPubSub() and the result written with respondOk().
 *
 * NOTE(review): several lines (declaration of `input`, try/else lines, closing braces,
 * the trailing callPubSub arguments, the IOException handler body) are absent from this
 * listing.
 *
 * @param msg request body
 * @throws Exception declared by the signature; CambriaApiException is thrown explicitly
 */
247 @Produces("application/json")
249 public void callListAllMirrorMaker(InputStream msg) throws Exception {
251 DMaaPContext ctx = getDmaapContext();
253 if (checkMirrorMakerPermission(ctx,
254 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
259 Gson gson = new Gson();
262 input = IOUtils.toString(msg, "UTF-8");
264 if (input != null && input.length() > 0) {
265 input = removeExtraChar(input);
268 String randomStr = getRandomNum();
269 JSONObject jsonOb = null;
272 jsonOb = new JSONObject(input);
274 } catch (JSONException ex) {
275 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
276 LOGGER.info(errResJson.toString());
277 throw new CambriaApiException(errResJson);
280 // Check if request has listAllMirrorMaker and
281 // listAllMirrorMaker is empty
282 if (jsonOb.has("listAllMirrorMaker") && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0) {
284 if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) {
285 jsonOb.put("messageID", randomStr);
288 InputStream inStream = null;
289 MirrorMaker mirrormaker = gson.fromJson(input, MirrorMaker.class);
291 inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
293 JSONObject responseJson = callPubSub(jsonOb.getString("messageID"), ctx, inStream, mirrormaker.name,
295 DMaaPResponseBuilder.respondOk(ctx, responseJson);
// NOTE(review): a request of the wrong shape is reported here with
// SC_SERVICE_UNAVAILABLE / SERVER_UNAVAILABLE, whereas the other handlers in this class
// report the equivalent condition through errResJson (SC_BAD_REQUEST) -- confirm intent.
299 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
300 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
301 "This is not a ListAllMirrorMaker request. Please try again.", null,
302 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
303 ctx.getRequest().getRemoteHost());
304 LOGGER.info(errRes.toString());
305 throw new CambriaApiException(errRes);
308 } catch (IOException ioe) {
315 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
316 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
317 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
318 ctx.getRequest().getRemoteHost());
319 LOGGER.info(errRes.toString());
320 throw new CambriaApiException(errRes);
/**
 * Handles an "update Mirror Maker" request.
 *
 * Flow visible in this listing:
 * - requires the permission configured under "msgRtr.mirrormakeradmin.aaf"; otherwise a
 *   CambriaApiException with SC_UNAUTHORIZED / NO_ADMIN_PERMISSION is thrown;
 * - the body is read as UTF-8, cleaned with removeExtraChar(), parsed with Gson into an
 *   UpdateMirrorMaker and, in parallel, into a JSONObject whose "updateMirrorMaker" member
 *   is kept to detect which fields the caller actually supplied;
 * - a CambriaApiException built from errResJson is thrown for a JSON syntax error, a missing
 *   updateMirrorMaker element, a blank or non-alphanumeric name, or a name that
 *   validateMMExists() reports as not existing;
 * - "numStreams" and "enablelogCheck" are copied from the existing Mirror Maker when the
 *   request does not contain them;
 * - the request is forwarded with callPubSub() and the result written with respondOk().
 *
 * NOTE(review): several lines (declaration of `input`, try/else lines, closing braces,
 * the trailing callPubSub arguments) are absent from this listing.
 *
 * @param msg request body
 * @throws Exception declared by the signature; CambriaApiException is thrown explicitly
 */
325 @Produces("application/json")
327 public void callUpdateMirrorMaker(InputStream msg) throws Exception {
328 DMaaPContext ctx = getDmaapContext();
329 if (checkMirrorMakerPermission(ctx,
330 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
334 String randomStr = getRandomNum();
336 InputStream inStream = null;
337 Gson gson = new Gson();
338 UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
339 JSONObject jsonOb, jsonObInput = null;
342 input = IOUtils.toString(msg, "UTF-8");
344 if (input != null && input.length() > 0) {
345 input = removeExtraChar(input);
348 // Check if the request has UpdateMirrorMaker
350 updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
// NOTE(review): org.json's constructor and get() signal problems with JSONException,
// while the only handler visible below catches Gson's JsonSyntaxException; a body
// without "updateMirrorMaker" would therefore fail here, before the explicit null
// check further down -- confirm against the full source.
351 jsonOb = new JSONObject(input);
352 jsonObInput = (JSONObject) jsonOb.get("updateMirrorMaker");
354 } catch (JsonSyntaxException ex) {
356 errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage());
357 LOGGER.info(errResJson.toString());
358 throw new CambriaApiException(errResJson);
361 // send error message if it is not a UpdateMirrorMaker request.
362 if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
364 errResJson.setErrorMessage("This is not a UpdateMirrorMaker request. Please try again.");
365 LOGGER.info(errResJson.toString());
366 throw new CambriaApiException(errResJson);
368 updateMirrorMaker.validateJSON(jsonObInput);
371 String name = updateMirrorMaker.getUpdateMirrorMaker().getName();
372 // if empty, blank name is entered
373 if (StringUtils.isBlank(name)) {
375 errResJson.setErrorMessage("Name can not be empty or blank.");
376 LOGGER.info(errResJson.toString());
377 throw new CambriaApiException(errResJson);
380 // Check if the name contains only Alpha Numeric
381 else if (!isAlphaNumeric(name)) {
382 errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT);
383 LOGGER.info(errResJson.toString());
384 throw new CambriaApiException(errResJson);
388 // Set a random number as messageID, convert Json Object to
389 // InputStream and finally call publisher and subscriber
392 if (null == updateMirrorMaker.getMessageID() || updateMirrorMaker.getMessageID().isEmpty()) {
393 updateMirrorMaker.setMessageID(randomStr);
396 JSONObject existMirrorMaker = validateMMExists(ctx, name);
398 if ((boolean) existMirrorMaker.get("exists")) {
399 JSONObject existMM = (JSONObject) existMirrorMaker.get("listMirrorMaker");
// Keep the current values for settings the caller did not send.
401 if (!jsonObInput.has("numStreams")) {
402 updateMirrorMaker.getUpdateMirrorMaker().setNumStreams(existMM.getInt("numStreams"));
404 if (!jsonObInput.has("enablelogCheck")) {
405 updateMirrorMaker.getUpdateMirrorMaker()
406 .setEnablelogCheck(existMM.getBoolean("enablelogCheck"));
408 inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), "UTF-8");
409 JSONObject finalJsonObj = callPubSub(updateMirrorMaker.getMessageID(), ctx, inStream, name,
411 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
414 errResJson.setErrorMessage("MirrorMaker " + name + " does not exist");
415 LOGGER.info(errResJson.toString());
416 throw new CambriaApiException(errResJson);
422 } catch (IOException e) {
423 LOGGER.error("Error in callUpdateMirrorMaker:", e);
426 // Send error response if user does not provide Authorization
428 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
429 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
430 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
431 ctx.getRequest().getRemoteHost());
432 LOGGER.info(errRes.toString());
433 throw new CambriaApiException(errRes);
/**
 * Handles a "delete Mirror Maker" request.
 *
 * Flow visible in this listing:
 * - requires the permission configured under "msgRtr.mirrormakeradmin.aaf"; otherwise a
 *   CambriaApiException with SC_UNAUTHORIZED / NO_ADMIN_PERMISSION is thrown;
 * - the body is read as UTF-8, cleaned with removeExtraChar() and parsed both into a
 *   JSONObject and, with Gson, into a MirrorMaker; a JSONException is reported through
 *   errResJson;
 * - the request is accepted only when "deleteMirrorMaker" is an object with exactly one
 *   member, "name", whose value is non-blank and alphanumeric;
 * - a missing or null "messageID" is replaced by getRandomNum();
 * - when validateMMExists() reports that the name exists, the request is forwarded with
 *   callPubSub() and the result written with respondOk(); otherwise an error is thrown.
 *
 * NOTE(review): several lines (declaration of `input`, try/else lines, closing braces)
 * are absent from this listing.
 *
 * @param msg request body
 * @throws JSONException declared by the signature
 * @throws Exception declared by the signature; CambriaApiException is thrown explicitly
 */
438 @Produces("application/json")
440 public void callDeleteMirrorMaker(InputStream msg) throws JSONException, Exception {
441 DMaaPContext ctx = getDmaapContext();
443 if (checkMirrorMakerPermission(ctx,
444 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
449 Gson gson = new Gson();
450 MirrorMaker mirrormaker = new MirrorMaker();
453 input = IOUtils.toString(msg, "UTF-8");
455 if (input != null && input.length() > 0) {
456 input = removeExtraChar(input);
459 String randomStr = getRandomNum();
460 JSONObject jsonOb = null;
463 jsonOb = new JSONObject(input);
464 mirrormaker = gson.fromJson(input, MirrorMaker.class);
466 } catch (JSONException ex) {
468 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
469 LOGGER.info(errResJson.toString());
470 throw new CambriaApiException(errResJson);
473 // Check if request has DeleteMirrorMaker and
474 // DeleteMirrorMaker has MirrorMaker object with name variable
475 // and check if the name contain only alpha numeric
477 if (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
478 && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
479 && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
480 && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))) {
482 if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) {
483 jsonOb.put("messageID", randomStr);
486 InputStream inStream = null;
488 inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
490 JSONObject deleteMM = jsonOb.getJSONObject("deleteMirrorMaker");
492 JSONObject existMirrorMaker = validateMMExists(ctx, deleteMM.getString("name"));
494 if ((boolean) existMirrorMaker.get("exists")) {
// NOTE(review): the name handed to callPubSub comes from the Gson-parsed top-level
// MirrorMaker object, not from deleteMM.getString("name") used for the existence
// check above -- confirm the two are meant to differ.
496 JSONObject finalJsonObj = callPubSub(jsonOb.getString("messageID"), ctx, inStream,
497 mirrormaker.name, false);
498 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
501 errResJson.setErrorMessage("MirrorMaker " + deleteMM.getString("name") + " does not exist");
502 LOGGER.info(errResJson.toString());
503 throw new CambriaApiException(errResJson);
509 errResJson.setErrorMessage("This is not a DeleteMirrorMaker request. Please try again.");
510 LOGGER.info(errResJson.toString());
511 throw new CambriaApiException(errResJson);
515 } catch (IOException ioe) {
516 LOGGER.error("Error in callDeleteMirrorMaker:", ioe);
522 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
523 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
524 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
525 ctx.getRequest().getRemoteHost());
526 LOGGER.info(errRes.toString());
527 throw new CambriaApiException(errRes);
531 public boolean isListMirrorMaker(String msg, String messageID) {
532 String topicmsg = msg;
533 topicmsg = removeExtraChar(topicmsg);
534 JSONObject jObj = new JSONObject();
535 JSONArray jArray = null;
536 boolean exist = false;
538 if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) {
539 jArray = new JSONArray(topicmsg);
541 for (int i = 0; i < jArray.length(); i++) {
542 jObj = jArray.getJSONObject(i);
544 if (jObj.has("messageID") && jObj.get("messageID").equals(messageID) && jObj.has("listMirrorMaker")) {
553 private void loadProperty() {
555 this.timeout = Integer.parseInt(
556 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim());
557 this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim();
558 this.consumergroup = AJSCPropertiesMap
559 .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim();
560 this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid")
564 private String removeExtraChar(String message) {
565 String str = message;
566 str = checkJsonFormate(str);
568 if (str != null && str.length() > 0) {
569 str = str.replace("\\", "");
570 str = str.replace("\"{", "{");
571 str = str.replace("}\"", "}");
576 private String getRandomNum() {
577 long random = Math.round(Math.random() * 89999) + 10000;
578 String strLong = Long.toString(random);
582 private boolean isAlphaNumeric(String name) {
583 String pattern = "^[a-zA-Z0-9]*$";
584 if (name.matches(pattern)) {
590 private String checkJsonFormate(String jsonStr) {
592 String json = jsonStr;
593 if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) {
599 private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) {
601 boolean hasPermission = false;
603 if (dmaapAAFauthenticator.aafAuthentication(ctx.getRequest(), permission)) {
604 hasPermission = true;
606 return hasPermission;
/**
 * Publishes a Mirror Maker request and waits for the matching "listMirrorMaker" reply.
 *
 * Flow visible in this listing:
 * - subscribes once, then publishes inStream to the configured topic;
 * - keeps calling subscribe until isListMirrorMaker() accepts the received text for
 *   randomstr or `timeout` milliseconds have passed since publishing;
 * - when a matching reply arrived, the cleaned text is parsed as a JSON array, its first
 *   element is read, and its "listMirrorMaker" array is either handled as a whole
 *   (listAll) or searched for the entry whose "name" equals `name`;
 * - when no matching reply arrived, a CambriaApiException with SC_SERVICE_UNAVAILABLE /
 *   RESOURCE_NOT_FOUND is thrown.
 *
 * NOTE(review): the rest of the signature (throws clause), the listAll branch body, the
 * else lines, the code following the catch block and the return statement are absent
 * from this listing. No pause between two subscribe calls is visible in the loop.
 *
 * @param randomstr messageID of the request whose reply is awaited
 * @param ctx       request context
 * @param inStream  serialized request to publish
 * @param name      Mirror Maker name to select from the reply; may be null or empty
 * @param listAll   whether the complete list is wanted
 */
609 public JSONObject callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream, String name, boolean listAll)
612 JSONObject jsonObj = new JSONObject();
613 JSONObject finalJsonObj = new JSONObject();
614 JSONArray jsonArray = null;
616 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
617 mirrorService.pushEvents(ctx, topic, inStream, null, null);
618 long startTime = System.currentTimeMillis();
620 while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
621 && ((System.currentTimeMillis() - startTime) < timeout)) {
622 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
626 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
627 && isListMirrorMaker(msgFrmSubscribe, randomstr)) {
628 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
// NOTE(review): only element 0 of the received array is inspected here, whereas
// isListMirrorMaker() accepts a match at any index -- confirm the reply is always first.
630 jsonArray = new JSONArray(msgFrmSubscribe);
631 jsonObj = jsonArray.getJSONObject(0);
632 if (jsonObj.has("listMirrorMaker")) {
633 jsonArray = (JSONArray) jsonObj.get("listMirrorMaker");
634 if (true == listAll) {
637 for (int i = 0; i < jsonArray.length(); i++) {
638 jsonObj = jsonArray.getJSONObject(i);
639 if (null != name && !name.isEmpty()) {
640 if (jsonObj.getString("name").equals(name)) {
641 finalJsonObj.put("listMirrorMaker", jsonObj);
645 finalJsonObj.put("listMirrorMaker", jsonObj);
655 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
656 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
657 "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null,
658 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
659 ctx.getRequest().getRemoteHost());
660 LOGGER.info(errRes.toString());
661 throw new CambriaApiException(errRes);
665 } catch (Exception e) {
666 LOGGER.error("Error in callPubSub", e);
672 private void sendErrResponse(DMaaPContext ctx, String errMsg) {
673 JSONObject err = new JSONObject();
674 err.append(ERROR, errMsg);
677 DMaaPResponseBuilder.respondOk(ctx, err);
678 LOGGER.error(errMsg);
680 } catch (JSONException | IOException e) {
681 LOGGER.error("Error at sendErrResponse method:" + errMsg + "Exception name:" + e);
/**
 * Handles POST /listallwhitelist: returns the whitelist topics of one Mirror Maker that
 * belong to a given namespace.
 *
 * Flow visible in this listing:
 * - requires the permission configured under "msgRtr.mirrormakeruser.aaf"; otherwise a
 *   CambriaApiException with SC_UNAUTHORIZED / NO_USER_PERMISSION is thrown;
 * - the body is read as UTF-8, cleaned with removeExtraChar() and parsed into a JSONObject;
 * - the request must have exactly two members: a non-blank alphanumeric "name" and a
 *   non-blank "namespace";
 * - the caller must also hold the permission formed from the property
 *   "msgRtr.mirrormakeruser.aaf.create", the namespace and "|create"; otherwise
 *   SC_UNAUTHORIZED / NO_USER_CREATE_PERMISSION is thrown;
 * - a listAllMirrorMaker request with a generated messageID is sent through callPubSub();
 * - the "whitelist" of the entry whose name equals the requested name is split on commas
 *   and the topics whose getNamespace() equals the requested namespace are returned,
 *   comma-joined, together with the name.
 *
 * NOTE(review): several lines (declaration of `input`, try/else lines, closing braces)
 * are absent from this listing, so the exact condition guarding each error branch
 * cannot be confirmed here.
 *
 * @param msg request body
 * @throws Exception declared by the signature; CambriaApiException is thrown explicitly
 */
685 @SuppressWarnings("unchecked")
687 @Produces("application/json")
688 @Path("/listallwhitelist")
689 public void listWhiteList(InputStream msg) throws Exception {
691 DMaaPContext ctx = getDmaapContext();
692 if (checkMirrorMakerPermission(ctx,
693 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
699 input = IOUtils.toString(msg, "UTF-8");
701 if (input != null && input.length() > 0) {
702 input = removeExtraChar(input);
705 // Check if it is correct Json object
706 JSONObject jsonOb = null;
709 jsonOb = new JSONObject(input);
711 } catch (JSONException ex) {
713 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
714 LOGGER.info(errResJson.toString());
715 throw new CambriaApiException(errResJson);
719 // Check if the request has name and name contains only alpha
721 // and check if the request has namespace and namespace contains
722 // only alpha numeric
// NOTE(review): the condition below checks that namespace is non-blank; it does not
// apply isAlphaNumeric() to it, unlike what the comment above says.
723 if (jsonOb.length() == 2 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
724 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
725 && !StringUtils.isBlank(jsonOb.getString("namespace"))) {
727 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
728 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
730 // Check if the user have create permission for the
732 if (checkMirrorMakerPermission(ctx, permission)) {
734 JSONObject listAll = new JSONObject();
735 JSONObject emptyObject = new JSONObject();
737 // Create a listAllMirrorMaker Json object
739 listAll.put("listAllMirrorMaker", emptyObject);
741 } catch (JSONException e) {
743 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
744 LOGGER.info(errResJson.toString());
745 throw new CambriaApiException(errResJson);
748 // set a random number as messageID
749 String randomStr = getRandomNum();
750 listAll.put("messageID", randomStr);
751 InputStream inStream = null;
753 // convert listAll Json object to InputStream object
754 inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
756 JSONObject listMirrorMaker = new JSONObject();
757 listMirrorMaker = callPubSub(randomStr, ctx, inStream, null, true);
759 String whitelist = null;
760 JSONArray listMMArray = new JSONArray();
761 if (listMirrorMaker.has("listMirrorMaker")) {
762 listMMArray = (JSONArray) listMirrorMaker.get("listMirrorMaker");
763 for (int i = 0; i < listMMArray.length(); i++) {
765 JSONObject mm = new JSONObject();
766 mm = listMMArray.getJSONObject(i);
767 String name = mm.getString("name");
769 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
770 whitelist = mm.getString("whitelist");
775 if (!StringUtils.isBlank(whitelist)) {
777 List<String> topicList = new ArrayList<String>();
778 List<String> finalTopicList = new ArrayList<String>();
779 topicList = Arrays.asList(whitelist.split(","));
// The loop variable below is named `topic` and hides the field of the same name
// inside the loop body.
781 for (String topic : topicList) {
782 if (topic != null && !topic.equals("null")
783 && getNamespace(topic).equals(jsonOb.getString("namespace"))) {
785 finalTopicList.add(topic);
789 String topicNames = "";
791 if (finalTopicList.size() > 0) {
792 topicNames = StringUtils.join(finalTopicList, ",");
795 JSONObject listAllWhiteList = new JSONObject();
796 listAllWhiteList.put("name", jsonOb.getString("name"));
797 listAllWhiteList.put("whitelist", topicNames);
799 DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList);
804 errResJson.setErrorMessage(
805 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
806 LOGGER.info(errResJson.toString());
807 throw new CambriaApiException(errResJson);
812 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
813 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION,
814 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
815 ctx.getRequest().getRemoteHost());
816 LOGGER.info(errRes.toString());
817 throw new CambriaApiException(errRes);
822 errResJson.setErrorMessage("This is not a ListAllWhitelist request. Please try again.");
823 LOGGER.info(errResJson.toString());
824 throw new CambriaApiException(errResJson);
827 } catch (IOException e) {
828 LOGGER.error("Error in listWhiteList", e);
831 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
832 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null,
833 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
834 ctx.getRequest().getRemoteHost());
835 LOGGER.info(errRes.toString());
836 throw new CambriaApiException(errRes);
/**
 * Handles POST /createwhitelist: adds one topic to the whitelist of a Mirror Maker.
 *
 * Flow visible in this listing:
 * - requires the permission configured under "msgRtr.mirrormakeruser.aaf"; otherwise a
 *   CambriaApiException with SC_UNAUTHORIZED / UNABLE_TO_AUTHORIZE / NO_USER_PERMISSION
 *   is thrown;
 * - the body is read as UTF-8, cleaned with removeExtraChar() and parsed into a JSONObject;
 * - the request must have exactly three members: a non-blank alphanumeric "name", a
 *   non-blank "namespace" and a non-blank "whitelistTopicName" whose part after the last
 *   "." is alphanumeric;
 * - the caller must also hold the permission formed from the property
 *   "msgRtr.mirrormakeruser.aaf.create", the namespace and "|create";
 * - a listAllMirrorMaker request is published and the subscriber is polled until
 *   isListMirrorMaker() accepts the reply or `timeout` milliseconds have passed;
 * - the current whitelist of the named Mirror Maker is read from the reply; when the new
 *   topic is not yet listed and its getNamespace() equals the requested namespace, an
 *   UpdateWhiteList message with the extended list is sent via callPubSubForWhitelist();
 * - an already listed topic or a namespace mismatch is reported with
 *   SC_INTERNAL_SERVER_ERROR; a missing reply with SC_SERVICE_UNAVAILABLE.
 *
 * NOTE(review): several lines (declaration of `input` and of `g`, try/else lines,
 * closing braces, the body of the multi-catch handler) are absent from this listing.
 *
 * @param msg request body
 * @throws Exception declared by the signature; CambriaApiException is thrown explicitly
 */
840 @SuppressWarnings("unchecked")
842 @Produces("application/json")
843 @Path("/createwhitelist")
844 public void createWhiteList(InputStream msg) throws Exception {
846 DMaaPContext ctx = getDmaapContext();
847 if (checkMirrorMakerPermission(ctx,
848 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
854 input = IOUtils.toString(msg, "UTF-8");
856 if (input != null && input.length() > 0) {
857 input = removeExtraChar(input);
860 // Check if it is correct Json object
861 JSONObject jsonOb = null;
864 jsonOb = new JSONObject(input);
866 } catch (JSONException ex) {
867 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
868 LOGGER.info(errResJson.toString());
869 throw new CambriaApiException(errResJson);
872 // Check if the request has name and name contains only alpha
874 // check if the request has namespace and
875 // check if the request has whitelistTopicName
876 // check if the topic name contains only alpha numeric
877 if (jsonOb.length() == 3 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
878 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
879 && !StringUtils.isBlank(jsonOb.getString("namespace")) && jsonOb.has("whitelistTopicName")
880 && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
881 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
882 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1, jsonOb
883 .getString("whitelistTopicName").length()))) {
885 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
886 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
888 // Check if the user have create permission for the
890 if (checkMirrorMakerPermission(ctx, permission)) {
892 JSONObject listAll = new JSONObject();
893 JSONObject emptyObject = new JSONObject();
895 // Create a listAllMirrorMaker Json object
897 listAll.put("listAllMirrorMaker", emptyObject);
899 } catch (JSONException e) {
901 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
902 LOGGER.info(errResJson.toString());
903 throw new CambriaApiException(errResJson);
906 // set a random number as messageID
907 String randomStr = getRandomNum();
908 listAll.put("messageID", randomStr);
909 InputStream inStream = null;
911 // convert listAll Json object to InputStream object
912 inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
// The publish-and-poll sequence below repeats the one in callPubSub().
914 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
915 // call listAllMirrorMaker
916 mirrorService.pushEvents(ctx, topic, inStream, null, null);
918 // subscribe for listMirrorMaker
919 long startTime = System.currentTimeMillis();
921 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
922 && (System.currentTimeMillis() - startTime) < timeout) {
923 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
926 JSONArray listMirrorMaker = null;
928 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
929 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
931 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
932 String whitelist = null;
934 for (int i = 0; i < listMirrorMaker.length(); i++) {
935 JSONObject mm = new JSONObject();
936 mm = listMirrorMaker.getJSONObject(i);
937 String name = mm.getString("name");
939 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
940 whitelist = mm.getString("whitelist");
945 List<String> topicList = new ArrayList<String>();
946 List<String> finalTopicList = new ArrayList<String>();
948 if (whitelist != null) {
949 topicList = Arrays.asList(whitelist.split(","));
// Blank entries of the existing whitelist are dropped.
952 for (String st : topicList) {
953 if (!StringUtils.isBlank(st)) {
954 finalTopicList.add(st);
958 String newTopic = jsonOb.getString("whitelistTopicName");
960 if (!topicList.contains(newTopic)
961 && getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
963 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
964 MirrorMaker mirrorMaker = new MirrorMaker();
965 mirrorMaker.setName(jsonOb.getString("name"));
966 finalTopicList.add(newTopic);
967 String newWhitelist = "";
969 if (finalTopicList.size() > 0) {
970 newWhitelist = StringUtils.join(finalTopicList, ",");
973 mirrorMaker.setWhitelist(newWhitelist);
975 String newRandom = getRandomNum();
976 updateWhiteList.setMessageID(newRandom);
977 updateWhiteList.setUpdateWhiteList(mirrorMaker);
// NOTE(review): the result of the next call is discarded; the same conversion is
// repeated two lines below where its result is used.
980 g.toJson(updateWhiteList);
981 InputStream inputStream = null;
982 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
983 // callPubSub(newRandom, ctx, inputStream);
984 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb);
986 } else if (topicList.contains(newTopic)) {
987 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR,
988 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "The topic already exist.",
989 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
990 ctx.getRequest().getRemoteHost());
991 LOGGER.info(errRes.toString());
992 throw new CambriaApiException(errRes);
994 } else if (!getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
995 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR,
996 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(),
997 "The namespace of the topic does not match with the namespace you provided.",
998 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
999 ctx.getRequest().getRemoteHost());
1000 LOGGER.info(errRes.toString());
1001 throw new CambriaApiException(errRes);
1005 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1006 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
1007 "listWhiteList is not available, please make sure MirrorMakerAgent is running",
1008 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1009 ctx.getRequest().getRemoteHost());
1010 LOGGER.info(errRes.toString());
1011 throw new CambriaApiException(errRes);
1015 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1016 DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_CREATE_PERMISSION,
1017 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1018 ctx.getRequest().getRemoteHost());
1019 LOGGER.info(errRes.toString());
1020 throw new CambriaApiException(errRes);
1025 errResJson.setErrorMessage("This is not a createWhitelist request. Please try again.");
1026 LOGGER.info(errResJson.toString());
1027 throw new CambriaApiException(errResJson);
// NOTE(review): this handler also catches CambriaApiException, the type thrown for
// every validation failure above; its body is not visible here, so whether those
// errors are rethrown cannot be confirmed from this listing.
1030 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1031 | TopicExistsException | missingReqdSetting | UnavailableException e) {
1036 // Send error response if user does not provide Authorization
1038 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1039 DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_PERMISSION, null,
1040 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1041 ctx.getRequest().getRemoteHost());
1042 LOGGER.info(errRes.toString());
1043 throw new CambriaApiException(errRes);
/**
 * REST handler for {@code /deletewhitelist}: removes one topic from the
 * whitelist of a named MirrorMaker by exchanging messages with the
 * MirrorMaker agent through {@code mirrorService}.
 *
 * Steps, in the order they appear below:
 * - check the caller against the permission stored under
 *   "msgRtr.mirrormakeruser.aaf";
 * - read the request body as UTF-8, pass it through removeExtraChar and
 *   parse it as a JSON object;
 * - require exactly three keys ("name", "namespace", "whitelistTopicName"),
 *   with "name" and the last dot-separated segment of "whitelistTopicName"
 *   accepted by isAlphaNumeric;
 * - check the caller against the namespace-specific "|create" permission;
 * - publish a "listAllMirrorMaker" request tagged with a random messageID and
 *   poll the subscription until a matching reply is seen or {@code timeout}
 *   elapses;
 * - look up the current whitelist of the requested MirrorMaker, remove the
 *   topic from it and publish the resulting updateWhiteList message through
 *   callPubSubForWhitelist.
 *
 * @param msg request body; expected to contain the JSON object described above
 * @throws Exception a CambriaApiException is thrown for every failure that is
 *         visible in this method (bad JSON, wrong request shape, topic not in
 *         the whitelist, agent reply unavailable, missing permission); the
 *         multi-catch near the end also names the other checked types that
 *         can reach it
 */
1048 @SuppressWarnings("unchecked")
1050 @Produces("application/json")
1051 @Path("/deletewhitelist")
1052 public void deleteWhiteList(InputStream msg) throws Exception {
// General MirrorMaker permission check; the namespace-specific check follows
// once the request body has been parsed.
1054 DMaaPContext ctx = getDmaapContext();
1055 if (checkMirrorMakerPermission(ctx,
1056 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
1059 String input = null;
// Read the whole request body as UTF-8 text.
1062 input = IOUtils.toString(msg, "UTF-8");
1064 if (input != null && input.length() > 0) {
1065 input = removeExtraChar(input);
1068 // Check if it is correct Json object
1069 JSONObject jsonOb = null;
1072 jsonOb = new JSONObject(input);
1074 } catch (JSONException ex) {
// NOTE(review): errResJson is not declared in this method; presumably a
// shared field that is mutated here before being thrown - confirm.
1076 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
1077 LOGGER.info(errResJson.toString());
1078 throw new CambriaApiException(errResJson);
1082 // Check if the request has name and name contains only alpha
1084 // check if the request has namespace and
1085 // check if the request has whitelistTopicName
// The request must hold exactly these three keys; only the part of
// whitelistTopicName after its last '.' is passed to isAlphaNumeric.
1086 if (jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name"))
1087 && jsonOb.has("namespace") && jsonOb.has("whitelistTopicName")
1088 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
1089 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
1090 jsonOb.getString("whitelistTopicName").length()))) {
// Permission string = configured prefix + requested namespace + "|create".
// The delete operation is authorised with the "create" action.
1092 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
1093 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
1095 // Check if the user has create permission for the requested namespace
1097 if (checkMirrorMakerPermission(ctx, permission)) {
1099 JSONObject listAll = new JSONObject();
1100 JSONObject emptyObject = new JSONObject();
1102 // Create a listAllMirrorMaker Json object
1104 listAll.put("listAllMirrorMaker", emptyObject);
1106 } catch (JSONException e) {
1108 errResJson.setErrorMessage(errorMessages.getIncorrectJson());
1109 LOGGER.info(errResJson.toString());
1110 throw new CambriaApiException(errResJson);
1113 // set a random number as messageID
1114 String randomStr = getRandomNum();
1115 listAll.put("messageID", randomStr);
1116 InputStream inStream = null;
1118 // convert listAll Json object to InputStream object
1119 inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
1121 // call listAllMirrorMaker
1122 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1124 // subscribe for listMirrorMaker
// Poll the subscription until isListMirrorMaker accepts a message for this
// messageID, or until `timeout` has elapsed since startTime.
1125 long startTime = System.currentTimeMillis();
1126 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1128 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1129 && (System.currentTimeMillis() - startTime) < timeout) {
1130 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1133 JSONObject jsonObj = new JSONObject();
1134 JSONArray jsonArray = null;
1135 JSONArray listMirrorMaker = null;
// Re-check the last polled message: the loop above may have ended on timeout.
1137 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1138 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1139 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1140 jsonArray = new JSONArray(msgFrmSubscribe);
// The reply is treated as a JSON array of messages; pick the one that carries
// our messageID and a "listMirrorMaker" array.
1142 for (int i = 0; i < jsonArray.length(); i++) {
1143 jsonObj = jsonArray.getJSONObject(i);
1145 if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr)
1146 && jsonObj.has("listMirrorMaker")) {
1147 listMirrorMaker = jsonObj.getJSONArray("listMirrorMaker");
// Find the whitelist of the MirrorMaker named in the request.
// NOTE(review): listMirrorMaker is only assigned inside the loop above;
// confirm it cannot still be null when it is dereferenced below.
1151 String whitelist = null;
1152 for (int i = 0; i < listMirrorMaker.length(); i++) {
1154 JSONObject mm = new JSONObject();
1155 mm = listMirrorMaker.getJSONObject(i);
1156 String name = mm.getString("name");
1158 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
1159 whitelist = mm.getString("whitelist");
// The whitelist is a comma-separated string of topic names.
1164 List<String> topicList = new ArrayList<String>();
1166 if (whitelist != null) {
1167 topicList = Arrays.asList(whitelist.split(","));
1169 boolean removeTopic = false;
1170 String topicToRemove = jsonOb.getString("whitelistTopicName");
// The "topic does not exist" error below is tied to this membership test.
1172 if (topicList.contains(topicToRemove)) {
1175 errResJson.setErrorMessage(errorMessages.getTopicNotExist());
1176 LOGGER.info(errResJson.toString());
1177 throw new CambriaApiException(errResJson);
// Build the updateWhiteList message for the named MirrorMaker.
1181 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
1182 MirrorMaker mirrorMaker = new MirrorMaker();
1184 mirrorMaker.setName(jsonOb.getString("name"));
// The whitelist is only set when something remains after the removal.
// NOTE(review): removeTopic(...) is evaluated twice with the same arguments.
1186 if (StringUtils.isNotBlank((removeTopic(whitelist, topicToRemove)))) {
1187 mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
// A fresh messageID is used for the update request.
1190 String newRandom = getRandomNum();
1192 updateWhiteList.setMessageID(newRandom);
1193 updateWhiteList.setUpdateWhiteList(mirrorMaker);
1195 Gson g = new Gson();
// NOTE(review): the result of this first toJson call is discarded; the
// payload actually sent is produced by the second call below.
1196 g.toJson(updateWhiteList);
1198 InputStream inputStream = null;
1199 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
1200 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb);
1201 // mmAgentUtil.getNamespace(topicToRemove));
// 503 response: no usable reply was obtained from the MirrorMaker agent.
1206 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1207 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
1208 "listWhiteList is not available, please make sure MirrorMakerAgent is running",
1209 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1210 ctx.getRequest().getRemoteHost());
1211 LOGGER.info(errRes.toString());
1212 throw new CambriaApiException(errRes);
// 401 response carrying NO_USER_CREATE_PERMISSION (namespace-level check).
1216 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1217 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION,
1218 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1219 ctx.getRequest().getRemoteHost());
1220 LOGGER.info(errRes.toString());
1221 throw new CambriaApiException(errRes);
// NOTE(review): the message says "DeleteAllWhitelist" although the endpoint
// is /deletewhitelist - confirm the intended wording.
1226 errResJson.setErrorMessage("This is not a DeleteAllWhitelist request. Please try again.");
1227 LOGGER.info(errResJson.toString());
1228 throw new CambriaApiException(errResJson);
1232 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1233 | TopicExistsException | missingReqdSetting | UnavailableException e) {
1238 // Send error response if user does not provide Authorization
// 401 response carrying NO_USER_PERMISSION (general MirrorMaker check).
1240 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1241 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null,
1242 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1243 ctx.getRequest().getRemoteHost());
1244 LOGGER.info(errRes.toString());
1245 throw new CambriaApiException(errRes);
1250 private String getNamespace(String topic) {
1251 return topic.substring(0, topic.lastIndexOf("."));
1254 private String removeTopic(String whitelist, String topicToRemove) {
1255 List<String> topicList = new ArrayList<>();
1256 List<String> newTopicList = new ArrayList<>();
1258 if (whitelist.contains(",")) {
1259 topicList = Arrays.asList(whitelist.split(","));
1263 if (topicList.contains(topicToRemove)) {
1264 for (String topic : topicList) {
1265 if (!topic.equals(topicToRemove)) {
1266 newTopicList.add(topic);
1271 String newWhitelist = StringUtils.join(newTopicList, ",");
1273 return newWhitelist;
/**
 * Publishes a whitelist update to the MirrorMaker agent and answers the HTTP
 * request with the resulting state of the affected MirrorMaker.
 *
 * The method subscribes once, pushes {@code inStream} through
 * {@code mirrorService}, then polls the subscription until isListMirrorMaker
 * accepts a message for {@code randomStr} or {@code timeout} elapses. From the
 * reply it takes the "listMirrorMaker" array of the message whose messageID
 * equals {@code randomStr}, keeps the entries whose "name" equals the "name"
 * in {@code jsonOb}, and sends them back as {"listMirrorMaker": [...]} via
 * DMaaPResponseBuilder.respondOk.
 *
 * @param randomStr messageID that was placed in the published request and is
 *                  used to recognise the agent's reply
 * @param ctx       request context used for publishing, subscribing and
 *                  writing the response
 * @param inStream  serialized request to publish
 * @param jsonOb    original client request; "namespace" and "name" are read
 */
1276 public void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, JSONObject jsonOb) {
// NOTE(review): namespace is read but not referenced again in this method.
1280 String namespace = jsonOb.getString("namespace");
1281 String mmName = jsonOb.getString("name");
// A subscribe call is issued before the publish.
// NOTE(review): presumably this registers/positions the consumer so the
// agent's reply is not missed - confirm.
1283 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1284 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1285 long startTime = System.currentTimeMillis();
// Poll until a reply for randomStr is recognised or the timeout elapses.
1287 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1288 && (System.currentTimeMillis() - startTime) < timeout) {
1289 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1292 JSONObject jsonObj = new JSONObject();
1293 JSONArray jsonArray = null;
1294 JSONArray jsonArrayNamespace = null;
// Re-check the last polled message: the loop above may have ended on timeout.
1296 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1297 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1298 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1299 jsonArray = new JSONArray(msgFrmSubscribe);
// The reply is treated as a JSON array of messages; pick the one that carries
// our messageID and a "listMirrorMaker" array.
1301 for (int i = 0; i < jsonArray.length(); i++) {
1302 jsonObj = jsonArray.getJSONObject(i);
1304 if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr)
1305 && jsonObj.has("listMirrorMaker")) {
1306 jsonArrayNamespace = jsonObj.getJSONArray("listMirrorMaker");
// Response payload: only MirrorMaker entries whose name matches the request.
1310 JSONObject finalJasonObj = new JSONObject();
1311 JSONArray finalJsonArray = new JSONArray();
1313 if (jsonArrayNamespace != null) {
1314 for (int i = 0; i < jsonArrayNamespace.length(); i++) {
1316 JSONObject mmObj = new JSONObject();
1317 mmObj = jsonArrayNamespace.getJSONObject(i);
1318 if (mmObj.has("name") && mmName.equals(mmObj.getString("name"))) {
1320 finalJsonArray.put(mmObj);
1325 finalJasonObj.put("listMirrorMaker", finalJsonArray);
1327 DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
// 503 response: no usable reply was obtained from the MirrorMaker agent.
1331 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1332 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
1333 "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null,
1334 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1335 ctx.getRequest().getRemoteHost());
1336 LOGGER.info(errRes.toString());
1337 throw new CambriaApiException(errRes);
// NOTE(review): this handler catches every Exception, including the
// CambriaApiException thrown just above, and the method declares no checked
// exceptions; confirm whether failures are meant to reach the caller or are
// only logged here.
1340 } catch (Exception e) {
1341 LOGGER.error("Error in callPubSubForWhitelist:", e);
1346 public JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
1347 JSONObject jsonObj = new JSONObject();
1348 JSONArray jsonArray = new JSONArray();
1349 JSONArray listMirrorMaker = new JSONArray();
1351 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1352 jsonArray = new JSONArray(msgFrmSubscribe);
1353 jsonObj = jsonArray.getJSONObject(0);
1355 for (int i = 0; i < jsonArray.length(); i++) {
1356 jsonObj = jsonArray.getJSONObject(i);
1358 if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr) && jsonObj.has("listMirrorMaker")) {
1359 listMirrorMaker = jsonObj.getJSONArray("listMirrorMaker");
1363 return listMirrorMaker;
1367 public JSONObject validateMMExists(DMaaPContext ctx, String name) throws Exception {
1368 // Create a listAllMirrorMaker Json object
1369 JSONObject listAll = new JSONObject();
1371 listAll.put("listAllMirrorMaker", new JSONObject());
1373 } catch (JSONException e) {
1374 LOGGER.error("Error while creating a listAllMirrorMaker Json object:", e);
1377 // set a random number as messageID
1378 String randomStr = getRandomNum();
1379 listAll.put("messageID", randomStr);
1380 InputStream inStream = null;
1382 // convert listAll Json object to InputStream object
1383 inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
1385 JSONObject listMirrorMaker = new JSONObject();
1386 listMirrorMaker = callPubSub(randomStr, ctx, inStream, name, false);
1387 if (null != listMirrorMaker && listMirrorMaker.length() > 0) {
1388 listMirrorMaker.put("exists", true);
1389 return listMirrorMaker;
1393 if(null != listMirrorMaker) {
1394 listMirrorMaker.put("exists", false);
1397 return listMirrorMaker;