1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.dmaap.service;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
31 import javax.servlet.http.HttpServletRequest;
32 import javax.servlet.http.HttpServletResponse;
33 import javax.ws.rs.POST;
34 import javax.ws.rs.Path;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.core.Context;
37 import org.json.JSONObject;
38 import org.apache.commons.io.IOUtils;
39 import org.apache.commons.lang.StringUtils;
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.beans.factory.annotation.Qualifier;
44 import org.springframework.stereotype.Component;
46 import com.att.nsa.cambria.utils.ConfigurationReader;
47 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
48 import com.att.nsa.cambria.utils.Utils;
49 import com.att.nsa.configs.ConfigDbException;
50 import com.att.nsa.dmaap.mmagent.*;
51 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
52 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
53 import com.google.gson.Gson;
54 import com.google.gson.JsonSyntaxException;
56 import edu.emory.mathcs.backport.java.util.Arrays;
58 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
59 import com.att.nsa.cambria.CambriaApiException;
60 import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
62 import org.json.JSONArray;
63 import org.json.JSONException;
64 import com.att.nsa.cambria.beans.DMaaPContext;
65 import com.att.nsa.cambria.constants.CambriaConstants;
66 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
67 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
68 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
69 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
70 import com.att.nsa.cambria.service.MMService;
73 * Rest Service class for Mirror Maker proxy Rest Services
75 * @author <a href="mailto:"></a>
81 public class MMRestService {
// Logger shared by every handler in this service.
83 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class);
// Error texts handed to sendErrResponse.
84 private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission.";
85 private static final String NO_USER_PERMISSION = "No Mirror Maker User permission.";
86 private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission.";
87 private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric";
88 private static final String INVALID_IPPORT = "This is not a valid IP:Port";
// Property keys, looked up in CambriaConstants.msgRtr_prop, whose values are passed to
// checkMirrorMakerPermission as the admin / user permission.
89 private static final String MIRROR_MAKERADMIN = "msgRtr.mirrormakeradmin.aaf";
90 private static final String MIRROR_MAKERUSER = "msgRtr.mirrormakeruser.aaf";
// Charset name used when reading request bodies and when building outgoing streams.
91 private static final String UTF_8 = "UTF-8";
// JSON keys used when inspecting subscriber messages and building responses.
92 private static final String MESSAGE = "message";
93 private static final String LISTMIRRORMAKER = "listMirrorMaker";
94 private static final String ERROR = "error";
95 private static final String NAMESPACE = "namespace";
// Consumer group / consumer id passed to mirrorService.subscribe; assigned in loadProperty().
99 private String consumergroup;
100 private String consumerid;
// NOTE(review): the injection annotations for the fields below are only partly visible in
// this view (just the @Qualifier on configReader) - confirm how each one is populated.
103 @Qualifier("configurationReader")
104 private ConfigurationReader configReader;
107 private HttpServletRequest request;
110 private HttpServletResponse response;
113 private MMService mirrorService;
116 private DMaaPErrorMessages errorMessages;
119 * This method is used for taking Configuration Object,HttpServletRequest
120 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
123 * @return DMaaPContext object from where user can get Configuration
124 * Object,HttpServlet Object
127 private DMaaPContext getDmaapContext() {
128 DMaaPContext dmaapContext = new DMaaPContext();
129 dmaapContext.setRequest(request);
130 dmaapContext.setResponse(response);
131 dmaapContext.setConfigReader(configReader);
132 dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
138 @Produces("application/json")
140 public void callCreateMirrorMaker(InputStream msg) {
142 DMaaPContext ctx = getDmaapContext();
143 if (checkMirrorMakerPermission(ctx,
144 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
148 String randomStr = getRandomNum();
150 InputStream inStream = null;
151 Gson gson = new Gson();
152 CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
155 input = IOUtils.toString(msg, UTF_8);
157 if (input != null && input.length() > 0) {
158 input = removeExtraChar(input);
161 // Check if the request has CreateMirrorMaker
163 createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class);
165 } catch (JsonSyntaxException ex) {
167 sendErrResponse(ctx, errorMessages.getIncorrectJson());
168 LOGGER.error("JsonSyntaxException: ", ex);
170 String name = createMirrorMaker.getCreateMirrorMaker().getName();
171 // send error message if it is not a CreateMirrorMaker request.
172 if (createMirrorMaker.getCreateMirrorMaker() == null) {
173 sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
176 // MirrorMaker whitelist and status should not be passed
177 else if (createMirrorMaker.getCreateMirrorMaker().getWhitelist() != null
178 || createMirrorMaker.getCreateMirrorMaker().getStatus() != null) {
179 sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
182 // if empty, blank name is entered
183 else if (StringUtils.isBlank(name)) {
184 sendErrResponse(ctx, "Name can not be empty or blank.");
187 // Check if the name contains only Alpha Numeric
188 else if (!isAlphaNumeric(name)) {
189 sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
193 // Validate the IP and Port
194 else if (!StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getConsumer())
195 && !StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getProducer())
196 && !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
197 || !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
198 sendErrResponse(ctx, INVALID_IPPORT);
201 // Set a random number as messageID, convert Json Object to
202 // InputStream and finally call publisher and subscriber
203 else if (isAlphaNumeric(name) && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
204 && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
206 createMirrorMaker.setMessageID(randomStr);
207 inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), UTF_8);
208 callPubSub(randomStr, ctx, inStream);
211 } catch (IOException e) {
213 LOGGER.error("IOException: ", e);
216 // Send error response if user does not provide Authorization
218 sendErrResponse(ctx, NO_ADMIN_PERMISSION);
/**
 * Handles a ListAllMirrorMaker request.
 *
 * When the caller holds the Mirror Maker admin permission, the body is read as UTF-8,
 * cleaned with removeExtraChar and parsed as a JSONObject. If the object has a
 * "listAllMirrorMaker" key whose value is an empty JSON object, a random "messageID" is
 * added and the message is handed to callPubSub. Otherwise an error object is written
 * through sendErrResponse. Without the permission NO_ADMIN_PERMISSION is returned.
 *
 * NOTE(review): after a JSONException the code below does not appear to return, so with
 * jsonOb still null the "not a ListAllMirrorMaker request" error looks reachable as a
 * second response - confirm against the full source.
 */
223 @Produces("application/json")
225 public void callListAllMirrorMaker(InputStream msg) throws CambriaApiException {
226 DMaaPContext ctx = getDmaapContext();
// Only callers holding the Mirror Maker admin permission may list MirrorMakers.
228 if (checkMirrorMakerPermission(ctx,
229 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
236 input = IOUtils.toString(msg, UTF_8);
238 if (input != null && input.length() > 0) {
239 input = removeExtraChar(input);
// The random number becomes the messageID used to match the agent's reply in callPubSub.
242 String randomStr = getRandomNum();
243 JSONObject jsonOb = null;
246 jsonOb = new JSONObject(input);
248 } catch (JSONException ex) {
250 sendErrResponse(ctx, errorMessages.getIncorrectJson());
251 LOGGER.error("JSONException: ", ex);
254 // Check if request has listAllMirrorMaker and
255 // listAllMirrorMaker is empty
256 if ((jsonOb != null) && (jsonOb.has("listAllMirrorMaker") &&
257 jsonOb.getJSONObject("listAllMirrorMaker").length() == 0)) {
258 jsonOb.put("messageID", randomStr);
259 InputStream inStream = null;
262 inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
264 } catch (IOException ioe) {
// NOTE(review): printStackTrace duplicates the LOGGER call that follows.
265 ioe.printStackTrace();
266 LOGGER.error("IOException: ", ioe);
// NOTE(review): inStream is still null here if the conversion above failed.
269 callPubSub(randomStr, ctx, inStream);
273 sendErrResponse(ctx, "This is not a ListAllMirrorMaker request. Please try again.");
276 } catch (IOException ioe) {
278 ioe.printStackTrace();
279 LOGGER.error("IOException: ", ioe);
// Reached when the admin permission check fails; a fresh context is built for the error.
284 sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
289 @Produces("application/json")
291 public void callUpdateMirrorMaker(InputStream msg) throws CambriaApiException {
293 DMaaPContext ctx = getDmaapContext();
294 if (checkMirrorMakerPermission(ctx,
295 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
299 String randomStr = getRandomNum();
301 InputStream inStream = null;
302 Gson gson = new Gson();
303 UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
306 input = IOUtils.toString(msg, UTF_8);
308 if (input != null && input.length() > 0) {
309 input = removeExtraChar(input);
312 // Check if the request has UpdateMirrorMaker
314 updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
316 } catch (JsonSyntaxException ex) {
318 sendErrResponse(ctx, errorMessages.getIncorrectJson());
319 LOGGER.error("JsonSyntaxException: ", ex);
322 String name = updateMirrorMaker.getUpdateMirrorMaker().getName();
324 // send error message if it is not a UpdateMirrorMaker request.
325 if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
326 sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
329 // MirrorMaker whitelist and status should not be passed
330 else if (updateMirrorMaker.getUpdateMirrorMaker().getWhitelist() != null
331 || updateMirrorMaker.getUpdateMirrorMaker().getStatus() != null) {
332 sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
335 // if empty, blank name is entered
336 else if (StringUtils.isBlank(name)) {
337 sendErrResponse(ctx, "Name can not be empty or blank.");
340 // Check if the name contains only Alpha Numeric
341 else if (!isAlphaNumeric(name)) {
342 sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
346 // Validate the IP and Port
347 else if (!StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
348 && !StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getProducer())
349 && !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
350 || !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
351 sendErrResponse(ctx, INVALID_IPPORT);
354 // Set a random number as messageID, convert Json Object to
355 // InputStream and finally call publisher and subscriber
356 else if (isAlphaNumeric(name) && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
357 && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
359 updateMirrorMaker.setMessageID(randomStr);
360 inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), UTF_8);
361 callPubSub(randomStr, ctx, inStream);
364 } catch (IOException e) {
366 LOGGER.error("IOException: ", e);
369 // Send error response if user does not provide Authorization
371 sendErrResponse(ctx, NO_ADMIN_PERMISSION);
/**
 * Handles a DeleteMirrorMaker request.
 *
 * When the caller holds the Mirror Maker admin permission, the body is read as UTF-8,
 * cleaned with removeExtraChar and parsed as a JSONObject. The request is accepted only
 * if it has a "deleteMirrorMaker" object with exactly one entry, "name", whose value is
 * non-blank and alphanumeric. A random "messageID" is then added and the message is
 * handed to callPubSub. Any other shape gets an error object through sendErrResponse.
 * Without the permission NO_ADMIN_PERMISSION is returned.
 *
 * NOTE(review): after a JSONException the code below does not appear to return, so the
 * "not a DeleteMirrorMaker request" error looks reachable as a second response - confirm.
 */
376 @Produces("application/json")
378 public void callDeleteMirrorMaker(InputStream msg) throws CambriaApiException {
379 DMaaPContext ctx = getDmaapContext();
// Only callers holding the Mirror Maker admin permission may delete a MirrorMaker.
381 if (checkMirrorMakerPermission(ctx,
382 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
389 input = IOUtils.toString(msg, UTF_8);
391 if (input != null && input.length() > 0) {
392 input = removeExtraChar(input);
// The random number becomes the messageID used to match the agent's reply in callPubSub.
395 String randomStr = getRandomNum();
396 JSONObject jsonOb = null;
399 jsonOb = new JSONObject(input);
401 } catch (JSONException ex) {
403 sendErrResponse(ctx, errorMessages.getIncorrectJson());
404 LOGGER.error("JSONException: ", ex);
407 // Check if request has DeleteMirrorMaker and
408 // DeleteMirrorMaker has MirrorMaker object with name variable
409 // and check if the name contain only alpha numeric
410 if ((jsonOb != null) && (jsonOb.has("deleteMirrorMaker")
411 && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
412 && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
413 && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
414 && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name")))) {
416 jsonOb.put("messageID", randomStr);
417 InputStream inStream = null;
420 inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
422 } catch (IOException ioe) {
// NOTE(review): printStackTrace duplicates the LOGGER call that follows.
423 ioe.printStackTrace();
424 LOGGER.error("IOException: ", ioe);
// NOTE(review): inStream is still null here if the conversion above failed.
427 callPubSub(randomStr, ctx, inStream);
431 sendErrResponse(ctx, "This is not a DeleteMirrorMaker request. Please try again.");
434 } catch (IOException ioe) {
436 ioe.printStackTrace();
437 LOGGER.error("IOException: ", ioe);
// Reached when the admin permission check fails; a fresh context is built for the error.
442 sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
/**
 * Tests whether a subscriber payload contains the listMirrorMaker reply for a message ID.
 *
 * The payload is cleaned with removeExtraChar. When it is non-blank and longer than two
 * characters it is parsed as a JSONArray, and each element's "message" object (if any) is
 * checked for a "messageID" equal to messageID together with a "listMirrorMaker" key.
 *
 * NOTE(review): what happens on a match and the return statement are not visible in this
 * view; presumably the `exist` flag is set and returned - confirm.
 */
446 private boolean isListMirrorMaker(String msg, String messageID) {
447 String topicmsg = msg;
448 topicmsg = removeExtraChar(topicmsg);
452 boolean exist = false;
// A payload of two characters or fewer is skipped without parsing.
454 if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) {
455 jArray = new JSONArray(topicmsg);
457 for (int i = 0; i < jArray.length(); i++) {
458 jObj = jArray.getJSONObject(i);
// Elements without a "message" entry are tested as an empty object and never match.
460 JSONObject obj = new JSONObject();
461 if (jObj.has(MESSAGE)) {
462 obj = jObj.getJSONObject(MESSAGE);
464 if (obj.has("messageID") && obj.get("messageID").equals(messageID) && obj.has(LISTMIRRORMAKER)) {
/**
 * Loads the Mirror Maker settings from AJSCPropertiesMap (CambriaConstants.msgRtr_prop)
 * into this instance: timeout (parsed as an int), topic, consumergroup and consumerid.
 *
 * NOTE(review): each getProperty result is dereferenced without a null check, and
 * Integer.parseInt throws NumberFormatException for a non-numeric timeout - confirm that
 * the properties are guaranteed to be configured.
 */
473 private void loadProperty() {
// Compared against System.currentTimeMillis() differences by the polling loops, so
// presumably in milliseconds - verify against the configured value.
475 this.timeout = Integer.parseInt(
476 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim());
477 this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim();
478 this.consumergroup = AJSCPropertiesMap
479 .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim();
480 this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid")
/**
 * Cleans a raw message before it is parsed as JSON.
 *
 * The message is first passed through checkJsonFormate. When the result is non-empty,
 * every backslash is removed and a quote directly before "{" or directly after "}" is
 * dropped.
 *
 * NOTE(review): removing every backslash also removes escapes inside string values.
 * The return statement is not visible in this view; presumably `str` is returned.
 */
484 private String removeExtraChar(String message) {
485 String str = message;
486 str = checkJsonFormate(str);
488 if (str != null && str.length() > 0) {
// Order matters: once the backslashes are gone, \"{ has become "{ and can be unwrapped.
489 str = str.replace("\\", "");
490 str = str.replace("\"{", "{");
491 str = str.replace("}\"", "}");
496 private String getRandomNum() {
497 long random = Math.round(Math.random() * 89999) + 10000;
498 String strLong = Long.toString(random);
502 private boolean isAlphaNumeric(String name) {
503 String pattern = "^[a-zA-Z0-9]*$";
504 if (name.matches(pattern)) {
510 // This method validate IPv4
511 private boolean validateIPPort(String ipPort) {
512 String pattern = "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
513 + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5]):"
514 + "([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$";
515 if (ipPort.matches(pattern)) {
/**
 * Inspects a JSON string for an opening "[" that has no matching closing "]" at the end.
 *
 * NOTE(review): the body of the condition and the return statement are not visible in
 * this view; presumably the missing "]" is appended and the result returned - confirm.
 */
521 private String checkJsonFormate(String jsonStr) {
523 String json = jsonStr;
// True only for a non-empty string that starts with "[" but does not end with "]".
524 if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) {
530 private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) {
532 boolean hasPermission = false;
534 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
536 if (aaf.aafAuthentication(ctx.getRequest(), permission)) {
537 hasPermission = true;
539 return hasPermission;
/**
 * Publishes a request to the Mirror Maker topic and relays the agent's reply to the caller.
 *
 * The stream is pushed to `topic` through mirrorService.pushEvents. The method then calls
 * mirrorService.subscribe repeatedly until a payload satisfies isListMirrorMaker for
 * randomstr or `timeout` has elapsed (measured with System.currentTimeMillis()). When a
 * matching payload was received, the "listMirrorMaker" value of the element whose
 * messageID equals randomstr is written with DMaaPResponseBuilder.respondOk. Otherwise an
 * error object stating that listMirrorMaker is not available is written, also through
 * respondOk. Any exception is logged and not rethrown.
 *
 * NOTE(review): no delay between subscribe calls is visible in this view - confirm that
 * the loop does not spin, or that subscribe itself blocks.
 */
542 private void callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream) {
544 mirrorService.pushEvents(ctx, topic, inStream, null, null);
545 long startTime = System.currentTimeMillis();
546 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Keep polling until the reply carrying our messageID shows up or the timeout expires.
548 while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
549 && (System.currentTimeMillis() - startTime) < timeout) {
550 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
554 JSONObject finalJsonObj = new JSONObject();
// Re-check the last payload: the loop may have ended on timeout rather than on a match.
557 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
558 && isListMirrorMaker(msgFrmSubscribe, randomstr)) {
559 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
560 jsonArray = new JSONArray(msgFrmSubscribe);
562 for (int i = 0; i < jsonArray.length(); i++) {
563 jsonObj = jsonArray.getJSONObject(i);
// Elements without a "message" entry are tested as an empty object and never match.
565 JSONObject obj = new JSONObject();
566 if (jsonObj.has(MESSAGE)) {
567 obj = jsonObj.getJSONObject(MESSAGE);
569 if (obj.has("messageID") && obj.get("messageID").equals(randomstr) && obj.has(LISTMIRRORMAKER)) {
570 finalJsonObj.put(LISTMIRRORMAKER, obj.get(LISTMIRRORMAKER));
575 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
// No matching reply arrived; the error is still sent with respondOk.
579 JSONObject err = new JSONObject();
580 err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
581 DMaaPResponseBuilder.respondOk(ctx, err);
584 } catch (Exception e) {
586 LOGGER.error("Exception: ", e);
590 private void sendErrResponse(DMaaPContext ctx, String errMsg) {
591 JSONObject err = new JSONObject();
592 err.append(ERROR, errMsg);
595 DMaaPResponseBuilder.respondOk(ctx, err);
596 LOGGER.error(errMsg);
598 } catch (JSONException | IOException e) {
599 LOGGER.error(errMsg);
/**
 * Returns the whitelist topics of one MirrorMaker that belong to the caller's namespace.
 *
 * The caller needs the Mirror Maker user permission and, for the requested namespace, the
 * permission built from property "msgRtr.mirrormakeruser.aaf.create" + namespace +
 * "|create". The request must be a JSON object with exactly two entries: a non-blank
 * alphanumeric "name" and a non-blank "namespace".
 *
 * A listAllMirrorMaker message with a random messageID is published and the subscriber is
 * polled until the matching reply arrives or `timeout` elapses. From the reply, the
 * whitelist of the MirrorMaker named "name" is split on "," and only topics whose
 * namespace (getNamespace) equals the requested namespace are kept. The response is
 * {"name": ..., "whitelist": "<comma separated topics>"}.
 *
 * NOTE(review): getNamespace() calls substring(0, lastIndexOf(".")), which throws for a
 * whitelist entry without a "." - confirm that entries always contain one.
 */
603 @SuppressWarnings("unchecked")
605 @Produces("application/json")
606 @Path("/listallwhitelist")
607 public void listWhiteList(InputStream msg) {
609 DMaaPContext ctx = getDmaapContext();
// First gate: the general Mirror Maker user permission.
610 if (checkMirrorMakerPermission(ctx,
611 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
617 input = IOUtils.toString(msg, UTF_8);
619 if (input != null && input.length() > 0) {
620 input = removeExtraChar(input);
623 // Check if it is correct Json object
624 JSONObject jsonOb = null;
627 jsonOb = new JSONObject(input);
629 } catch (JSONException ex) {
631 sendErrResponse(ctx, errorMessages.getIncorrectJson());
632 LOGGER.error("JSONException: ", ex);
635 // Check if the request has name and name contains only alpha
637 // and check if the request has namespace and namespace contains
638 // only alpha numeric
640 && jsonOb.length() == 2 && jsonOb.has("name")
641 && !StringUtils.isBlank(jsonOb.getString("name"))
642 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has(NAMESPACE)
643 && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))) {
// Second gate: the per-namespace create permission.
645 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
646 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
648 // Check if the user have create permission for the
650 if (checkMirrorMakerPermission(ctx, permission)) {
652 JSONObject listAll = new JSONObject();
653 JSONObject emptyObject = new JSONObject();
655 // Create a listAllMirrorMaker Json object
657 listAll.put("listAllMirrorMaker", emptyObject);
659 } catch (JSONException e) {
661 LOGGER.error("JSONException: ", e);
664 // set a random number as messageID
665 String randomStr = getRandomNum();
666 listAll.put("messageID", randomStr);
667 InputStream inStream = null;
669 // convert listAll Json object to InputStream object
671 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
673 } catch (IOException ioe) {
// NOTE(review): printStackTrace duplicates the LOGGER call that follows.
674 ioe.printStackTrace();
675 LOGGER.error("IOException: ", ioe);
677 // call listAllMirrorMaker
678 mirrorService.pushEvents(ctx, topic, inStream, null, null);
680 // subscribe for listMirrorMaker
681 long startTime = System.currentTimeMillis();
682 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Poll until the reply carrying our messageID arrives or the timeout expires.
684 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
685 && (System.currentTimeMillis() - startTime) < timeout) {
686 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
689 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
690 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
692 JSONArray listMirrorMaker;
693 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
// Find the whitelist of the MirrorMaker whose name matches the request.
695 String whitelist = null;
696 for (int i = 0; i < listMirrorMaker.length(); i++) {
699 mm = listMirrorMaker.getJSONObject(i);
700 String name = mm.getString("name");
702 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
703 whitelist = mm.getString("whitelist");
708 if (!StringUtils.isBlank(whitelist)) {
710 List<String> topicList = new ArrayList<>();
711 List<String> finalTopicList = new ArrayList<>();
712 topicList = Arrays.asList(whitelist.split(","));
// The loop variable `topic` shadows the field `topic` used for pushEvents/subscribe above.
714 for (String topic : topicList) {
715 if (topic != null && !topic.equals("null")
716 && getNamespace(topic).equals(jsonOb.getString(NAMESPACE))) {
718 finalTopicList.add(topic);
722 String topicNames = "";
724 if (!finalTopicList.isEmpty()) {
725 topicNames = StringUtils.join(finalTopicList, ",");
728 JSONObject listAllWhiteList = new JSONObject();
729 listAllWhiteList.put("name", jsonOb.getString("name"));
730 listAllWhiteList.put("whitelist", topicNames);
732 DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList);
// No matching reply arrived; the error object is still sent with respondOk.
737 JSONObject err = new JSONObject();
739 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
740 DMaaPResponseBuilder.respondOk(ctx, err);
744 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
749 sendErrResponse(ctx, "This is not a ListAllWhitelist request. Please try again.");
752 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
753 | TopicExistsException | missingReqdSetting | UnavailableException e) {
// The "IOException: " prefix is logged for every exception type caught above.
755 LOGGER.error("IOException: ", e);
758 sendErrResponse(ctx, NO_USER_PERMISSION);
/**
 * Adds a topic to the whitelist of one MirrorMaker.
 *
 * The caller needs the Mirror Maker user permission and, for the requested namespace, the
 * permission built from property "msgRtr.mirrormakeruser.aaf.create" + namespace +
 * "|create". The request must be a JSON object with exactly three entries: a non-blank
 * alphanumeric "name", a non-blank "namespace", and a non-blank "whitelistTopicName"
 * whose part after the last "." is alphanumeric.
 *
 * A listAllMirrorMaker message with a random messageID is published and the subscriber is
 * polled until the matching reply arrives or `timeout` elapses. The current whitelist of
 * the named MirrorMaker is read from the reply. If the new topic is not yet listed and
 * its namespace (getNamespace) equals the requested namespace, an UpdateWhiteList message
 * carrying the extended whitelist and a new random messageID is sent through
 * callPubSubForWhitelist. Otherwise an error is returned: topic already present, or
 * namespace mismatch.
 *
 * NOTE(review): getNamespace() throws for a whitelistTopicName without a "."; the
 * validation below accepts such a name because substring(lastIndexOf(".") + 1) is then
 * the whole string - confirm whether a "." is meant to be mandatory.
 */
762 @SuppressWarnings("unchecked")
764 @Produces("application/json")
765 @Path("/createwhitelist")
766 public void createWhiteList(InputStream msg) {
768 DMaaPContext ctx = getDmaapContext();
// First gate: the general Mirror Maker user permission.
769 if (checkMirrorMakerPermission(ctx,
770 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
776 input = IOUtils.toString(msg, UTF_8);
778 if (input != null && input.length() > 0) {
779 input = removeExtraChar(input);
782 // Check if it is correct Json object
783 JSONObject jsonOb = null;
786 jsonOb = new JSONObject(input);
788 } catch (JSONException ex) {
790 sendErrResponse(ctx, errorMessages.getIncorrectJson());
791 LOGGER.error("JSONException: ", ex);
794 // Check if the request has name and name contains only alpha numeric,
795 // check if the request has namespace and
796 // check if the request has whitelistTopicName
797 // check if the topic name contains only alpha numeric
798 if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
799 && !StringUtils.isBlank(jsonOb.getString("name"))
800 && isAlphaNumeric(jsonOb.getString("name"))
801 && jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))
802 && jsonOb.has("whitelistTopicName") && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
803 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1,
804 jsonOb.getString("whitelistTopicName").length()))) {
// Second gate: the per-namespace create permission.
806 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
807 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
809 // Check if the user have create permission for the
811 if (checkMirrorMakerPermission(ctx, permission)) {
813 JSONObject listAll = new JSONObject();
814 JSONObject emptyObject = new JSONObject();
816 // Create a listAllMirrorMaker Json object
818 listAll.put("listAllMirrorMaker", emptyObject);
820 } catch (JSONException e) {
822 LOGGER.error("JSONException: ", e);
825 // set a random number as messageID
826 String randomStr = getRandomNum();
827 listAll.put("messageID", randomStr);
828 InputStream inStream = null;
830 // convert listAll Json object to InputStream object
832 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
834 } catch (IOException ioe) {
// NOTE(review): printStackTrace duplicates the LOGGER call that follows.
835 ioe.printStackTrace();
836 LOGGER.error("IOException: ", ioe);
838 // call listAllMirrorMaker
839 mirrorService.pushEvents(ctx, topic, inStream, null, null);
841 // subscribe for listMirrorMaker
842 long startTime = System.currentTimeMillis();
843 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Poll until the reply carrying our messageID arrives or the timeout expires.
845 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
846 && (System.currentTimeMillis() - startTime) < timeout) {
847 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
850 JSONArray listMirrorMaker;
852 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
853 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
855 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
// Find the current whitelist of the MirrorMaker whose name matches the request.
856 String whitelist = null;
858 for (int i = 0; i < listMirrorMaker.length(); i++) {
860 mm = listMirrorMaker.getJSONObject(i);
861 String name = mm.getString("name");
863 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
864 whitelist = mm.getString("whitelist");
869 List<String> topicList = new ArrayList<>();
870 List<String> finalTopicList = new ArrayList<>();
872 if (whitelist != null) {
873 topicList = Arrays.asList(whitelist.split(","));
// Copy the non-blank existing topics; the new topic is appended further down.
876 for (String st : topicList) {
877 if (!StringUtils.isBlank(st)) {
878 finalTopicList.add(st);
882 String newTopic = jsonOb.getString("whitelistTopicName");
// Only add a topic that is new and lies inside the namespace the caller was authorized for.
884 if (!topicList.contains(newTopic)
885 && getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
887 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
888 MirrorMaker mirrorMaker = new MirrorMaker();
889 mirrorMaker.setName(jsonOb.getString("name"));
890 finalTopicList.add(newTopic);
891 String newWhitelist = "";
893 if (!finalTopicList.isEmpty()) {
894 newWhitelist = StringUtils.join(finalTopicList, ",");
897 mirrorMaker.setWhitelist(newWhitelist);
// The update message gets its own messageID, separate from the list request's.
899 String newRandom = getRandomNum();
900 updateWhiteList.setMessageID(newRandom);
901 updateWhiteList.setUpdateWhiteList(mirrorMaker);
// NOTE(review): the result of this toJson call is discarded; the call two lines below is the one used.
904 g.toJson(updateWhiteList);
905 InputStream inputStream;
906 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
907 // callPubSub(newRandom, ctx, inputStream);
908 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb.getString(NAMESPACE));
910 } else if (topicList.contains(newTopic)) {
911 sendErrResponse(ctx, "The topic already exist.");
913 } else if (!getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
915 "The namespace of the topic does not match with the namespace you provided.");
// No matching reply arrived; the error object is still sent with respondOk.
919 JSONObject err = new JSONObject();
921 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
922 DMaaPResponseBuilder.respondOk(ctx, err);
926 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
931 sendErrResponse(ctx, "This is not a createWhitelist request. Please try again.");
934 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
935 | TopicExistsException | missingReqdSetting | UnavailableException e) {
// The "IOException: " prefix is logged for every exception type caught above.
937 LOGGER.error("IOException: ", e);
940 // Send error response if user does not provide Authorization
942 sendErrResponse(ctx, NO_USER_PERMISSION);
/**
 * Removes a topic from the whitelist of one MirrorMaker.
 *
 * The caller needs the Mirror Maker user permission and, for the requested namespace, the
 * permission built from property "msgRtr.mirrormakeruser.aaf.create" + namespace +
 * "|create". The request must be a JSON object with exactly three entries: an
 * alphanumeric "name", a "namespace", and a "whitelistTopicName" whose part after the
 * last "." is alphanumeric.
 *
 * A listAllMirrorMaker message with a random messageID is published and the subscriber is
 * polled until the matching reply arrives or `timeout` elapses. The current whitelist of
 * the named MirrorMaker is read from the reply. If it contains the topic, an
 * UpdateWhiteList message carrying the reduced whitelist (removeTopic) and a new random
 * messageID is sent through callPubSubForWhitelist. Otherwise "The topic does not exist."
 * is returned.
 *
 * NOTE(review): unlike createWhiteList, no blank checks on "name"/"namespace" are visible
 * here. No comparison between getNamespace(topicToRemove) and the authorized "namespace"
 * is visible either, so a caller authorized for one namespace may be able to remove a
 * topic of another - confirm against the full source.
 */
946 @SuppressWarnings("unchecked")
948 @Produces("application/json")
949 @Path("/deletewhitelist")
950 public void deleteWhiteList(InputStream msg) {
952 DMaaPContext ctx = getDmaapContext();
// First gate: the general Mirror Maker user permission.
953 if (checkMirrorMakerPermission(ctx,
954 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
960 input = IOUtils.toString(msg, UTF_8);
962 if (input != null && input.length() > 0) {
963 input = removeExtraChar(input);
966 // Check if it is correct Json object
967 JSONObject jsonOb = null;
970 jsonOb = new JSONObject(input);
972 } catch (JSONException ex) {
974 sendErrResponse(ctx, errorMessages.getIncorrectJson());
975 LOGGER.error("JSONException: ", ex);
978 // Check if the request has name and name contains only alpha numeric,
979 // check if the request has namespace and
980 // check if the request has whitelistTopicName
981 if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name"))
982 && jsonOb.has(NAMESPACE) && jsonOb.has("whitelistTopicName")
983 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1,
984 jsonOb.getString("whitelistTopicName").length()))) {
// Second gate: the per-namespace create permission (the same permission is used for delete).
986 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
987 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
989 // Check if the user have create permission for the
991 if (checkMirrorMakerPermission(ctx, permission)) {
993 JSONObject listAll = new JSONObject();
994 JSONObject emptyObject = new JSONObject();
996 // Create a listAllMirrorMaker Json object
998 listAll.put("listAllMirrorMaker", emptyObject);
1000 } catch (JSONException e) {
1002 LOGGER.error("JSONException: ", e);
1005 // set a random number as messageID
1006 String randomStr = getRandomNum();
1007 listAll.put("messageID", randomStr);
1008 InputStream inStream = null;
1010 // convert listAll Json object to InputStream object
1012 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
1014 } catch (IOException ioe) {
// NOTE(review): printStackTrace duplicates the LOGGER call that follows.
1015 ioe.printStackTrace();
1016 LOGGER.error("IOException: ", ioe);
1018 // call listAllMirrorMaker
1019 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1021 // subscribe for listMirrorMaker
1022 long startTime = System.currentTimeMillis();
1023 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Poll until the reply carrying our messageID arrives or the timeout expires.
1025 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1026 && (System.currentTimeMillis() - startTime) < timeout) {
1027 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1031 JSONArray jsonArray;
1032 JSONArray listMirrorMaker = null;
1034 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1035 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1036 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1037 jsonArray = new JSONArray(msgFrmSubscribe);
// Pick the listMirrorMaker array out of the element that carries our messageID.
1039 for (int i = 0; i < jsonArray.length(); i++) {
1040 jsonObj = jsonArray.getJSONObject(i);
1042 JSONObject obj = new JSONObject();
1043 if (jsonObj.has(MESSAGE)) {
1044 obj = jsonObj.getJSONObject(MESSAGE);
1046 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1047 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
// Find the current whitelist of the MirrorMaker whose name matches the request.
1051 String whitelist = null;
1052 if (listMirrorMaker != null) {
1053 for (int i = 0; i < listMirrorMaker.length(); i++) {
1055 JSONObject mm = new JSONObject();
1056 mm = listMirrorMaker.getJSONObject(i);
1057 String name = mm.getString("name");
1059 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
1060 whitelist = mm.getString("whitelist");
1066 List<String> topicList = new ArrayList<>();
1068 if (whitelist != null) {
1069 topicList = Arrays.asList(whitelist.split(","));
1071 boolean removeTopic = false;
1072 String topicToRemove = jsonOb.getString("whitelistTopicName");
1074 if (topicList.contains(topicToRemove)) {
1077 sendErrResponse(ctx, "The topic does not exist.");
1082 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
1083 MirrorMaker mirrorMaker = new MirrorMaker();
1085 mirrorMaker.setName(jsonOb.getString("name"));
1086 mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
// The update message gets its own messageID, separate from the list request's.
1088 String newRandom = getRandomNum();
1090 updateWhiteList.setMessageID(newRandom);
1091 updateWhiteList.setUpdateWhiteList(mirrorMaker);
1093 Gson g = new Gson();
// NOTE(review): the result of this toJson call is discarded; the call further down is the one used.
1094 g.toJson(updateWhiteList);
1096 InputStream inputStream;
1097 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
// The namespace passed on is derived from the topic, not taken from the request's "namespace".
1098 callPubSubForWhitelist(newRandom, ctx, inputStream, getNamespace(topicToRemove));
// No matching reply arrived; the error object is still sent with respondOk.
1103 JSONObject err = new JSONObject();
1105 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
1106 DMaaPResponseBuilder.respondOk(ctx, err);
1110 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
1115 sendErrResponse(ctx, "This is not a DeleteAllWhitelist request. Please try again.");
1118 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1119 | TopicExistsException | missingReqdSetting | UnavailableException e) {
// The "IOException: " prefix is logged for every exception type caught above.
1121 LOGGER.error("IOException: ", e);
1124 // Send error response if user does not provide Authorization
1126 sendErrResponse(ctx, NO_USER_PERMISSION);
1130 private String getNamespace(String topic) {
1131 return topic.substring(0, topic.lastIndexOf("."));
1134 private String removeTopic(String whitelist, String topicToRemove) {
1135 List<String> topicList = new ArrayList<>();
1136 List<String> newTopicList = new ArrayList<>();
1138 if (whitelist.contains(",")) {
1139 topicList = Arrays.asList(whitelist.split(","));
1143 if (topicList.contains(topicToRemove)) {
1144 for (String topic: topicList) {
1145 if (!topic.equals(topicToRemove)) {
1146 newTopicList.add(topic);
1151 String newWhitelist = StringUtils.join(newTopicList, ",");
1153 return newWhitelist;
1156 private void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, String namespace) {
1159 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1160 long startTime = System.currentTimeMillis();
1161 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1163 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1164 && (System.currentTimeMillis() - startTime) < timeout) {
1165 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1169 JSONArray jsonArray;
1170 JSONArray jsonArrayNamespace = null;
1172 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1173 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1174 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1175 jsonArray = new JSONArray(msgFrmSubscribe);
1177 for (int i = 0; i < jsonArray.length(); i++) {
1178 jsonObj = jsonArray.getJSONObject(i);
1180 JSONObject obj = new JSONObject();
1181 if (jsonObj.has(MESSAGE)) {
1182 obj = jsonObj.getJSONObject(MESSAGE);
1184 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1185 jsonArrayNamespace = obj.getJSONArray(LISTMIRRORMAKER);
1188 JSONObject finalJasonObj = new JSONObject();
1189 JSONArray finalJsonArray = new JSONArray();
1191 for (int i = 0; i < jsonArrayNamespace.length(); i++) {
1194 mmObj = jsonArrayNamespace.getJSONObject(i);
1197 if (mmObj.has("whitelist")) {
1198 whitelist = getWhitelistByNamespace(mmObj.getString("whitelist"), namespace);
1200 if (whitelist != null) {
1201 mmObj.remove("whitelist");
1202 mmObj.put("whitelist", whitelist);
1204 mmObj.remove("whitelist");
1207 finalJsonArray.put(mmObj);
1209 finalJasonObj.put(LISTMIRRORMAKER, finalJsonArray);
1211 DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
1215 JSONObject err = new JSONObject();
1216 err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
1217 DMaaPResponseBuilder.respondOk(ctx, err);
1220 } catch (Exception e) {
1221 LOGGER.error("Exception: ", e);
1225 private String getWhitelistByNamespace(String originalWhitelist, String namespace) {
1227 String whitelist = null;
1228 List<String> resultList = new ArrayList<>();
1229 List<String> whitelistList = new ArrayList<>();
1230 whitelistList = Arrays.asList(originalWhitelist.split(","));
1232 for (String topic : whitelistList) {
1233 if (StringUtils.isNotBlank(originalWhitelist) && getNamespace(topic).equals(namespace)) {
1234 resultList.add(topic);
1237 if (!resultList.isEmpty()) {
1238 whitelist = StringUtils.join(resultList, ",");
1244 private JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
1246 JSONArray jsonArray;
1247 JSONArray listMirrorMaker = new JSONArray();
1249 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1250 jsonArray = new JSONArray(msgFrmSubscribe);
1252 for (int i = 0; i < jsonArray.length(); i++) {
1253 jsonObj = jsonArray.getJSONObject(i);
1255 JSONObject obj = new JSONObject();
1256 if (jsonObj.has(MESSAGE)) {
1257 obj = jsonObj.getJSONObject(MESSAGE);
1259 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1260 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
1264 return listMirrorMaker;