1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.dmaap.service;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
30 import javax.servlet.http.HttpServletRequest;
31 import javax.servlet.http.HttpServletResponse;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import org.json.JSONObject;
37 import org.apache.commons.io.IOUtils;
38 import org.apache.commons.lang.StringUtils;
39 import com.att.eelf.configuration.EELFLogger;
40 import com.att.eelf.configuration.EELFManager;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Qualifier;
43 import org.springframework.stereotype.Component;
45 import com.att.nsa.cambria.utils.ConfigurationReader;
46 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
47 import com.att.nsa.cambria.utils.Utils;
48 import com.att.nsa.configs.ConfigDbException;
49 import com.att.nsa.dmaap.mmagent.*;
50 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
51 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
52 import com.google.gson.Gson;
53 import com.google.gson.JsonSyntaxException;
55 import edu.emory.mathcs.backport.java.util.Arrays;
57 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
58 import com.att.nsa.cambria.CambriaApiException;
59 import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
61 import org.json.JSONArray;
62 import org.json.JSONException;
63 import com.att.nsa.cambria.beans.DMaaPContext;
64 import com.att.nsa.cambria.constants.CambriaConstants;
65 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
66 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
67 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
68 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
69 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
70 import com.att.nsa.cambria.service.MMService;
73 * Rest Service class for Mirror Maker proxy Rest Services
75 * @author <a href="mailto:"></a>
81 public class MMRestService {
83 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class);
84 private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission.";
85 private static final String NO_USER_PERMISSION = "No Mirror Maker User permission.";
86 private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission.";
87 private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric";
88 private static final String INVALID_IPPORT = "This is not a valid IP:Port";
89 private static final String MIRROR_MAKERADMIN = "msgRtr.mirrormakeradmin.aaf";
90 private static final String MIRROR_MAKERUSER = "msgRtr.mirrormakeruser.aaf";
91 private static final String UTF_8 = "UTF-8";
92 private static final String MESSAGE = "message";
93 private static final String LISTMIRRORMAKER = "listMirrorMaker";
94 private static final String ERROR = "error";
95 private static final String NAMESPACE = "namespace";
99 private String consumergroup;
100 private String consumerid;
103 @Qualifier("configurationReader")
104 private ConfigurationReader configReader;
107 private HttpServletRequest request;
110 private HttpServletResponse response;
113 private MMService mirrorService;
116 private DMaaPErrorMessages errorMessages;
118 private DMaaPAAFAuthenticator dmaapAAFauthenticator = new DMaaPAAFAuthenticatorImpl();
121 * This method is used for taking Configuration Object,HttpServletRequest
122 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
125 * @return DMaaPContext object from where user can get Configuration
126 * Object,HttpServlet Object
129 private DMaaPContext getDmaapContext() {
130 DMaaPContext dmaapContext = new DMaaPContext();
131 dmaapContext.setRequest(request);
132 dmaapContext.setResponse(response);
133 dmaapContext.setConfigReader(configReader);
134 dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
140 @Produces("application/json")
142 public void callCreateMirrorMaker(InputStream msg) {
144 DMaaPContext ctx = getDmaapContext();
145 if (checkMirrorMakerPermission(ctx,
146 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
150 String randomStr = getRandomNum();
152 InputStream inStream = null;
153 Gson gson = new Gson();
154 CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
157 input = IOUtils.toString(msg, UTF_8);
159 if (input != null && input.length() > 0) {
160 input = removeExtraChar(input);
163 // Check if the request has CreateMirrorMaker
165 createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class);
167 } catch (JsonSyntaxException ex) {
169 sendErrResponse(ctx, errorMessages.getIncorrectJson());
170 LOGGER.error("JsonSyntaxException: ", ex);
172 String name = createMirrorMaker.getCreateMirrorMaker()==null? "":createMirrorMaker.getCreateMirrorMaker().getName();
173 // send error message if it is not a CreateMirrorMaker request.
174 if (createMirrorMaker.getCreateMirrorMaker() == null) {
175 sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
178 // MirrorMaker whitelist and status should not be passed
179 else if (createMirrorMaker.getCreateMirrorMaker().getWhitelist() != null
180 || createMirrorMaker.getCreateMirrorMaker().getStatus() != null) {
181 sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
184 // if empty, blank name is entered
185 else if (StringUtils.isBlank(name)) {
186 sendErrResponse(ctx, "Name can not be empty or blank.");
189 // Check if the name contains only Alpha Numeric
190 else if (!isAlphaNumeric(name)) {
191 sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
195 // Validate the IP and Port
196 else if (!StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getConsumer())
197 && !StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getProducer())
198 && !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
199 || !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
200 sendErrResponse(ctx, INVALID_IPPORT);
203 // Set a random number as messageID, convert Json Object to
204 // InputStream and finally call publisher and subscriber
205 else if (isAlphaNumeric(name) && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
206 && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
208 createMirrorMaker.setMessageID(randomStr);
209 inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), UTF_8);
210 callPubSub(randomStr, ctx, inStream);
213 } catch (IOException e) {
215 LOGGER.error("IOException: ", e);
218 // Send error response if user does not provide Authorization
220 sendErrResponse(ctx, NO_ADMIN_PERMISSION);
225 @Produces("application/json")
227 public void callListAllMirrorMaker(InputStream msg) throws CambriaApiException {
228 DMaaPContext ctx = getDmaapContext();
230 if (checkMirrorMakerPermission(ctx,
231 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
238 input = IOUtils.toString(msg, UTF_8);
240 if (input != null && input.length() > 0) {
241 input = removeExtraChar(input);
244 String randomStr = getRandomNum();
245 JSONObject jsonOb = null;
248 jsonOb = new JSONObject(input);
250 } catch (JSONException ex) {
252 sendErrResponse(ctx, errorMessages.getIncorrectJson());
253 LOGGER.error("JSONException: ", ex);
256 // Check if request has listAllMirrorMaker and
257 // listAllMirrorMaker is empty
258 if ((jsonOb != null) && (jsonOb.has("listAllMirrorMaker")
259 && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0)) {
260 jsonOb.put("messageID", randomStr);
261 InputStream inStream = null;
264 inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
266 } catch (IOException ioe) {
267 LOGGER.error("IOException: ", ioe);
270 callPubSub(randomStr, ctx, inStream);
274 sendErrResponse(ctx, "This is not a ListAllMirrorMaker request. Please try again.");
277 } catch (IOException ioe) {
279 LOGGER.error("IOException: ", ioe);
284 sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
289 @Produces("application/json")
291 public void callUpdateMirrorMaker(InputStream msg) throws CambriaApiException {
293 DMaaPContext ctx = getDmaapContext();
294 if (checkMirrorMakerPermission(ctx,
295 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
299 String randomStr = getRandomNum();
301 InputStream inStream = null;
302 Gson gson = new Gson();
303 UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
306 input = IOUtils.toString(msg, UTF_8);
308 if (input != null && input.length() > 0) {
309 input = removeExtraChar(input);
312 // Check if the request has UpdateMirrorMaker
314 updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
316 } catch (JsonSyntaxException ex) {
318 sendErrResponse(ctx, errorMessages.getIncorrectJson());
319 LOGGER.error("JsonSyntaxException: ", ex);
322 String name = updateMirrorMaker.getUpdateMirrorMaker()==null? "":updateMirrorMaker.getUpdateMirrorMaker().getName();
323 // send error message if it is not a UpdateMirrorMaker request.
324 if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
325 sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
328 // MirrorMaker whitelist and status should not be passed
329 else if (updateMirrorMaker.getUpdateMirrorMaker().getWhitelist() != null
330 || updateMirrorMaker.getUpdateMirrorMaker().getStatus() != null) {
331 sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
334 // if empty, blank name is entered
335 else if (StringUtils.isBlank(name)) {
336 sendErrResponse(ctx, "Name can not be empty or blank.");
339 // Check if the name contains only Alpha Numeric
340 else if (!isAlphaNumeric(name)) {
341 sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
345 // Validate the IP and Port
346 else if (!StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
347 && !StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getProducer())
348 && !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
349 || !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
350 sendErrResponse(ctx, INVALID_IPPORT);
353 // Set a random number as messageID, convert Json Object to
354 // InputStream and finally call publisher and subscriber
355 else if (isAlphaNumeric(name) && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
356 && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
358 updateMirrorMaker.setMessageID(randomStr);
359 inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), UTF_8);
360 callPubSub(randomStr, ctx, inStream);
363 } catch (IOException e) {
365 LOGGER.error("IOException: ", e);
368 // Send error response if user does not provide Authorization
370 sendErrResponse(ctx, NO_ADMIN_PERMISSION);
375 @Produces("application/json")
377 public void callDeleteMirrorMaker(InputStream msg) throws CambriaApiException {
378 DMaaPContext ctx = getDmaapContext();
380 if (checkMirrorMakerPermission(ctx,
381 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
388 input = IOUtils.toString(msg, UTF_8);
390 if (input != null && input.length() > 0) {
391 input = removeExtraChar(input);
394 String randomStr = getRandomNum();
395 JSONObject jsonOb = null;
398 jsonOb = new JSONObject(input);
400 } catch (JSONException ex) {
402 sendErrResponse(ctx, errorMessages.getIncorrectJson());
403 LOGGER.error("JSONException: ", ex);
406 // Check if request has DeleteMirrorMaker and
407 // DeleteMirrorMaker has MirrorMaker object with name variable
408 // and check if the name contain only alpha numeric
410 && (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
411 && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
412 && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
413 && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name")))) {
415 jsonOb.put("messageID", randomStr);
416 InputStream inStream = null;
419 inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
421 } catch (IOException ioe) {
422 LOGGER.error("IOException: ", ioe);
425 callPubSub(randomStr, ctx, inStream);
429 sendErrResponse(ctx, "This is not a DeleteMirrorMaker request. Please try again.");
432 } catch (IOException ioe) {
433 LOGGER.error("IOException: ", ioe);
438 sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
442 private boolean isListMirrorMaker(String msg, String messageID) {
443 String topicmsg = msg;
444 topicmsg = removeExtraChar(topicmsg);
448 boolean exist = false;
450 if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) {
451 jArray = new JSONArray(topicmsg);
453 for (int i = 0; i < jArray.length(); i++) {
454 jObj = jArray.getJSONObject(i);
456 JSONObject obj = new JSONObject();
457 if (jObj.has(MESSAGE)) {
458 obj = jObj.getJSONObject(MESSAGE);
460 if (obj.has("messageID") && obj.get("messageID").equals(messageID) && obj.has(LISTMIRRORMAKER)) {
469 private void loadProperty() {
471 this.timeout = Integer.parseInt(
472 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim());
473 this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim();
474 this.consumergroup = AJSCPropertiesMap
475 .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim();
476 this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid")
480 private String removeExtraChar(String message) {
481 String str = message;
482 str = checkJsonFormate(str);
484 if (str != null && str.length() > 0) {
485 str = str.replace("\\", "");
486 str = str.replace("\"{", "{");
487 str = str.replace("}\"", "}");
492 private String getRandomNum() {
493 long random = Math.round(Math.random() * 89999) + 10000;
494 String strLong = Long.toString(random);
498 private boolean isAlphaNumeric(String name) {
499 String pattern = "^[a-zA-Z0-9]*$";
500 if (name.matches(pattern)) {
506 // This method validate IPv4
507 private boolean validateIPPort(String ipPort) {
508 String pattern = "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
509 + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5]):"
510 + "([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$";
511 if (ipPort.matches(pattern)) {
517 private String checkJsonFormate(String jsonStr) {
519 String json = jsonStr;
520 if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) {
526 private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) {
528 boolean hasPermission = false;
530 if (dmaapAAFauthenticator.aafAuthentication(ctx.getRequest(), permission)) {
531 hasPermission = true;
533 return hasPermission;
536 private void callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream) {
538 mirrorService.pushEvents(ctx, topic, inStream, null, null);
539 long startTime = System.currentTimeMillis();
540 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
542 while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
543 && (System.currentTimeMillis() - startTime) < timeout) {
544 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
548 JSONObject finalJsonObj = new JSONObject();
551 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
552 && isListMirrorMaker(msgFrmSubscribe, randomstr)) {
553 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
554 jsonArray = new JSONArray(msgFrmSubscribe);
556 for (int i = 0; i < jsonArray.length(); i++) {
557 jsonObj = jsonArray.getJSONObject(i);
559 JSONObject obj = new JSONObject();
560 if (jsonObj.has(MESSAGE)) {
561 obj = jsonObj.getJSONObject(MESSAGE);
563 if (obj.has("messageID") && obj.get("messageID").equals(randomstr) && obj.has(LISTMIRRORMAKER)) {
564 finalJsonObj.put(LISTMIRRORMAKER, obj.get(LISTMIRRORMAKER));
569 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
573 JSONObject err = new JSONObject();
574 err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
575 DMaaPResponseBuilder.respondOk(ctx, err);
578 } catch (Exception e) {
579 LOGGER.error("Exception: ", e);
583 private void sendErrResponse(DMaaPContext ctx, String errMsg) {
584 JSONObject err = new JSONObject();
585 err.append(ERROR, errMsg);
588 DMaaPResponseBuilder.respondOk(ctx, err);
589 LOGGER.error(errMsg);
591 } catch (JSONException | IOException e) {
592 LOGGER.error("Error at sendErrResponse method:" + errMsg + "Exception name:" + e);
596 @SuppressWarnings("unchecked")
598 @Produces("application/json")
599 @Path("/listallwhitelist")
600 public void listWhiteList(InputStream msg) {
602 DMaaPContext ctx = getDmaapContext();
603 if (checkMirrorMakerPermission(ctx,
604 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
610 input = IOUtils.toString(msg, UTF_8);
612 if (input != null && input.length() > 0) {
613 input = removeExtraChar(input);
616 // Check if it is correct Json object
617 JSONObject jsonOb = null;
620 jsonOb = new JSONObject(input);
622 } catch (JSONException ex) {
624 sendErrResponse(ctx, errorMessages.getIncorrectJson());
625 LOGGER.error("JSONException: ", ex);
628 // Check if the request has name and name contains only alpha
630 // and check if the request has namespace and namespace contains
631 // only alpha numeric
632 if (jsonOb != null && jsonOb.length() == 2 && jsonOb.has("name")
633 && !StringUtils.isBlank(jsonOb.getString("name")) && isAlphaNumeric(jsonOb.getString("name"))
634 && jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))) {
636 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
637 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
639 // Check if the user have create permission for the
641 if (checkMirrorMakerPermission(ctx, permission)) {
643 JSONObject listAll = new JSONObject();
644 JSONObject emptyObject = new JSONObject();
646 // Create a listAllMirrorMaker Json object
648 listAll.put("listAllMirrorMaker", emptyObject);
650 } catch (JSONException e) {
652 LOGGER.error("JSONException: ", e);
655 // set a random number as messageID
656 String randomStr = getRandomNum();
657 listAll.put("messageID", randomStr);
658 InputStream inStream = null;
660 // convert listAll Json object to InputStream object
662 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
664 } catch (IOException ioe) {
665 LOGGER.error("IOException: ", ioe);
667 // call listAllMirrorMaker
668 mirrorService.pushEvents(ctx, topic, inStream, null, null);
670 // subscribe for listMirrorMaker
671 long startTime = System.currentTimeMillis();
672 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
674 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
675 && (System.currentTimeMillis() - startTime) < timeout) {
676 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
679 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
680 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
682 JSONArray listMirrorMaker;
683 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
685 String whitelist = null;
686 for (int i = 0; i < listMirrorMaker.length(); i++) {
689 mm = listMirrorMaker.getJSONObject(i);
690 String name = mm.getString("name");
692 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
693 whitelist = mm.getString("whitelist");
698 if (!StringUtils.isBlank(whitelist)) {
700 List<String> topicList = new ArrayList<>();
701 List<String> finalTopicList = new ArrayList<>();
702 topicList = Arrays.asList(whitelist.split(","));
704 for (String topic : topicList) {
705 if (topic != null && !topic.equals("null")
706 && getNamespace(topic).equals(jsonOb.getString(NAMESPACE))) {
708 finalTopicList.add(topic);
712 String topicNames = "";
714 if (!finalTopicList.isEmpty()) {
715 topicNames = StringUtils.join(finalTopicList, ",");
718 JSONObject listAllWhiteList = new JSONObject();
719 listAllWhiteList.put("name", jsonOb.getString("name"));
720 listAllWhiteList.put("whitelist", topicNames);
722 DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList);
727 JSONObject err = new JSONObject();
729 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
730 DMaaPResponseBuilder.respondOk(ctx, err);
734 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
739 sendErrResponse(ctx, "This is not a ListAllWhitelist request. Please try again.");
742 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
743 | TopicExistsException | missingReqdSetting | UnavailableException e) {
745 LOGGER.error("IOException: ", e);
748 sendErrResponse(ctx, NO_USER_PERMISSION);
752 @SuppressWarnings("unchecked")
754 @Produces("application/json")
755 @Path("/createwhitelist")
756 public void createWhiteList(InputStream msg) {
758 DMaaPContext ctx = getDmaapContext();
759 if (checkMirrorMakerPermission(ctx,
760 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
766 input = IOUtils.toString(msg, UTF_8);
768 if (input != null && input.length() > 0) {
769 input = removeExtraChar(input);
772 // Check if it is correct Json object
773 JSONObject jsonOb = null;
776 jsonOb = new JSONObject(input);
778 } catch (JSONException ex) {
780 sendErrResponse(ctx, errorMessages.getIncorrectJson());
781 LOGGER.error("JSONException: ", ex);
784 // Check if the request has name and name contains only alpha
786 // check if the request has namespace and
787 // check if the request has whitelistTopicName
788 // check if the topic name contains only alpha numeric
789 if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
790 && !StringUtils.isBlank(jsonOb.getString("name")) && isAlphaNumeric(jsonOb.getString("name"))
791 && jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))
792 && jsonOb.has("whitelistTopicName")
793 && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
794 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
795 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
796 jsonOb.getString("whitelistTopicName").length()))) {
798 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
799 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
801 // Check if the user have create permission for the
803 if (checkMirrorMakerPermission(ctx, permission)) {
805 JSONObject listAll = new JSONObject();
806 JSONObject emptyObject = new JSONObject();
808 // Create a listAllMirrorMaker Json object
810 listAll.put("listAllMirrorMaker", emptyObject);
812 } catch (JSONException e) {
814 LOGGER.error("JSONException: ", e);
817 // set a random number as messageID
818 String randomStr = getRandomNum();
819 listAll.put("messageID", randomStr);
820 InputStream inStream = null;
822 // convert listAll Json object to InputStream object
824 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
826 } catch (IOException ioe) {
827 LOGGER.error("IOException: ", ioe);
829 // call listAllMirrorMaker
830 mirrorService.pushEvents(ctx, topic, inStream, null, null);
832 // subscribe for listMirrorMaker
833 long startTime = System.currentTimeMillis();
834 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
836 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
837 && (System.currentTimeMillis() - startTime) < timeout) {
838 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
841 JSONArray listMirrorMaker;
843 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
844 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
846 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
847 String whitelist = null;
849 for (int i = 0; i < listMirrorMaker.length(); i++) {
851 mm = listMirrorMaker.getJSONObject(i);
852 String name = mm.getString("name");
854 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
855 whitelist = mm.getString("whitelist");
860 List<String> topicList = new ArrayList<>();
861 List<String> finalTopicList = new ArrayList<>();
863 if (whitelist != null) {
864 topicList = Arrays.asList(whitelist.split(","));
867 for (String st : topicList) {
868 if (!StringUtils.isBlank(st)) {
869 finalTopicList.add(st);
873 String newTopic = jsonOb.getString("whitelistTopicName");
875 if (!topicList.contains(newTopic)
876 && getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
878 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
879 MirrorMaker mirrorMaker = new MirrorMaker();
880 mirrorMaker.setName(jsonOb.getString("name"));
881 finalTopicList.add(newTopic);
882 String newWhitelist = "";
884 if (!finalTopicList.isEmpty()) {
885 newWhitelist = StringUtils.join(finalTopicList, ",");
888 mirrorMaker.setWhitelist(newWhitelist);
890 String newRandom = getRandomNum();
891 updateWhiteList.setMessageID(newRandom);
892 updateWhiteList.setUpdateWhiteList(mirrorMaker);
895 g.toJson(updateWhiteList);
896 InputStream inputStream;
897 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
898 // callPubSub(newRandom, ctx, inputStream);
899 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb.getString(NAMESPACE));
901 } else if (topicList.contains(newTopic)) {
902 sendErrResponse(ctx, "The topic already exist.");
904 } else if (!getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
906 "The namespace of the topic does not match with the namespace you provided.");
910 JSONObject err = new JSONObject();
912 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
913 DMaaPResponseBuilder.respondOk(ctx, err);
917 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
922 sendErrResponse(ctx, "This is not a createWhitelist request. Please try again.");
925 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
926 | TopicExistsException | missingReqdSetting | UnavailableException e) {
928 LOGGER.error("IOException: ", e);
931 // Send error response if user does not provide Authorization
933 sendErrResponse(ctx, NO_USER_PERMISSION);
937 @SuppressWarnings("unchecked")
939 @Produces("application/json")
940 @Path("/deletewhitelist")
941 public void deleteWhiteList(InputStream msg) {
943 DMaaPContext ctx = getDmaapContext();
944 if (checkMirrorMakerPermission(ctx,
945 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
951 input = IOUtils.toString(msg, UTF_8);
953 if (input != null && input.length() > 0) {
954 input = removeExtraChar(input);
957 // Check if it is correct Json object
958 JSONObject jsonOb = null;
961 jsonOb = new JSONObject(input);
963 } catch (JSONException ex) {
965 sendErrResponse(ctx, errorMessages.getIncorrectJson());
966 LOGGER.error("JSONException: ", ex);
969 // Check if the request has name and name contains only alpha
971 // check if the request has namespace and
972 // check if the request has whitelistTopicName
973 if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
974 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has(NAMESPACE)
975 && jsonOb.has("whitelistTopicName")
976 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
977 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
978 jsonOb.getString("whitelistTopicName").length()))) {
980 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
981 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
983 // Check if the user have create permission for the
985 if (checkMirrorMakerPermission(ctx, permission)) {
987 JSONObject listAll = new JSONObject();
988 JSONObject emptyObject = new JSONObject();
990 // Create a listAllMirrorMaker Json object
992 listAll.put("listAllMirrorMaker", emptyObject);
994 } catch (JSONException e) {
996 LOGGER.error("JSONException: ", e);
999 // set a random number as messageID
1000 String randomStr = getRandomNum();
1001 listAll.put("messageID", randomStr);
1002 InputStream inStream = null;
1004 // convert listAll Json object to InputStream object
1006 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
1008 } catch (IOException ioe) {
1009 LOGGER.error("IOException: ", ioe);
1011 // call listAllMirrorMaker
1012 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1014 // subscribe for listMirrorMaker
1015 long startTime = System.currentTimeMillis();
1016 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1018 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1019 && (System.currentTimeMillis() - startTime) < timeout) {
1020 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1024 JSONArray jsonArray;
1025 JSONArray listMirrorMaker = null;
1027 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1028 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1029 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1030 jsonArray = new JSONArray(msgFrmSubscribe);
1032 for (int i = 0; i < jsonArray.length(); i++) {
1033 jsonObj = jsonArray.getJSONObject(i);
1035 JSONObject obj = new JSONObject();
1036 if (jsonObj.has(MESSAGE)) {
1037 obj = jsonObj.getJSONObject(MESSAGE);
1039 if (obj.has("messageID") && obj.get("messageID").equals(randomStr)
1040 && obj.has(LISTMIRRORMAKER)) {
1041 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
1045 String whitelist = null;
1046 if (listMirrorMaker != null) {
1047 for (int i = 0; i < listMirrorMaker.length(); i++) {
1049 JSONObject mm = new JSONObject();
1050 mm = listMirrorMaker.getJSONObject(i);
1051 String name = mm.getString("name");
1053 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
1054 whitelist = mm.getString("whitelist");
1060 List<String> topicList = new ArrayList<>();
1062 if (whitelist != null) {
1063 topicList = Arrays.asList(whitelist.split(","));
1065 boolean removeTopic = false;
1066 String topicToRemove = jsonOb.getString("whitelistTopicName");
1068 if (topicList.contains(topicToRemove)) {
1071 sendErrResponse(ctx, "The topic does not exist.");
1075 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
1076 MirrorMaker mirrorMaker = new MirrorMaker();
1078 mirrorMaker.setName(jsonOb.getString("name"));
1079 mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
1081 String newRandom = getRandomNum();
1083 updateWhiteList.setMessageID(newRandom);
1084 updateWhiteList.setUpdateWhiteList(mirrorMaker);
1086 Gson g = new Gson();
1087 g.toJson(updateWhiteList);
1089 InputStream inputStream;
1090 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
1091 callPubSubForWhitelist(newRandom, ctx, inputStream, getNamespace(topicToRemove));
1096 JSONObject err = new JSONObject();
1098 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
1099 DMaaPResponseBuilder.respondOk(ctx, err);
1103 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
1108 sendErrResponse(ctx, "This is not a DeleteAllWhitelist request. Please try again.");
1111 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1112 | TopicExistsException | missingReqdSetting | UnavailableException e) {
1114 LOGGER.error("IOException: ", e);
1117 // Send error response if user does not provide Authorization
1119 sendErrResponse(ctx, NO_USER_PERMISSION);
1123 private String getNamespace(String topic) {
1124 return topic.substring(0, topic.lastIndexOf("."));
1127 private String removeTopic(String whitelist, String topicToRemove) {
1128 List<String> topicList = new ArrayList<>();
1129 List<String> newTopicList = new ArrayList<>();
1131 if (whitelist.contains(",")) {
1132 topicList = Arrays.asList(whitelist.split(","));
1136 if (topicList.contains(topicToRemove)) {
1137 for (String topic : topicList) {
1138 if (!topic.equals(topicToRemove)) {
1139 newTopicList.add(topic);
1144 String newWhitelist = StringUtils.join(newTopicList, ",");
1146 return newWhitelist;
1149 private void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, String namespace) {
1152 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1153 long startTime = System.currentTimeMillis();
1154 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1156 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1157 && (System.currentTimeMillis() - startTime) < timeout) {
1158 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1162 JSONArray jsonArray;
1163 JSONArray jsonArrayNamespace = null;
1165 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1166 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1167 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1168 jsonArray = new JSONArray(msgFrmSubscribe);
1170 for (int i = 0; i < jsonArray.length(); i++) {
1171 jsonObj = jsonArray.getJSONObject(i);
1173 JSONObject obj = new JSONObject();
1174 if (jsonObj.has(MESSAGE)) {
1175 obj = jsonObj.getJSONObject(MESSAGE);
1177 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1178 jsonArrayNamespace = obj.getJSONArray(LISTMIRRORMAKER);
1181 JSONObject finalJasonObj = new JSONObject();
1182 JSONArray finalJsonArray = new JSONArray();
1184 for (int i = 0; i < jsonArrayNamespace.length(); i++) {
1187 mmObj = jsonArrayNamespace.getJSONObject(i);
1190 if (mmObj.has("whitelist")) {
1191 whitelist = getWhitelistByNamespace(mmObj.getString("whitelist"), namespace);
1193 if (whitelist != null) {
1194 mmObj.remove("whitelist");
1195 mmObj.put("whitelist", whitelist);
1197 mmObj.remove("whitelist");
1200 finalJsonArray.put(mmObj);
1202 finalJasonObj.put(LISTMIRRORMAKER, finalJsonArray);
1204 DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
1208 JSONObject err = new JSONObject();
1209 err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
1210 DMaaPResponseBuilder.respondOk(ctx, err);
1213 } catch (Exception e) {
1214 LOGGER.error("Exception: ", e);
1218 private String getWhitelistByNamespace(String originalWhitelist, String namespace) {
1220 String whitelist = null;
1221 List<String> resultList = new ArrayList<>();
1222 List<String> whitelistList = new ArrayList<>();
1223 whitelistList = Arrays.asList(originalWhitelist.split(","));
1225 for (String topic : whitelistList) {
1226 if (StringUtils.isNotBlank(originalWhitelist) && getNamespace(topic).equals(namespace)) {
1227 resultList.add(topic);
1230 if (!resultList.isEmpty()) {
1231 whitelist = StringUtils.join(resultList, ",");
1237 private JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
1239 JSONArray jsonArray;
1240 JSONArray listMirrorMaker = new JSONArray();
1242 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1243 jsonArray = new JSONArray(msgFrmSubscribe);
1245 for (int i = 0; i < jsonArray.length(); i++) {
1246 jsonObj = jsonArray.getJSONObject(i);
1248 JSONObject obj = new JSONObject();
1249 if (jsonObj.has(MESSAGE)) {
1250 obj = jsonObj.getJSONObject(MESSAGE);
1252 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1253 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
1257 return listMirrorMaker;