1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.dmaap.service;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
31 import javax.servlet.http.HttpServletRequest;
32 import javax.servlet.http.HttpServletResponse;
33 import javax.ws.rs.POST;
34 import javax.ws.rs.Path;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.core.Context;
37 import org.json.JSONObject;
38 import org.apache.commons.io.IOUtils;
39 import org.apache.commons.lang.StringUtils;
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.beans.factory.annotation.Qualifier;
44 import org.springframework.stereotype.Component;
46 import com.att.nsa.cambria.utils.ConfigurationReader;
47 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
48 import com.att.nsa.cambria.utils.Utils;
49 import com.att.nsa.configs.ConfigDbException;
50 import com.att.nsa.dmaap.mmagent.*;
51 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
52 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
53 import com.google.gson.Gson;
54 import com.google.gson.JsonSyntaxException;
56 import edu.emory.mathcs.backport.java.util.Arrays;
58 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
59 import com.att.nsa.cambria.CambriaApiException;
60 import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
62 import org.json.JSONArray;
63 import org.json.JSONException;
64 import com.att.nsa.cambria.beans.DMaaPContext;
65 import com.att.nsa.cambria.constants.CambriaConstants;
66 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
67 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
68 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
69 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
70 import com.att.nsa.cambria.service.MMService;
73 * Rest Service class for Mirror Maker proxy Rest Services
75 * @author <a href="mailto:"></a>
81 public class MMRestService {
// Logger shared by every handler in this class.
83 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class);
// Messages passed to sendErrResponse when a permission or validation check fails.
84 private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission.";
85 private static final String NO_USER_PERMISSION = "No Mirror Maker User permission.";
86 private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission.";
87 private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric";
88 private static final String INVALID_IPPORT = "This is not a valid IP:Port";
// Property keys, looked up under CambriaConstants.msgRtr_prop, whose values are
// handed to checkMirrorMakerPermission as the admin / user permission.
89 private static final String MIRROR_MAKERADMIN = "msgRtr.mirrormakeradmin.aaf";
90 private static final String MIRROR_MAKERUSER = "msgRtr.mirrormakeruser.aaf";
// Character set name used when converting request bodies to and from streams.
91 private static final String UTF_8 = "UTF-8";
// JSON keys used when reading subscribed messages, reading requests and building error replies.
92 private static final String MESSAGE = "message";
93 private static final String LISTMIRRORMAKER = "listMirrorMaker";
94 private static final String ERROR = "error";
95 private static final String NAMESPACE = "namespace";
// Consumer group and consumer id passed to mirrorService.subscribe; assigned in loadProperty().
99 private String consumergroup;
100 private String consumerid;
// Configuration reader copied into each DMaaPContext by getDmaapContext().
103 @Qualifier("configurationReader")
104 private ConfigurationReader configReader;
// Servlet request and response copied into each DMaaPContext by getDmaapContext().
107 private HttpServletRequest request;
110 private HttpServletResponse response;
// Service used to publish requests (pushEvents) and poll for replies (subscribe).
113 private MMService mirrorService;
// Source of shared error texts; the handlers use getIncorrectJson().
116 private DMaaPErrorMessages errorMessages;
119 * This method is used for taking Configuration Object,HttpServletRequest
120 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
123 * @return DMaaPContext object from where user can get Configuration
124 * Object,HttpServlet Object
127 private DMaaPContext getDmaapContext() {
128 DMaaPContext dmaapContext = new DMaaPContext();
129 dmaapContext.setRequest(request);
130 dmaapContext.setResponse(response);
131 dmaapContext.setConfigReader(configReader);
132 dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
// Handles a CreateMirrorMaker request. The caller must pass the permission
// check for the MIRROR_MAKERADMIN property value; otherwise NO_ADMIN_PERMISSION
// is sent. The body is read as UTF-8, cleaned with removeExtraChar, parsed with
// Gson into CreateMirrorMaker and validated (name, consumer and producer
// IP:port). A valid request is stamped with a random message ID, serialized
// again and handed to callPubSub. Every rejection goes through sendErrResponse.
138 @Produces("application/json")
140 public void callCreateMirrorMaker(InputStream msg) {
142 DMaaPContext ctx = getDmaapContext();
143 if (checkMirrorMakerPermission(ctx,
144 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
148 String randomStr = getRandomNum();
150 InputStream inStream = null;
151 Gson gson = new Gson();
152 CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
155 input = IOUtils.toString(msg, UTF_8);
157 if (input != null && input.length() > 0) {
158 input = removeExtraChar(input);
161 // Check if the request has CreateMirrorMaker
163 createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class);
165 } catch (JsonSyntaxException ex) {
167 sendErrResponse(ctx, errorMessages.getIncorrectJson());
168 LOGGER.error("JsonSyntaxException: ", ex);
// NOTE(review): getCreateMirrorMaker() is dereferenced here, before the null
// check that follows, so a body without a createMirrorMaker element raises a
// NullPointerException instead of reaching that error response.
170 String name = createMirrorMaker.getCreateMirrorMaker().getName();
171 // send error message if it is not a CreateMirrorMaker request.
172 if (createMirrorMaker.getCreateMirrorMaker() == null) {
173 sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
176 // MirrorMaker whitelist and status should not be passed
177 else if (createMirrorMaker.getCreateMirrorMaker().getWhitelist() != null
178 || createMirrorMaker.getCreateMirrorMaker().getStatus() != null) {
179 sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
182 // if empty, blank name is entered
183 else if (StringUtils.isBlank(name)) {
184 sendErrResponse(ctx, "Name can not be empty or blank.");
187 // Check if the name contains only Alpha Numeric
188 else if (!isAlphaNumeric(name)) {
189 sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
// NOTE(review): && binds tighter than ||, so the condition below reads as
// (consumer non-blank && producer non-blank && consumer invalid) || producer
// invalid. validateIPPort(getProducer()) is therefore evaluated even when the
// producer is blank or null.
193 // Validate the IP and Port
194 else if (!StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getConsumer())
195 && !StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getProducer())
196 && !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
197 || !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
198 sendErrResponse(ctx, INVALID_IPPORT);
201 // Set a random number as messageID, convert Json Object to
202 // InputStream and finally call publisher and subscriber
203 else if (isAlphaNumeric(name) && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
204 && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
206 createMirrorMaker.setMessageID(randomStr);
207 inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), UTF_8);
208 callPubSub(randomStr, ctx, inStream);
211 } catch (IOException e) {
213 LOGGER.error("IOException: ", e);
216 // Send error response if user does not provide Authorization
218 sendErrResponse(ctx, NO_ADMIN_PERMISSION);
// Handles a ListAllMirrorMaker request. After the MIRROR_MAKERADMIN permission
// check the body is read as UTF-8, cleaned with removeExtraChar and parsed as a
// JSONObject. Only a body that has a "listAllMirrorMaker" key holding an empty
// object is accepted; it gets a random "messageID" and is handed to callPubSub.
// Anything else is answered through sendErrResponse.
223 @Produces("application/json")
225 public void callListAllMirrorMaker(InputStream msg) throws CambriaApiException {
226 DMaaPContext ctx = getDmaapContext();
228 if (checkMirrorMakerPermission(ctx,
229 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
236 input = IOUtils.toString(msg, UTF_8);
238 if (input != null && input.length() > 0) {
239 input = removeExtraChar(input);
242 String randomStr = getRandomNum();
243 JSONObject jsonOb = null;
246 jsonOb = new JSONObject(input);
248 } catch (JSONException ex) {
// NOTE(review): jsonOb stays null after this catch, so the request check below
// also fails; it looks like a second error response is then sent for the same
// call — confirm against the full source.
250 sendErrResponse(ctx, errorMessages.getIncorrectJson());
251 LOGGER.error("JSONException: ", ex);
254 // Check if request has listAllMirrorMaker and
255 // listAllMirrorMaker is empty
256 if ((jsonOb != null) && (jsonOb.has("listAllMirrorMaker") &&
257 jsonOb.getJSONObject("listAllMirrorMaker").length() == 0)) {
258 jsonOb.put("messageID", randomStr);
259 InputStream inStream = null;
262 inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
264 } catch (IOException ioe) {
265 LOGGER.error("IOException: ", ioe);
268 callPubSub(randomStr, ctx, inStream);
272 sendErrResponse(ctx, "This is not a ListAllMirrorMaker request. Please try again.");
275 } catch (IOException ioe) {
// NOTE(review): printStackTrace duplicates the logger call that follows.
277 ioe.printStackTrace();
278 LOGGER.error("IOException: ", ioe);
283 sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
// Handles an UpdateMirrorMaker request. The flow mirrors callCreateMirrorMaker:
// MIRROR_MAKERADMIN permission check, UTF-8 read, removeExtraChar, Gson parse
// into UpdateMirrorMaker, validation of name and consumer/producer IP:port,
// then a random message ID is set and the serialized request goes to
// callPubSub. Rejections are answered through sendErrResponse.
288 @Produces("application/json")
290 public void callUpdateMirrorMaker(InputStream msg) throws CambriaApiException {
292 DMaaPContext ctx = getDmaapContext();
293 if (checkMirrorMakerPermission(ctx,
294 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
298 String randomStr = getRandomNum();
300 InputStream inStream = null;
301 Gson gson = new Gson();
302 UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
305 input = IOUtils.toString(msg, UTF_8);
307 if (input != null && input.length() > 0) {
308 input = removeExtraChar(input);
311 // Check if the request has UpdateMirrorMaker
313 updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
315 } catch (JsonSyntaxException ex) {
317 sendErrResponse(ctx, errorMessages.getIncorrectJson());
318 LOGGER.error("JsonSyntaxException: ", ex);
// NOTE(review): getUpdateMirrorMaker() is dereferenced here, before the null
// check that follows, so a body without an updateMirrorMaker element raises a
// NullPointerException instead of reaching that error response.
321 String name = updateMirrorMaker.getUpdateMirrorMaker().getName();
323 // send error message if it is not a UpdateMirrorMaker request.
324 if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
325 sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
328 // MirrorMaker whitelist and status should not be passed
329 else if (updateMirrorMaker.getUpdateMirrorMaker().getWhitelist() != null
330 || updateMirrorMaker.getUpdateMirrorMaker().getStatus() != null) {
331 sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
334 // if empty, blank name is entered
335 else if (StringUtils.isBlank(name)) {
336 sendErrResponse(ctx, "Name can not be empty or blank.");
339 // Check if the name contains only Alpha Numeric
340 else if (!isAlphaNumeric(name)) {
341 sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
// NOTE(review): && binds tighter than ||, so the condition below reads as
// (consumer non-blank && producer non-blank && consumer invalid) || producer
// invalid. validateIPPort(getProducer()) is therefore evaluated even when the
// producer is blank or null.
345 // Validate the IP and Port
346 else if (!StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
347 && !StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getProducer())
348 && !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
349 || !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
350 sendErrResponse(ctx, INVALID_IPPORT);
353 // Set a random number as messageID, convert Json Object to
354 // InputStream and finally call publisher and subscriber
355 else if (isAlphaNumeric(name) && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
356 && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
358 updateMirrorMaker.setMessageID(randomStr);
359 inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), UTF_8);
360 callPubSub(randomStr, ctx, inStream);
363 } catch (IOException e) {
365 LOGGER.error("IOException: ", e);
368 // Send error response if user does not provide Authorization
370 sendErrResponse(ctx, NO_ADMIN_PERMISSION);
// Handles a DeleteMirrorMaker request. After the MIRROR_MAKERADMIN permission
// check the body is read as UTF-8, cleaned with removeExtraChar and parsed as a
// JSONObject. The request is accepted only when "deleteMirrorMaker" is an
// object with exactly one entry, "name", that is non-blank and alphanumeric.
// An accepted request gets a random "messageID" and is handed to callPubSub;
// everything else is answered through sendErrResponse.
375 @Produces("application/json")
377 public void callDeleteMirrorMaker(InputStream msg) throws CambriaApiException {
378 DMaaPContext ctx = getDmaapContext();
380 if (checkMirrorMakerPermission(ctx,
381 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
388 input = IOUtils.toString(msg, UTF_8);
390 if (input != null && input.length() > 0) {
391 input = removeExtraChar(input);
394 String randomStr = getRandomNum();
395 JSONObject jsonOb = null;
398 jsonOb = new JSONObject(input);
400 } catch (JSONException ex) {
402 sendErrResponse(ctx, errorMessages.getIncorrectJson());
403 LOGGER.error("JSONException: ", ex);
406 // Check if request has DeleteMirrorMaker and
407 // DeleteMirrorMaker has MirrorMaker object with name variable
408 // and check if the name contain only alpha numeric
409 if ((jsonOb != null) && (jsonOb.has("deleteMirrorMaker")
410 && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
411 && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
412 && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
413 && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name")))) {
415 jsonOb.put("messageID", randomStr);
416 InputStream inStream = null;
419 inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
421 } catch (IOException ioe) {
// NOTE(review): printStackTrace duplicates the logger call that follows.
422 ioe.printStackTrace();
423 LOGGER.error("IOException: ", ioe);
426 callPubSub(randomStr, ctx, inStream);
430 sendErrResponse(ctx, "This is not a DeleteMirrorMaker request. Please try again.");
433 } catch (IOException ioe) {
435 ioe.printStackTrace();
436 LOGGER.error("IOException: ", ioe);
441 sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
// Checks whether a subscribed payload contains the reply to a given request.
// msg is cleaned with removeExtraChar and, when it is non-blank and longer than
// two characters, parsed as a JSON array. Each element's "message" object (an
// empty object when the key is absent) is tested for a "messageID" equal to
// messageID together with a "listMirrorMaker" key.
// NOTE(review): the statements executed on a match and the final return are not
// visible here; presumably `exist` is set and returned — confirm.
445 private boolean isListMirrorMaker(String msg, String messageID) {
446 String topicmsg = msg;
447 topicmsg = removeExtraChar(topicmsg);
451 boolean exist = false;
453 if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) {
454 jArray = new JSONArray(topicmsg);
456 for (int i = 0; i < jArray.length(); i++) {
457 jObj = jArray.getJSONObject(i);
459 JSONObject obj = new JSONObject();
460 if (jObj.has(MESSAGE)) {
461 obj = jObj.getJSONObject(MESSAGE);
464 if (obj.has("messageID") && obj.get("messageID").equals(messageID) && obj.has(LISTMIRRORMAKER)) {
472 private void loadProperty() {
474 this.timeout = Integer.parseInt(
475 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim());
476 this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim();
477 this.consumergroup = AJSCPropertiesMap
478 .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim();
479 this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid")
483 private String removeExtraChar(String message) {
484 String str = message;
485 str = checkJsonFormate(str);
487 if (str != null && str.length() > 0) {
488 str = str.replace("\\", "");
489 str = str.replace("\"{", "{");
490 str = str.replace("}\"", "}");
495 private String getRandomNum() {
496 long random = Math.round(Math.random() * 89999) + 10000;
497 String strLong = Long.toString(random);
501 private boolean isAlphaNumeric(String name) {
502 String pattern = "^[a-zA-Z0-9]*$";
503 if (name.matches(pattern)) {
509 // This method validate IPv4
510 private boolean validateIPPort(String ipPort) {
511 String pattern = "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
512 + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5]):"
513 + "([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$";
514 if (ipPort.matches(pattern)) {
// Pre-processes a JSON payload before removeExtraChar strips characters from it.
// The only visible condition detects a non-empty string that starts with "["
// but does not end with "]".
// NOTE(review): the statement executed for that case and the return are not
// visible here; presumably the array is closed and `json` returned — confirm.
520 private String checkJsonFormate(String jsonStr) {
522 String json = jsonStr;
523 if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) {
529 private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) {
531 boolean hasPermission = false;
533 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
535 if (aaf.aafAuthentication(ctx.getRequest(), permission)) {
536 hasPermission = true;
538 return hasPermission;
// Publishes a request and waits for its reply. inStream is pushed to `topic`
// through mirrorService.pushEvents, then mirrorService.subscribe is polled until
// isListMirrorMaker finds a reply with the same message ID or `timeout`
// milliseconds have elapsed. On a match the "listMirrorMaker" value of the
// matching message is copied into the response object; otherwise an error
// object is built. Both outcomes are sent with DMaaPResponseBuilder.respondOk.
// Any exception is logged and not propagated.
541 private void callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream) {
543 mirrorService.pushEvents(ctx, topic, inStream, null, null);
544 long startTime = System.currentTimeMillis();
545 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Poll with no visible pause between subscribe calls until the reply shows up or time runs out.
547 while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
548 && (System.currentTimeMillis() - startTime) < timeout) {
549 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
553 JSONObject finalJsonObj = new JSONObject();
556 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
557 && isListMirrorMaker(msgFrmSubscribe, randomstr)) {
558 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
559 jsonArray = new JSONArray(msgFrmSubscribe);
561 for (int i = 0; i < jsonArray.length(); i++) {
562 jsonObj = jsonArray.getJSONObject(i);
564 JSONObject obj = new JSONObject();
565 if (jsonObj.has(MESSAGE)) {
566 obj = jsonObj.getJSONObject(MESSAGE);
568 if (obj.has("messageID") && obj.get("messageID").equals(randomstr) && obj.has(LISTMIRRORMAKER)) {
569 finalJsonObj.put(LISTMIRRORMAKER, obj.get(LISTMIRRORMAKER));
574 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
578 JSONObject err = new JSONObject();
579 err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
580 DMaaPResponseBuilder.respondOk(ctx, err);
583 } catch (Exception e) {
585 LOGGER.error("Exception: ", e);
589 private void sendErrResponse(DMaaPContext ctx, String errMsg) {
590 JSONObject err = new JSONObject();
591 err.append(ERROR, errMsg);
594 DMaaPResponseBuilder.respondOk(ctx, err);
595 LOGGER.error(errMsg);
597 } catch (JSONException | IOException e) {
598 LOGGER.error(errMsg);
// Lists the whitelist topics of one MirrorMaker that belong to a namespace.
// The caller must pass the MIRROR_MAKERUSER permission check and then the
// create permission built from the "msgRtr.mirrormakeruser.aaf.create" property,
// the request's namespace and "|create". The request must be a JSON object with
// exactly two entries: an alphanumeric "name" and a non-blank "namespace".
// The method publishes a listAllMirrorMaker message, polls for the reply until
// `timeout` elapses, picks the whitelist of the MirrorMaker whose name matches,
// keeps only topics whose namespace (getNamespace) equals the requested one and
// answers with {"name", "whitelist"}. Failures are reported via sendErrResponse
// or an "error" object.
602 @SuppressWarnings("unchecked")
604 @Produces("application/json")
605 @Path("/listallwhitelist")
606 public void listWhiteList(InputStream msg) {
608 DMaaPContext ctx = getDmaapContext();
609 if (checkMirrorMakerPermission(ctx,
610 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
616 input = IOUtils.toString(msg, UTF_8);
618 if (input != null && input.length() > 0) {
619 input = removeExtraChar(input);
622 // Check if it is correct Json object
623 JSONObject jsonOb = null;
626 jsonOb = new JSONObject(input);
628 } catch (JSONException ex) {
630 sendErrResponse(ctx, errorMessages.getIncorrectJson());
631 LOGGER.error("JSONException: ", ex);
634 // Check if the request has name and name contains only alpha
636 // and check if the request has namespace and namespace contains
637 // only alpha numeric
639 && jsonOb.length() == 2 && jsonOb.has("name")
640 && !StringUtils.isBlank(jsonOb.getString("name"))
641 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has(NAMESPACE)
642 && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))) {
644 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
645 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
647 // Check if the user have create permission for the
649 if (checkMirrorMakerPermission(ctx, permission)) {
651 JSONObject listAll = new JSONObject();
652 JSONObject emptyObject = new JSONObject();
654 // Create a listAllMirrorMaker Json object
656 listAll.put("listAllMirrorMaker", emptyObject);
658 } catch (JSONException e) {
660 LOGGER.error("JSONException: ", e);
663 // set a random number as messageID
664 String randomStr = getRandomNum();
665 listAll.put("messageID", randomStr);
666 InputStream inStream = null;
668 // convert listAll Json object to InputStream object
670 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
672 } catch (IOException ioe) {
673 ioe.printStackTrace();
674 LOGGER.error("IOException: ", ioe);
676 // call listAllMirrorMaker
677 mirrorService.pushEvents(ctx, topic, inStream, null, null);
679 // subscribe for listMirrorMaker
680 long startTime = System.currentTimeMillis();
681 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
683 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
684 && (System.currentTimeMillis() - startTime) < timeout) {
685 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
688 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
689 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
691 JSONArray listMirrorMaker;
692 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
694 String whitelist = null;
695 for (int i = 0; i < listMirrorMaker.length(); i++) {
698 mm = listMirrorMaker.getJSONObject(i);
699 String name = mm.getString("name");
701 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
702 whitelist = mm.getString("whitelist");
707 if (!StringUtils.isBlank(whitelist)) {
709 List<String> topicList = new ArrayList<>();
710 List<String> finalTopicList = new ArrayList<>();
711 topicList = Arrays.asList(whitelist.split(","));
// NOTE(review): the loop variable `topic` has the same name as the `topic`
// used for pushEvents/subscribe above; inside the loop it refers to the
// whitelist entry.
713 for (String topic : topicList) {
714 if (topic != null && !topic.equals("null")
715 && getNamespace(topic).equals(jsonOb.getString(NAMESPACE))) {
717 finalTopicList.add(topic);
721 String topicNames = "";
723 if (!finalTopicList.isEmpty()) {
724 topicNames = StringUtils.join(finalTopicList, ",");
727 JSONObject listAllWhiteList = new JSONObject();
728 listAllWhiteList.put("name", jsonOb.getString("name"));
729 listAllWhiteList.put("whitelist", topicNames);
731 DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList);
736 JSONObject err = new JSONObject();
738 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
739 DMaaPResponseBuilder.respondOk(ctx, err);
743 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
748 sendErrResponse(ctx, "This is not a ListAllWhitelist request. Please try again.");
751 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
752 | TopicExistsException | missingReqdSetting | UnavailableException e) {
// NOTE(review): the label says IOException although this handler catches
// several exception types.
754 LOGGER.error("IOException: ", e);
757 sendErrResponse(ctx, NO_USER_PERMISSION);
// Adds one topic to the whitelist of a MirrorMaker.
// The caller must pass the MIRROR_MAKERUSER permission check and then the
// create permission built from the "msgRtr.mirrormakeruser.aaf.create" property,
// the request's namespace and "|create". The request must be a JSON object with
// exactly three entries: a non-blank alphanumeric "name", a non-blank
// "namespace", and a non-blank "whitelistTopicName" whose part after the last
// '.' is alphanumeric.
// The method first publishes a listAllMirrorMaker message and polls for the
// reply to obtain the current whitelist. If the topic is not yet listed and its
// namespace (getNamespace) equals the requested one, an UpdateWhiteList message
// carrying the extended whitelist is sent through callPubSubForWhitelist.
// Failures are reported via sendErrResponse or an "error" object.
761 @SuppressWarnings("unchecked")
763 @Produces("application/json")
764 @Path("/createwhitelist")
765 public void createWhiteList(InputStream msg) {
767 DMaaPContext ctx = getDmaapContext();
768 if (checkMirrorMakerPermission(ctx,
769 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
775 input = IOUtils.toString(msg, UTF_8);
777 if (input != null && input.length() > 0) {
778 input = removeExtraChar(input);
781 // Check if it is correct Json object
782 JSONObject jsonOb = null;
785 jsonOb = new JSONObject(input);
787 } catch (JSONException ex) {
789 sendErrResponse(ctx, errorMessages.getIncorrectJson());
790 LOGGER.error("JSONException: ", ex);
793 // Check if the request has name and name contains only alpha numeric,
794 // check if the request has namespace and
795 // check if the request has whitelistTopicName
796 // check if the topic name contains only alpha numeric
797 if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
798 && !StringUtils.isBlank(jsonOb.getString("name"))
799 && isAlphaNumeric(jsonOb.getString("name"))
800 && jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))
801 && jsonOb.has("whitelistTopicName") && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
802 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1,
803 jsonOb.getString("whitelistTopicName").length()))) {
805 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
806 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
808 // Check if the user have create permission for the
810 if (checkMirrorMakerPermission(ctx, permission)) {
812 JSONObject listAll = new JSONObject();
813 JSONObject emptyObject = new JSONObject();
815 // Create a listAllMirrorMaker Json object
817 listAll.put("listAllMirrorMaker", emptyObject);
819 } catch (JSONException e) {
821 LOGGER.error("JSONException: ", e);
824 // set a random number as messageID
825 String randomStr = getRandomNum();
826 listAll.put("messageID", randomStr);
827 InputStream inStream = null;
829 // convert listAll Json object to InputStream object
831 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
833 } catch (IOException ioe) {
834 ioe.printStackTrace();
835 LOGGER.error("IOException: ", ioe);
837 // call listAllMirrorMaker
838 mirrorService.pushEvents(ctx, topic, inStream, null, null);
840 // subscribe for listMirrorMaker
841 long startTime = System.currentTimeMillis();
842 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
844 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
845 && (System.currentTimeMillis() - startTime) < timeout) {
846 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
849 JSONArray listMirrorMaker;
851 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
852 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
854 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
855 String whitelist = null;
857 for (int i = 0; i < listMirrorMaker.length(); i++) {
859 mm = listMirrorMaker.getJSONObject(i);
860 String name = mm.getString("name");
862 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
863 whitelist = mm.getString("whitelist");
868 List<String> topicList = new ArrayList<>();
869 List<String> finalTopicList = new ArrayList<>();
871 if (whitelist != null) {
872 topicList = Arrays.asList(whitelist.split(","));
// Copy only the non-blank entries; finalTopicList becomes the base of the new whitelist.
875 for (String st : topicList) {
876 if (!StringUtils.isBlank(st)) {
877 finalTopicList.add(st);
881 String newTopic = jsonOb.getString("whitelistTopicName");
883 if (!topicList.contains(newTopic)
884 && getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
886 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
887 MirrorMaker mirrorMaker = new MirrorMaker();
888 mirrorMaker.setName(jsonOb.getString("name"));
889 finalTopicList.add(newTopic);
890 String newWhitelist = "";
892 if (!finalTopicList.isEmpty()) {
893 newWhitelist = StringUtils.join(finalTopicList, ",");
896 mirrorMaker.setWhitelist(newWhitelist);
// A fresh message ID is used for the update, separate from the list request above.
898 String newRandom = getRandomNum();
899 updateWhiteList.setMessageID(newRandom);
900 updateWhiteList.setUpdateWhiteList(mirrorMaker);
// NOTE(review): the result of this toJson call is discarded; the serialization
// that is actually used happens in the toInputStream call below.
903 g.toJson(updateWhiteList);
904 InputStream inputStream;
905 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
906 // callPubSub(newRandom, ctx, inputStream);
907 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb.getString(NAMESPACE));
909 } else if (topicList.contains(newTopic)) {
910 sendErrResponse(ctx, "The topic already exist.");
912 } else if (!getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
914 "The namespace of the topic does not match with the namespace you provided.");
918 JSONObject err = new JSONObject();
920 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
921 DMaaPResponseBuilder.respondOk(ctx, err);
925 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
930 sendErrResponse(ctx, "This is not a createWhitelist request. Please try again.");
933 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
934 | TopicExistsException | missingReqdSetting | UnavailableException e) {
// NOTE(review): the label says IOException although this handler catches
// several exception types.
936 LOGGER.error("IOException: ", e);
939 // Send error response if user does not provide Authorization
941 sendErrResponse(ctx, NO_USER_PERMISSION);
// Removes one topic from the whitelist of a MirrorMaker.
// The caller must pass the MIRROR_MAKERUSER permission check and then the
// create permission built from the "msgRtr.mirrormakeruser.aaf.create" property,
// the request's namespace and "|create". The request must be a JSON object with
// exactly three entries: an alphanumeric "name", a "namespace", and a
// "whitelistTopicName" whose part after the last '.' is alphanumeric.
// The method publishes a listAllMirrorMaker message, polls for the reply,
// extracts the current whitelist of the named MirrorMaker and, for a topic that
// is present, sends an UpdateWhiteList message with the reduced whitelist
// (removeTopic) through callPubSubForWhitelist. Failures are reported via
// sendErrResponse or an "error" object.
945 @SuppressWarnings("unchecked")
947 @Produces("application/json")
948 @Path("/deletewhitelist")
949 public void deleteWhiteList(InputStream msg) {
951 DMaaPContext ctx = getDmaapContext();
952 if (checkMirrorMakerPermission(ctx,
953 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
959 input = IOUtils.toString(msg, UTF_8);
961 if (input != null && input.length() > 0) {
962 input = removeExtraChar(input);
965 // Check if it is correct Json object
966 JSONObject jsonOb = null;
969 jsonOb = new JSONObject(input);
971 } catch (JSONException ex) {
973 sendErrResponse(ctx, errorMessages.getIncorrectJson());
974 LOGGER.error("JSONException: ", ex);
// NOTE(review): unlike createWhiteList, this check has no isBlank tests for
// name, namespace or whitelistTopicName.
977 // Check if the request has name and name contains only alpha numeric,
978 // check if the request has namespace and
979 // check if the request has whitelistTopicName
980 if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name"))
981 && jsonOb.has(NAMESPACE) && jsonOb.has("whitelistTopicName")
982 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1,
983 jsonOb.getString("whitelistTopicName").length()))) {
985 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
986 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
988 // Check if the user have create permission for the
990 if (checkMirrorMakerPermission(ctx, permission)) {
992 JSONObject listAll = new JSONObject();
993 JSONObject emptyObject = new JSONObject();
995 // Create a listAllMirrorMaker Json object
997 listAll.put("listAllMirrorMaker", emptyObject);
999 } catch (JSONException e) {
1001 LOGGER.error("JSONException: ", e);
1004 // set a random number as messageID
1005 String randomStr = getRandomNum();
1006 listAll.put("messageID", randomStr);
1007 InputStream inStream = null;
1009 // convert listAll Json object to InputStream object
1011 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
1013 } catch (IOException ioe) {
1014 ioe.printStackTrace();
1015 LOGGER.error("IOException: ", ioe);
1017 // call listAllMirrorMaker
1018 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1020 // subscribe for listMirrorMaker
1021 long startTime = System.currentTimeMillis();
1022 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1024 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1025 && (System.currentTimeMillis() - startTime) < timeout) {
1026 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1030 JSONArray jsonArray;
1031 JSONArray listMirrorMaker = null;
1033 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1034 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1035 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1036 jsonArray = new JSONArray(msgFrmSubscribe);
// Find the reply carrying our message ID and take its listMirrorMaker array.
1038 for (int i = 0; i < jsonArray.length(); i++) {
1039 jsonObj = jsonArray.getJSONObject(i);
1041 JSONObject obj = new JSONObject();
1042 if (jsonObj.has(MESSAGE)) {
1043 obj = jsonObj.getJSONObject(MESSAGE);
1045 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1046 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
1050 String whitelist = null;
1051 if (listMirrorMaker != null) {
1052 for (int i = 0; i < listMirrorMaker.length(); i++) {
// NOTE(review): this new JSONObject is replaced by the assignment on the next line.
1054 JSONObject mm = new JSONObject();
1055 mm = listMirrorMaker.getJSONObject(i);
1056 String name = mm.getString("name");
1058 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
1059 whitelist = mm.getString("whitelist");
1065 List<String> topicList = new ArrayList<>();
1067 if (whitelist != null) {
1068 topicList = Arrays.asList(whitelist.split(","));
1070 boolean removeTopic = false;
1071 String topicToRemove = jsonOb.getString("whitelistTopicName");
1073 if (topicList.contains(topicToRemove)) {
1076 sendErrResponse(ctx, "The topic does not exist.");
1081 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
1082 MirrorMaker mirrorMaker = new MirrorMaker();
1084 mirrorMaker.setName(jsonOb.getString("name"));
1085 mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
1087 String newRandom = getRandomNum();
1089 updateWhiteList.setMessageID(newRandom);
1090 updateWhiteList.setUpdateWhiteList(mirrorMaker);
1092 Gson g = new Gson();
// NOTE(review): the result of this toJson call is discarded; the serialization
// that is actually used happens in the toInputStream call below.
1093 g.toJson(updateWhiteList);
1095 InputStream inputStream;
1096 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
// NOTE(review): the namespace passed on is derived from the topic name, while
// the permission check above used the request's namespace; no comparison of
// the two is visible in this method (createWhiteList has one) — confirm.
1097 callPubSubForWhitelist(newRandom, ctx, inputStream, getNamespace(topicToRemove));
1102 JSONObject err = new JSONObject();
1104 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
1105 DMaaPResponseBuilder.respondOk(ctx, err);
1109 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
1114 sendErrResponse(ctx, "This is not a DeleteAllWhitelist request. Please try again.");
1117 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1118 | TopicExistsException | missingReqdSetting | UnavailableException e) {
// NOTE(review): the label says IOException although this handler catches
// several exception types.
1120 LOGGER.error("IOException: ", e);
1123 // Send error response if user does not provide Authorization
1125 sendErrResponse(ctx, NO_USER_PERMISSION);
1129 private String getNamespace(String topic) {
1130 return topic.substring(0, topic.lastIndexOf("."));
1133 private String removeTopic(String whitelist, String topicToRemove) {
1134 List<String> topicList = new ArrayList<>();
1135 List<String> newTopicList = new ArrayList<>();
1137 if (whitelist.contains(",")) {
1138 topicList = Arrays.asList(whitelist.split(","));
1142 if (topicList.contains(topicToRemove)) {
1143 for (String topic: topicList) {
1144 if (!topic.equals(topicToRemove)) {
1145 newTopicList.add(topic);
1150 String newWhitelist = StringUtils.join(newTopicList, ",");
1152 return newWhitelist;
1155 private void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, String namespace) {
1158 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1159 long startTime = System.currentTimeMillis();
1160 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1162 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1163 && (System.currentTimeMillis() - startTime) < timeout) {
1164 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1168 JSONArray jsonArray;
1169 JSONArray jsonArrayNamespace = null;
1171 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1172 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1173 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1174 jsonArray = new JSONArray(msgFrmSubscribe);
1176 for (int i = 0; i < jsonArray.length(); i++) {
1177 jsonObj = jsonArray.getJSONObject(i);
1179 JSONObject obj = new JSONObject();
1180 if (jsonObj.has(MESSAGE)) {
1181 obj = jsonObj.getJSONObject(MESSAGE);
1183 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1184 jsonArrayNamespace = obj.getJSONArray(LISTMIRRORMAKER);
1187 JSONObject finalJasonObj = new JSONObject();
1188 JSONArray finalJsonArray = new JSONArray();
1190 for (int i = 0; i < jsonArrayNamespace.length(); i++) {
1193 mmObj = jsonArrayNamespace.getJSONObject(i);
1196 if (mmObj.has("whitelist")) {
1197 whitelist = getWhitelistByNamespace(mmObj.getString("whitelist"), namespace);
1199 if (whitelist != null) {
1200 mmObj.remove("whitelist");
1201 mmObj.put("whitelist", whitelist);
1203 mmObj.remove("whitelist");
1206 finalJsonArray.put(mmObj);
1208 finalJasonObj.put(LISTMIRRORMAKER, finalJsonArray);
1210 DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
1214 JSONObject err = new JSONObject();
1215 err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
1216 DMaaPResponseBuilder.respondOk(ctx, err);
1219 } catch (Exception e) {
1220 LOGGER.error("Exception: ", e);
1224 private String getWhitelistByNamespace(String originalWhitelist, String namespace) {
1226 String whitelist = null;
1227 List<String> resultList = new ArrayList<>();
1228 List<String> whitelistList = new ArrayList<>();
1229 whitelistList = Arrays.asList(originalWhitelist.split(","));
1231 for (String topic : whitelistList) {
1232 if (StringUtils.isNotBlank(originalWhitelist) && getNamespace(topic).equals(namespace)) {
1233 resultList.add(topic);
1236 if (!resultList.isEmpty()) {
1237 whitelist = StringUtils.join(resultList, ",");
1243 private JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
1245 JSONArray jsonArray;
1246 JSONArray listMirrorMaker = new JSONArray();
1248 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1249 jsonArray = new JSONArray(msgFrmSubscribe);
1251 for (int i = 0; i < jsonArray.length(); i++) {
1252 jsonObj = jsonArray.getJSONObject(i);
1254 JSONObject obj = new JSONObject();
1255 if (jsonObj.has(MESSAGE)) {
1256 obj = jsonObj.getJSONObject(MESSAGE);
1258 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1259 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
1263 return listMirrorMaker;