1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.dmaap.service;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
31 import javax.servlet.http.HttpServletRequest;
32 import javax.servlet.http.HttpServletResponse;
33 import javax.ws.rs.POST;
34 import javax.ws.rs.Path;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.core.Context;
37 import org.json.JSONObject;
38 import org.apache.commons.io.IOUtils;
39 import org.apache.commons.lang.StringUtils;
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.beans.factory.annotation.Qualifier;
44 import org.springframework.stereotype.Component;
46 import com.att.nsa.cambria.utils.ConfigurationReader;
47 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
48 import com.att.nsa.cambria.utils.Utils;
49 import com.att.nsa.configs.ConfigDbException;
50 import com.att.nsa.dmaap.mmagent.*;
51 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
52 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
53 import com.google.gson.Gson;
54 import com.google.gson.JsonSyntaxException;
56 import edu.emory.mathcs.backport.java.util.Arrays;
58 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
59 import com.att.nsa.cambria.CambriaApiException;
60 import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
62 import org.json.JSONArray;
63 import org.json.JSONException;
64 import com.att.nsa.cambria.beans.DMaaPContext;
65 import com.att.nsa.cambria.constants.CambriaConstants;
66 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
67 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
68 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
69 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
70 import com.att.nsa.cambria.service.MMService;
73 * Rest Service class for Mirror Maker proxy Rest Services
75 * @author <a href="mailto:"></a>
81 public class MMRestService {
83 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class);
84 private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission.";
85 private static final String NO_USER_PERMISSION = "No Mirror Maker User permission.";
86 private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission.";
87 private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric";
88 private static final String INVALID_IPPORT = "This is not a valid IP:Port";
89 private static final String MIRROR_MAKERADMIN = "msgRtr.mirrormakeradmin.aaf";
90 private static final String MIRROR_MAKERUSER = "msgRtr.mirrormakeruser.aaf";
91 private static final String UTF_8 = "UTF-8";
92 private static final String MESSAGE = "message";
93 private static final String LISTMIRRORMAKER = "listMirrorMaker";
94 private static final String ERROR = "error";
95 private static final String NAMESPACE = "namespace";
99 private String consumergroup;
100 private String consumerid;
103 @Qualifier("configurationReader")
104 private ConfigurationReader configReader;
107 private HttpServletRequest request;
110 private HttpServletResponse response;
113 private MMService mirrorService;
116 private DMaaPErrorMessages errorMessages;
119 * This method is used for taking Configuration Object,HttpServletRequest
120 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
123 * @return DMaaPContext object from where user can get Configuration
124 * Object,HttpServlet Object
127 private DMaaPContext getDmaapContext() {
128 DMaaPContext dmaapContext = new DMaaPContext();
129 dmaapContext.setRequest(request);
130 dmaapContext.setResponse(response);
131 dmaapContext.setConfigReader(configReader);
132 dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
138 @Produces("application/json")
140 public void callCreateMirrorMaker(InputStream msg) {
142 DMaaPContext ctx = getDmaapContext();
143 if (checkMirrorMakerPermission(ctx,
144 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
148 String randomStr = getRandomNum();
150 InputStream inStream = null;
151 Gson gson = new Gson();
152 CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
155 input = IOUtils.toString(msg, UTF_8);
157 if (input != null && input.length() > 0) {
158 input = removeExtraChar(input);
161 // Check if the request has CreateMirrorMaker
163 createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class);
165 } catch (JsonSyntaxException ex) {
167 sendErrResponse(ctx, errorMessages.getIncorrectJson());
168 LOGGER.error("JsonSyntaxException: ", ex);
170 String name = createMirrorMaker.getCreateMirrorMaker().getName();
171 // send error message if it is not a CreateMirrorMaker request.
172 if (createMirrorMaker.getCreateMirrorMaker() == null) {
173 sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
176 // MirrorMaker whitelist and status should not be passed
177 else if (createMirrorMaker.getCreateMirrorMaker().getWhitelist() != null
178 || createMirrorMaker.getCreateMirrorMaker().getStatus() != null) {
179 sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
182 // if empty, blank name is entered
183 else if (StringUtils.isBlank(name)) {
184 sendErrResponse(ctx, "Name can not be empty or blank.");
187 // Check if the name contains only Alpha Numeric
188 else if (!isAlphaNumeric(name)) {
189 sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
193 // Validate the IP and Port
194 else if (!StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getConsumer())
195 && !StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getProducer())
196 && !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
197 || !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
198 sendErrResponse(ctx, INVALID_IPPORT);
201 // Set a random number as messageID, convert Json Object to
202 // InputStream and finally call publisher and subscriber
203 else if (isAlphaNumeric(name) && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
204 && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
206 createMirrorMaker.setMessageID(randomStr);
207 inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), UTF_8);
208 callPubSub(randomStr, ctx, inStream);
211 } catch (IOException e) {
213 LOGGER.error("IOException: ", e);
216 // Send error response if user does not provide Authorization
218 sendErrResponse(ctx, NO_ADMIN_PERMISSION);
223 @Produces("application/json")
225 public void callListAllMirrorMaker(InputStream msg) throws CambriaApiException {
226 DMaaPContext ctx = getDmaapContext();
228 if (checkMirrorMakerPermission(ctx,
229 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
236 input = IOUtils.toString(msg, UTF_8);
238 if (input != null && input.length() > 0) {
239 input = removeExtraChar(input);
242 String randomStr = getRandomNum();
243 JSONObject jsonOb = null;
246 jsonOb = new JSONObject(input);
248 } catch (JSONException ex) {
250 sendErrResponse(ctx, errorMessages.getIncorrectJson());
251 LOGGER.error("JSONException: ", ex);
254 // Check if request has listAllMirrorMaker and
255 // listAllMirrorMaker is empty
256 if ((jsonOb != null) && (jsonOb.has("listAllMirrorMaker") &&
257 jsonOb.getJSONObject("listAllMirrorMaker").length() == 0)) {
258 jsonOb.put("messageID", randomStr);
259 InputStream inStream = null;
262 inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
264 } catch (IOException ioe) {
265 LOGGER.error("IOException: ", ioe);
268 callPubSub(randomStr, ctx, inStream);
272 sendErrResponse(ctx, "This is not a ListAllMirrorMaker request. Please try again.");
275 } catch (IOException ioe) {
277 LOGGER.error("IOException: ", ioe);
282 sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
287 @Produces("application/json")
289 public void callUpdateMirrorMaker(InputStream msg) throws CambriaApiException {
291 DMaaPContext ctx = getDmaapContext();
292 if (checkMirrorMakerPermission(ctx,
293 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
297 String randomStr = getRandomNum();
299 InputStream inStream = null;
300 Gson gson = new Gson();
301 UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
304 input = IOUtils.toString(msg, UTF_8);
306 if (input != null && input.length() > 0) {
307 input = removeExtraChar(input);
310 // Check if the request has UpdateMirrorMaker
312 updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
314 } catch (JsonSyntaxException ex) {
316 sendErrResponse(ctx, errorMessages.getIncorrectJson());
317 LOGGER.error("JsonSyntaxException: ", ex);
320 String name = updateMirrorMaker.getUpdateMirrorMaker().getName();
322 // send error message if it is not a UpdateMirrorMaker request.
323 if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
324 sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
327 // MirrorMaker whitelist and status should not be passed
328 else if (updateMirrorMaker.getUpdateMirrorMaker().getWhitelist() != null
329 || updateMirrorMaker.getUpdateMirrorMaker().getStatus() != null) {
330 sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
333 // if empty, blank name is entered
334 else if (StringUtils.isBlank(name)) {
335 sendErrResponse(ctx, "Name can not be empty or blank.");
338 // Check if the name contains only Alpha Numeric
339 else if (!isAlphaNumeric(name)) {
340 sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
344 // Validate the IP and Port
345 else if (!StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
346 && !StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getProducer())
347 && !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
348 || !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
349 sendErrResponse(ctx, INVALID_IPPORT);
352 // Set a random number as messageID, convert Json Object to
353 // InputStream and finally call publisher and subscriber
354 else if (isAlphaNumeric(name) && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
355 && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
357 updateMirrorMaker.setMessageID(randomStr);
358 inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), UTF_8);
359 callPubSub(randomStr, ctx, inStream);
362 } catch (IOException e) {
364 LOGGER.error("IOException: ", e);
367 // Send error response if user does not provide Authorization
369 sendErrResponse(ctx, NO_ADMIN_PERMISSION);
374 @Produces("application/json")
376 public void callDeleteMirrorMaker(InputStream msg) throws CambriaApiException {
377 DMaaPContext ctx = getDmaapContext();
379 if (checkMirrorMakerPermission(ctx,
380 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
387 input = IOUtils.toString(msg, UTF_8);
389 if (input != null && input.length() > 0) {
390 input = removeExtraChar(input);
393 String randomStr = getRandomNum();
394 JSONObject jsonOb = null;
397 jsonOb = new JSONObject(input);
399 } catch (JSONException ex) {
401 sendErrResponse(ctx, errorMessages.getIncorrectJson());
402 LOGGER.error("JSONException: ", ex);
405 // Check if request has DeleteMirrorMaker and
406 // DeleteMirrorMaker has MirrorMaker object with name variable
407 // and check if the name contain only alpha numeric
408 if ((jsonOb != null) && (jsonOb.has("deleteMirrorMaker")
409 && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
410 && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
411 && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
412 && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name")))) {
414 jsonOb.put("messageID", randomStr);
415 InputStream inStream = null;
418 inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
420 } catch (IOException ioe) {
421 LOGGER.error("IOException: ", ioe);
424 callPubSub(randomStr, ctx, inStream);
428 sendErrResponse(ctx, "This is not a DeleteMirrorMaker request. Please try again.");
431 } catch (IOException ioe) {
433 ioe.printStackTrace();
434 LOGGER.error("IOException: ", ioe);
439 sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
443 private boolean isListMirrorMaker(String msg, String messageID) {
444 String topicmsg = msg;
445 topicmsg = removeExtraChar(topicmsg);
449 boolean exist = false;
451 if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) {
452 jArray = new JSONArray(topicmsg);
454 for (int i = 0; i < jArray.length(); i++) {
455 jObj = jArray.getJSONObject(i);
457 JSONObject obj = new JSONObject();
458 if (jObj.has(MESSAGE)) {
459 obj = jObj.getJSONObject(MESSAGE);
461 if (obj.has("messageID") && obj.get("messageID").equals(messageID) && obj.has(LISTMIRRORMAKER)) {
470 private void loadProperty() {
472 this.timeout = Integer.parseInt(
473 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim());
474 this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim();
475 this.consumergroup = AJSCPropertiesMap
476 .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim();
477 this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid")
481 private String removeExtraChar(String message) {
482 String str = message;
483 str = checkJsonFormate(str);
485 if (str != null && str.length() > 0) {
486 str = str.replace("\\", "");
487 str = str.replace("\"{", "{");
488 str = str.replace("}\"", "}");
493 private String getRandomNum() {
494 long random = Math.round(Math.random() * 89999) + 10000;
495 String strLong = Long.toString(random);
499 private boolean isAlphaNumeric(String name) {
500 String pattern = "^[a-zA-Z0-9]*$";
501 if (name.matches(pattern)) {
507 // This method validate IPv4
508 private boolean validateIPPort(String ipPort) {
509 String pattern = "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
510 + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5]):"
511 + "([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$";
512 if (ipPort.matches(pattern)) {
518 private String checkJsonFormate(String jsonStr) {
520 String json = jsonStr;
521 if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) {
527 private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) {
529 boolean hasPermission = false;
531 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
533 if (aaf.aafAuthentication(ctx.getRequest(), permission)) {
534 hasPermission = true;
536 return hasPermission;
539 private void callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream) {
541 mirrorService.pushEvents(ctx, topic, inStream, null, null);
542 long startTime = System.currentTimeMillis();
543 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
545 while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
546 && (System.currentTimeMillis() - startTime) < timeout) {
547 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
551 JSONObject finalJsonObj = new JSONObject();
554 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
555 && isListMirrorMaker(msgFrmSubscribe, randomstr)) {
556 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
557 jsonArray = new JSONArray(msgFrmSubscribe);
559 for (int i = 0; i < jsonArray.length(); i++) {
560 jsonObj = jsonArray.getJSONObject(i);
562 JSONObject obj = new JSONObject();
563 if (jsonObj.has(MESSAGE)) {
564 obj = jsonObj.getJSONObject(MESSAGE);
566 if (obj.has("messageID") && obj.get("messageID").equals(randomstr) && obj.has(LISTMIRRORMAKER)) {
567 finalJsonObj.put(LISTMIRRORMAKER, obj.get(LISTMIRRORMAKER));
572 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
576 JSONObject err = new JSONObject();
577 err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
578 DMaaPResponseBuilder.respondOk(ctx, err);
581 } catch (Exception e) {
583 LOGGER.error("Exception: ", e);
587 private void sendErrResponse(DMaaPContext ctx, String errMsg) {
588 JSONObject err = new JSONObject();
589 err.append(ERROR, errMsg);
592 DMaaPResponseBuilder.respondOk(ctx, err);
593 LOGGER.error(errMsg);
595 } catch (JSONException | IOException e) {
596 LOGGER.error(errMsg);
600 @SuppressWarnings("unchecked")
602 @Produces("application/json")
603 @Path("/listallwhitelist")
604 public void listWhiteList(InputStream msg) {
606 DMaaPContext ctx = getDmaapContext();
607 if (checkMirrorMakerPermission(ctx,
608 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
614 input = IOUtils.toString(msg, UTF_8);
616 if (input != null && input.length() > 0) {
617 input = removeExtraChar(input);
620 // Check if it is correct Json object
621 JSONObject jsonOb = null;
624 jsonOb = new JSONObject(input);
626 } catch (JSONException ex) {
628 sendErrResponse(ctx, errorMessages.getIncorrectJson());
629 LOGGER.error("JSONException: ", ex);
632 // Check if the request has name and name contains only alpha
634 // and check if the request has namespace and namespace contains
635 // only alpha numeric
637 && jsonOb.length() == 2 && jsonOb.has("name")
638 && !StringUtils.isBlank(jsonOb.getString("name"))
639 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has(NAMESPACE)
640 && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))) {
642 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
643 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
645 // Check if the user have create permission for the
647 if (checkMirrorMakerPermission(ctx, permission)) {
649 JSONObject listAll = new JSONObject();
650 JSONObject emptyObject = new JSONObject();
652 // Create a listAllMirrorMaker Json object
654 listAll.put("listAllMirrorMaker", emptyObject);
656 } catch (JSONException e) {
658 LOGGER.error("JSONException: ", e);
661 // set a random number as messageID
662 String randomStr = getRandomNum();
663 listAll.put("messageID", randomStr);
664 InputStream inStream = null;
666 // convert listAll Json object to InputStream object
668 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
670 } catch (IOException ioe) {
671 ioe.printStackTrace();
672 LOGGER.error("IOException: ", ioe);
674 // call listAllMirrorMaker
675 mirrorService.pushEvents(ctx, topic, inStream, null, null);
677 // subscribe for listMirrorMaker
678 long startTime = System.currentTimeMillis();
679 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
681 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
682 && (System.currentTimeMillis() - startTime) < timeout) {
683 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
686 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
687 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
689 JSONArray listMirrorMaker;
690 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
692 String whitelist = null;
693 for (int i = 0; i < listMirrorMaker.length(); i++) {
696 mm = listMirrorMaker.getJSONObject(i);
697 String name = mm.getString("name");
699 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
700 whitelist = mm.getString("whitelist");
705 if (!StringUtils.isBlank(whitelist)) {
707 List<String> topicList = new ArrayList<>();
708 List<String> finalTopicList = new ArrayList<>();
709 topicList = Arrays.asList(whitelist.split(","));
711 for (String topic : topicList) {
712 if (topic != null && !topic.equals("null")
713 && getNamespace(topic).equals(jsonOb.getString(NAMESPACE))) {
715 finalTopicList.add(topic);
719 String topicNames = "";
721 if (!finalTopicList.isEmpty()) {
722 topicNames = StringUtils.join(finalTopicList, ",");
725 JSONObject listAllWhiteList = new JSONObject();
726 listAllWhiteList.put("name", jsonOb.getString("name"));
727 listAllWhiteList.put("whitelist", topicNames);
729 DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList);
734 JSONObject err = new JSONObject();
736 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
737 DMaaPResponseBuilder.respondOk(ctx, err);
741 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
746 sendErrResponse(ctx, "This is not a ListAllWhitelist request. Please try again.");
749 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
750 | TopicExistsException | missingReqdSetting | UnavailableException e) {
752 LOGGER.error("IOException: ", e);
755 sendErrResponse(ctx, NO_USER_PERMISSION);
759 @SuppressWarnings("unchecked")
761 @Produces("application/json")
762 @Path("/createwhitelist")
763 public void createWhiteList(InputStream msg) {
765 DMaaPContext ctx = getDmaapContext();
766 if (checkMirrorMakerPermission(ctx,
767 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
773 input = IOUtils.toString(msg, UTF_8);
775 if (input != null && input.length() > 0) {
776 input = removeExtraChar(input);
779 // Check if it is correct Json object
780 JSONObject jsonOb = null;
783 jsonOb = new JSONObject(input);
785 } catch (JSONException ex) {
787 sendErrResponse(ctx, errorMessages.getIncorrectJson());
788 LOGGER.error("JSONException: ", ex);
791 // Check if the request has name and name contains only alpha numeric,
792 // check if the request has namespace and
793 // check if the request has whitelistTopicName
794 // check if the topic name contains only alpha numeric
795 if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
796 && !StringUtils.isBlank(jsonOb.getString("name"))
797 && isAlphaNumeric(jsonOb.getString("name"))
798 && jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))
799 && jsonOb.has("whitelistTopicName") && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
800 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1,
801 jsonOb.getString("whitelistTopicName").length()))) {
803 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
804 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
806 // Check if the user have create permission for the
808 if (checkMirrorMakerPermission(ctx, permission)) {
810 JSONObject listAll = new JSONObject();
811 JSONObject emptyObject = new JSONObject();
813 // Create a listAllMirrorMaker Json object
815 listAll.put("listAllMirrorMaker", emptyObject);
817 } catch (JSONException e) {
819 LOGGER.error("JSONException: ", e);
822 // set a random number as messageID
823 String randomStr = getRandomNum();
824 listAll.put("messageID", randomStr);
825 InputStream inStream = null;
827 // convert listAll Json object to InputStream object
829 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
831 } catch (IOException ioe) {
832 ioe.printStackTrace();
833 LOGGER.error("IOException: ", ioe);
835 // call listAllMirrorMaker
836 mirrorService.pushEvents(ctx, topic, inStream, null, null);
838 // subscribe for listMirrorMaker
839 long startTime = System.currentTimeMillis();
840 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
842 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
843 && (System.currentTimeMillis() - startTime) < timeout) {
844 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
847 JSONArray listMirrorMaker;
849 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
850 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
852 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
853 String whitelist = null;
855 for (int i = 0; i < listMirrorMaker.length(); i++) {
857 mm = listMirrorMaker.getJSONObject(i);
858 String name = mm.getString("name");
860 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
861 whitelist = mm.getString("whitelist");
866 List<String> topicList = new ArrayList<>();
867 List<String> finalTopicList = new ArrayList<>();
869 if (whitelist != null) {
870 topicList = Arrays.asList(whitelist.split(","));
873 for (String st : topicList) {
874 if (!StringUtils.isBlank(st)) {
875 finalTopicList.add(st);
879 String newTopic = jsonOb.getString("whitelistTopicName");
881 if (!topicList.contains(newTopic)
882 && getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
884 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
885 MirrorMaker mirrorMaker = new MirrorMaker();
886 mirrorMaker.setName(jsonOb.getString("name"));
887 finalTopicList.add(newTopic);
888 String newWhitelist = "";
890 if (!finalTopicList.isEmpty()) {
891 newWhitelist = StringUtils.join(finalTopicList, ",");
894 mirrorMaker.setWhitelist(newWhitelist);
896 String newRandom = getRandomNum();
897 updateWhiteList.setMessageID(newRandom);
898 updateWhiteList.setUpdateWhiteList(mirrorMaker);
901 g.toJson(updateWhiteList);
902 InputStream inputStream;
903 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
904 // callPubSub(newRandom, ctx, inputStream);
905 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb.getString(NAMESPACE));
907 } else if (topicList.contains(newTopic)) {
908 sendErrResponse(ctx, "The topic already exist.");
910 } else if (!getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
912 "The namespace of the topic does not match with the namespace you provided.");
916 JSONObject err = new JSONObject();
918 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
919 DMaaPResponseBuilder.respondOk(ctx, err);
923 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
928 sendErrResponse(ctx, "This is not a createWhitelist request. Please try again.");
931 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
932 | TopicExistsException | missingReqdSetting | UnavailableException e) {
934 LOGGER.error("IOException: ", e);
937 // Send error response if user does not provide Authorization
939 sendErrResponse(ctx, NO_USER_PERMISSION);
943 @SuppressWarnings("unchecked")
945 @Produces("application/json")
946 @Path("/deletewhitelist")
947 public void deleteWhiteList(InputStream msg) {
949 DMaaPContext ctx = getDmaapContext();
950 if (checkMirrorMakerPermission(ctx,
951 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
957 input = IOUtils.toString(msg, UTF_8);
959 if (input != null && input.length() > 0) {
960 input = removeExtraChar(input);
963 // Check if it is correct Json object
964 JSONObject jsonOb = null;
967 jsonOb = new JSONObject(input);
969 } catch (JSONException ex) {
971 sendErrResponse(ctx, errorMessages.getIncorrectJson());
972 LOGGER.error("JSONException: ", ex);
975 // Check if the request has name and name contains only alpha numeric,
976 // check if the request has namespace and
977 // check if the request has whitelistTopicName
978 if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name"))
979 && jsonOb.has(NAMESPACE) && jsonOb.has("whitelistTopicName")
980 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1,
981 jsonOb.getString("whitelistTopicName").length()))) {
983 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
984 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
986 // Check if the user have create permission for the
988 if (checkMirrorMakerPermission(ctx, permission)) {
990 JSONObject listAll = new JSONObject();
991 JSONObject emptyObject = new JSONObject();
993 // Create a listAllMirrorMaker Json object
995 listAll.put("listAllMirrorMaker", emptyObject);
997 } catch (JSONException e) {
999 LOGGER.error("JSONException: ", e);
1002 // set a random number as messageID
1003 String randomStr = getRandomNum();
1004 listAll.put("messageID", randomStr);
1005 InputStream inStream = null;
1007 // convert listAll Json object to InputStream object
1009 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
1011 } catch (IOException ioe) {
1012 ioe.printStackTrace();
1013 LOGGER.error("IOException: ", ioe);
1015 // call listAllMirrorMaker
1016 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1018 // subscribe for listMirrorMaker
1019 long startTime = System.currentTimeMillis();
1020 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1022 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1023 && (System.currentTimeMillis() - startTime) < timeout) {
1024 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1028 JSONArray jsonArray;
1029 JSONArray listMirrorMaker = null;
1031 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1032 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1033 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1034 jsonArray = new JSONArray(msgFrmSubscribe);
1036 for (int i = 0; i < jsonArray.length(); i++) {
1037 jsonObj = jsonArray.getJSONObject(i);
1039 JSONObject obj = new JSONObject();
1040 if (jsonObj.has(MESSAGE)) {
1041 obj = jsonObj.getJSONObject(MESSAGE);
1043 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1044 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
1048 String whitelist = null;
1049 if (listMirrorMaker != null) {
1050 for (int i = 0; i < listMirrorMaker.length(); i++) {
1052 JSONObject mm = new JSONObject();
1053 mm = listMirrorMaker.getJSONObject(i);
1054 String name = mm.getString("name");
1056 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
1057 whitelist = mm.getString("whitelist");
1063 List<String> topicList = new ArrayList<>();
1065 if (whitelist != null) {
1066 topicList = Arrays.asList(whitelist.split(","));
1068 boolean removeTopic = false;
1069 String topicToRemove = jsonOb.getString("whitelistTopicName");
1071 if (topicList.contains(topicToRemove)) {
1074 sendErrResponse(ctx, "The topic does not exist.");
1079 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
1080 MirrorMaker mirrorMaker = new MirrorMaker();
1082 mirrorMaker.setName(jsonOb.getString("name"));
1083 mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
1085 String newRandom = getRandomNum();
1087 updateWhiteList.setMessageID(newRandom);
1088 updateWhiteList.setUpdateWhiteList(mirrorMaker);
1090 Gson g = new Gson();
1091 g.toJson(updateWhiteList);
1093 InputStream inputStream;
1094 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
1095 callPubSubForWhitelist(newRandom, ctx, inputStream, getNamespace(topicToRemove));
1100 JSONObject err = new JSONObject();
1102 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
1103 DMaaPResponseBuilder.respondOk(ctx, err);
1107 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
1112 sendErrResponse(ctx, "This is not a DeleteAllWhitelist request. Please try again.");
1115 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1116 | TopicExistsException | missingReqdSetting | UnavailableException e) {
1118 LOGGER.error("IOException: ", e);
1121 // Send error response if user does not provide Authorization
1123 sendErrResponse(ctx, NO_USER_PERMISSION);
1127 private String getNamespace(String topic) {
1128 return topic.substring(0, topic.lastIndexOf("."));
1131 private String removeTopic(String whitelist, String topicToRemove) {
1132 List<String> topicList = new ArrayList<>();
1133 List<String> newTopicList = new ArrayList<>();
1135 if (whitelist.contains(",")) {
1136 topicList = Arrays.asList(whitelist.split(","));
1140 if (topicList.contains(topicToRemove)) {
1141 for (String topic: topicList) {
1142 if (!topic.equals(topicToRemove)) {
1143 newTopicList.add(topic);
1148 String newWhitelist = StringUtils.join(newTopicList, ",");
1150 return newWhitelist;
1153 private void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, String namespace) {
1156 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1157 long startTime = System.currentTimeMillis();
1158 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1160 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1161 && (System.currentTimeMillis() - startTime) < timeout) {
1162 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1166 JSONArray jsonArray;
1167 JSONArray jsonArrayNamespace = null;
1169 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1170 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1171 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1172 jsonArray = new JSONArray(msgFrmSubscribe);
1174 for (int i = 0; i < jsonArray.length(); i++) {
1175 jsonObj = jsonArray.getJSONObject(i);
1177 JSONObject obj = new JSONObject();
1178 if (jsonObj.has(MESSAGE)) {
1179 obj = jsonObj.getJSONObject(MESSAGE);
1181 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1182 jsonArrayNamespace = obj.getJSONArray(LISTMIRRORMAKER);
1185 JSONObject finalJasonObj = new JSONObject();
1186 JSONArray finalJsonArray = new JSONArray();
1188 for (int i = 0; i < jsonArrayNamespace.length(); i++) {
1191 mmObj = jsonArrayNamespace.getJSONObject(i);
1194 if (mmObj.has("whitelist")) {
1195 whitelist = getWhitelistByNamespace(mmObj.getString("whitelist"), namespace);
1197 if (whitelist != null) {
1198 mmObj.remove("whitelist");
1199 mmObj.put("whitelist", whitelist);
1201 mmObj.remove("whitelist");
1204 finalJsonArray.put(mmObj);
1206 finalJasonObj.put(LISTMIRRORMAKER, finalJsonArray);
1208 DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
1212 JSONObject err = new JSONObject();
1213 err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
1214 DMaaPResponseBuilder.respondOk(ctx, err);
1217 } catch (Exception e) {
1218 LOGGER.error("Exception: ", e);
1222 private String getWhitelistByNamespace(String originalWhitelist, String namespace) {
1224 String whitelist = null;
1225 List<String> resultList = new ArrayList<>();
1226 List<String> whitelistList = new ArrayList<>();
1227 whitelistList = Arrays.asList(originalWhitelist.split(","));
1229 for (String topic : whitelistList) {
1230 if (StringUtils.isNotBlank(originalWhitelist) && getNamespace(topic).equals(namespace)) {
1231 resultList.add(topic);
1234 if (!resultList.isEmpty()) {
1235 whitelist = StringUtils.join(resultList, ",");
1241 private JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
1243 JSONArray jsonArray;
1244 JSONArray listMirrorMaker = new JSONArray();
1246 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1247 jsonArray = new JSONArray(msgFrmSubscribe);
1249 for (int i = 0; i < jsonArray.length(); i++) {
1250 jsonObj = jsonArray.getJSONObject(i);
1252 JSONObject obj = new JSONObject();
1253 if (jsonObj.has(MESSAGE)) {
1254 obj = jsonObj.getJSONObject(MESSAGE);
1256 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1257 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
1261 return listMirrorMaker;