1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.dmaap.service;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
30 import javax.servlet.http.HttpServletRequest;
31 import javax.servlet.http.HttpServletResponse;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import org.json.JSONObject;
37 import org.apache.commons.io.IOUtils;
38 import org.apache.commons.lang.StringUtils;
39 import com.att.eelf.configuration.EELFLogger;
40 import com.att.eelf.configuration.EELFManager;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Qualifier;
43 import org.springframework.stereotype.Component;
45 import com.att.nsa.cambria.utils.ConfigurationReader;
46 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
47 import com.att.nsa.cambria.utils.Utils;
48 import com.att.nsa.configs.ConfigDbException;
49 import com.att.nsa.dmaap.mmagent.*;
50 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
51 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
52 import com.google.gson.Gson;
53 import com.google.gson.JsonSyntaxException;
55 import edu.emory.mathcs.backport.java.util.Arrays;
57 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
58 import com.att.nsa.cambria.CambriaApiException;
59 import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
61 import org.json.JSONArray;
62 import org.json.JSONException;
63 import com.att.nsa.cambria.beans.DMaaPContext;
64 import com.att.nsa.cambria.constants.CambriaConstants;
65 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
66 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
67 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
68 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
69 import com.att.nsa.cambria.service.MMService;
72 * Rest Service class for Mirror Maker proxy Rest Services
74 * @author <a href="mailto:"></a>
80 public class MMRestService {
// Logger shared by every endpoint and helper in this service.
82 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class);
// Error texts returned to the caller inside the JSON "error" field.
83 private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission.";
84 private static final String NO_USER_PERMISSION = "No Mirror Maker User permission.";
85 private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission.";
86 private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric";
87 private static final String INVALID_IPPORT = "This is not a valid IP:Port";
// Property keys (read from the msgRtr property group) whose values are the AAF
// permission strings checked for admin and user endpoints.
88 private static final String MIRROR_MAKERADMIN = "msgRtr.mirrormakeradmin.aaf";
89 private static final String MIRROR_MAKERUSER = "msgRtr.mirrormakeruser.aaf";
// Charset name used when reading request bodies and building outgoing payloads.
90 private static final String UTF_8 = "UTF-8";
// JSON keys used in the messages exchanged over the mirror maker topic and in responses.
91 private static final String MESSAGE = "message";
92 private static final String LISTMIRRORMAKER = "listMirrorMaker";
93 private static final String ERROR = "error";
94 private static final String NAMESPACE = "namespace";
// Consumer group and consumer id used when subscribing; both are assigned in loadProperty().
98 private String consumergroup;
99 private String consumerid;
// Configuration reader copied into every DMaaPContext built by getDmaapContext().
102 @Qualifier("configurationReader")
103 private ConfigurationReader configReader;
// HTTP request of the current call; copied into the DMaaPContext and used for AAF checks.
106 private HttpServletRequest request;
// HTTP response of the current call; copied into the DMaaPContext.
109 private HttpServletResponse response;
// Service used to publish events to, and subscribe from, the mirror maker topic.
112 private MMService mirrorService;
// Provider of standard error texts such as the incorrect-JSON message.
115 private DMaaPErrorMessages errorMessages;
// Authenticator used by checkMirrorMakerPermission() to evaluate AAF permissions.
117 private DMaaPAAFAuthenticator dmaapAAFauthenticator = new DMaaPAAFAuthenticatorImpl();
120 * This method is used for taking Configuration Object,HttpServletRequest
121 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
124 * @return DMaaPContext object from where user can get Configuration
125 * Object,HttpServlet Object
128 private DMaaPContext getDmaapContext() {
129 DMaaPContext dmaapContext = new DMaaPContext();
130 dmaapContext.setRequest(request);
131 dmaapContext.setResponse(response);
132 dmaapContext.setConfigReader(configReader);
133 dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
139 @Produces("application/json")
141 public void callCreateMirrorMaker(InputStream msg) {
143 DMaaPContext ctx = getDmaapContext();
144 if (checkMirrorMakerPermission(ctx,
145 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
149 String randomStr = getRandomNum();
151 InputStream inStream = null;
152 Gson gson = new Gson();
153 CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
156 input = IOUtils.toString(msg, UTF_8);
158 if (input != null && input.length() > 0) {
159 input = removeExtraChar(input);
162 // Check if the request has CreateMirrorMaker
164 createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class);
166 } catch (JsonSyntaxException ex) {
168 sendErrResponse(ctx, errorMessages.getIncorrectJson());
169 LOGGER.error("JsonSyntaxException: ", ex);
171 String name = createMirrorMaker.getCreateMirrorMaker()==null? "":createMirrorMaker.getCreateMirrorMaker().getName();
172 // send error message if it is not a CreateMirrorMaker request.
173 if (createMirrorMaker.getCreateMirrorMaker() == null) {
174 sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
177 // MirrorMaker whitelist and status should not be passed
178 else if (createMirrorMaker.getCreateMirrorMaker().getWhitelist() != null
179 || createMirrorMaker.getCreateMirrorMaker().getStatus() != null) {
180 sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
183 // if empty, blank name is entered
184 else if (StringUtils.isBlank(name)) {
185 sendErrResponse(ctx, "Name can not be empty or blank.");
188 // Check if the name contains only Alpha Numeric
189 else if (!isAlphaNumeric(name)) {
190 sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
194 // Validate the IP and Port
195 else if (!StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getConsumer())
196 && !StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getProducer())
197 && !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
198 || !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
199 sendErrResponse(ctx, INVALID_IPPORT);
202 // Set a random number as messageID, convert Json Object to
203 // InputStream and finally call publisher and subscriber
204 else if (isAlphaNumeric(name) && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
205 && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
207 createMirrorMaker.setMessageID(randomStr);
208 inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), UTF_8);
209 callPubSub(randomStr, ctx, inStream);
212 } catch (IOException e) {
214 LOGGER.error("IOException: ", e);
217 // Send error response if user does not provide Authorization
219 sendErrResponse(ctx, NO_ADMIN_PERMISSION);
224 @Produces("application/json")
226 public void callListAllMirrorMaker(InputStream msg) throws CambriaApiException {
227 DMaaPContext ctx = getDmaapContext();
229 if (checkMirrorMakerPermission(ctx,
230 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
237 input = IOUtils.toString(msg, UTF_8);
239 if (input != null && input.length() > 0) {
240 input = removeExtraChar(input);
243 String randomStr = getRandomNum();
244 JSONObject jsonOb = null;
247 jsonOb = new JSONObject(input);
249 } catch (JSONException ex) {
251 sendErrResponse(ctx, errorMessages.getIncorrectJson());
252 LOGGER.error("JSONException: ", ex);
255 // Check if request has listAllMirrorMaker and
256 // listAllMirrorMaker is empty
257 if ((jsonOb != null) && (jsonOb.has("listAllMirrorMaker")
258 && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0)) {
259 jsonOb.put("messageID", randomStr);
260 InputStream inStream = null;
263 inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
265 } catch (IOException ioe) {
266 LOGGER.error("IOException: ", ioe);
269 callPubSub(randomStr, ctx, inStream);
273 sendErrResponse(ctx, "This is not a ListAllMirrorMaker request. Please try again.");
276 } catch (IOException ioe) {
278 LOGGER.error("IOException: ", ioe);
283 sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
288 @Produces("application/json")
290 public void callUpdateMirrorMaker(InputStream msg) throws CambriaApiException {
292 DMaaPContext ctx = getDmaapContext();
293 if (checkMirrorMakerPermission(ctx,
294 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
298 String randomStr = getRandomNum();
300 InputStream inStream = null;
301 Gson gson = new Gson();
302 UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
305 input = IOUtils.toString(msg, UTF_8);
307 if (input != null && input.length() > 0) {
308 input = removeExtraChar(input);
311 // Check if the request has UpdateMirrorMaker
313 updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
315 } catch (JsonSyntaxException ex) {
317 sendErrResponse(ctx, errorMessages.getIncorrectJson());
318 LOGGER.error("JsonSyntaxException: ", ex);
321 String name = updateMirrorMaker.getUpdateMirrorMaker()==null? "":updateMirrorMaker.getUpdateMirrorMaker().getName();
322 // send error message if it is not a UpdateMirrorMaker request.
323 if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
324 sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
327 // MirrorMaker whitelist and status should not be passed
328 else if (updateMirrorMaker.getUpdateMirrorMaker().getWhitelist() != null
329 || updateMirrorMaker.getUpdateMirrorMaker().getStatus() != null) {
330 sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
333 // if empty, blank name is entered
334 else if (StringUtils.isBlank(name)) {
335 sendErrResponse(ctx, "Name can not be empty or blank.");
338 // Check if the name contains only Alpha Numeric
339 else if (!isAlphaNumeric(name)) {
340 sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
344 // Validate the IP and Port
345 else if (!StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
346 && !StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getProducer())
347 && !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
348 || !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
349 sendErrResponse(ctx, INVALID_IPPORT);
352 // Set a random number as messageID, convert Json Object to
353 // InputStream and finally call publisher and subscriber
354 else if (isAlphaNumeric(name) && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
355 && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
357 updateMirrorMaker.setMessageID(randomStr);
358 inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), UTF_8);
359 callPubSub(randomStr, ctx, inStream);
362 } catch (IOException e) {
364 LOGGER.error("IOException: ", e);
367 // Send error response if user does not provide Authorization
369 sendErrResponse(ctx, NO_ADMIN_PERMISSION);
374 @Produces("application/json")
376 public void callDeleteMirrorMaker(InputStream msg) throws CambriaApiException {
377 DMaaPContext ctx = getDmaapContext();
379 if (checkMirrorMakerPermission(ctx,
380 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERADMIN))) {
387 input = IOUtils.toString(msg, UTF_8);
389 if (input != null && input.length() > 0) {
390 input = removeExtraChar(input);
393 String randomStr = getRandomNum();
394 JSONObject jsonOb = null;
397 jsonOb = new JSONObject(input);
399 } catch (JSONException ex) {
401 sendErrResponse(ctx, errorMessages.getIncorrectJson());
402 LOGGER.error("JSONException: ", ex);
405 // Check if request has DeleteMirrorMaker and
406 // DeleteMirrorMaker has MirrorMaker object with name variable
407 // and check if the name contain only alpha numeric
409 && (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
410 && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
411 && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
412 && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name")))) {
414 jsonOb.put("messageID", randomStr);
415 InputStream inStream = null;
418 inStream = IOUtils.toInputStream(jsonOb.toString(), UTF_8);
420 } catch (IOException ioe) {
421 LOGGER.error("IOException: ", ioe);
424 callPubSub(randomStr, ctx, inStream);
428 sendErrResponse(ctx, "This is not a DeleteMirrorMaker request. Please try again.");
431 } catch (IOException ioe) {
432 LOGGER.error("IOException: ", ioe);
437 sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
441 private boolean isListMirrorMaker(String msg, String messageID) {
442 String topicmsg = msg;
443 topicmsg = removeExtraChar(topicmsg);
447 boolean exist = false;
449 if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) {
450 jArray = new JSONArray(topicmsg);
452 for (int i = 0; i < jArray.length(); i++) {
453 jObj = jArray.getJSONObject(i);
455 JSONObject obj = new JSONObject();
456 if (jObj.has(MESSAGE)) {
457 obj = jObj.getJSONObject(MESSAGE);
459 if (obj.has("messageID") && obj.get("messageID").equals(messageID) && obj.has(LISTMIRRORMAKER)) {
468 private void loadProperty() {
470 this.timeout = Integer.parseInt(
471 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim());
472 this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim();
473 this.consumergroup = AJSCPropertiesMap
474 .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim();
475 this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid")
479 private String removeExtraChar(String message) {
480 String str = message;
481 str = checkJsonFormate(str);
483 if (str != null && str.length() > 0) {
484 str = str.replace("\\", "");
485 str = str.replace("\"{", "{");
486 str = str.replace("}\"", "}");
491 private String getRandomNum() {
492 long random = Math.round(Math.random() * 89999) + 10000;
493 String strLong = Long.toString(random);
497 private boolean isAlphaNumeric(String name) {
498 String pattern = "^[a-zA-Z0-9]*$";
499 if (name.matches(pattern)) {
505 // This method validate IPv4
506 private boolean validateIPPort(String ipPort) {
507 String pattern = "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
508 + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5]):"
509 + "([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$";
510 if (ipPort.matches(pattern)) {
// Inspects message text that is expected to be a JSON array before it is parsed.
// jsonStr: raw message text; null and empty text do not satisfy the condition below.
516 private String checkJsonFormate(String jsonStr) {
518 String json = jsonStr;
// Matches text that opens a JSON array with "[" but does not end with "]".
// NOTE(review): the statement executed for this case is not visible in this excerpt;
// presumably it appends the missing closing bracket to json - confirm.
519 if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) {
525 private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) {
527 boolean hasPermission = false;
529 if (dmaapAAFauthenticator.aafAuthentication(ctx.getRequest(), permission)) {
530 hasPermission = true;
532 return hasPermission;
// Publishes a request on the mirror maker topic, waits for the matching
// listMirrorMaker reply and writes the outcome to the HTTP response.
// randomstr: messageID attached to the request, used to recognise the reply.
// ctx:       context used for publishing, subscribing and responding.
// inStream:  serialized request payload to publish.
535 private void callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream) {
// Publish the request payload on the configured topic.
537 mirrorService.pushEvents(ctx, topic, inStream, null, null);
538 long startTime = System.currentTimeMillis();
539 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Subscribe again until a message carrying this messageID and a listMirrorMaker
// entry arrives, or until `timeout` milliseconds have passed since startTime.
// NOTE(review): no pause between iterations is visible in this excerpt - confirm
// whether subscribe() itself blocks.
541 while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
542 && (System.currentTimeMillis() - startTime) < timeout) {
543 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Response object; receives the listMirrorMaker value of the matching reply.
547 JSONObject finalJsonObj = new JSONObject();
// Re-check the last message: the loop above also ends on timeout.
550 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
551 && isListMirrorMaker(msgFrmSubscribe, randomstr)) {
552 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
553 jsonArray = new JSONArray(msgFrmSubscribe);
// Walk the batch and pick the element whose "message" carries this messageID.
555 for (int i = 0; i < jsonArray.length(); i++) {
556 jsonObj = jsonArray.getJSONObject(i);
// Elements without a "message" member are compared as an empty object.
558 JSONObject obj = new JSONObject();
559 if (jsonObj.has(MESSAGE)) {
560 obj = jsonObj.getJSONObject(MESSAGE);
562 if (obj.has("messageID") && obj.get("messageID").equals(randomstr) && obj.has(LISTMIRRORMAKER)) {
563 finalJsonObj.put(LISTMIRRORMAKER, obj.get(LISTMIRRORMAKER));
568 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
// Error payload for the case where no matching reply was received; it is also
// sent through respondOk.
572 JSONObject err = new JSONObject();
573 err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
574 DMaaPResponseBuilder.respondOk(ctx, err);
// Any failure is only logged here; nothing is rethrown by this handler.
577 } catch (Exception e) {
578 LOGGER.error("Exception: ", e);
582 private void sendErrResponse(DMaaPContext ctx, String errMsg) {
583 JSONObject err = new JSONObject();
584 err.append(ERROR, errMsg);
587 DMaaPResponseBuilder.respondOk(ctx, err);
588 LOGGER.error(errMsg);
590 } catch (JSONException | IOException e) {
591 LOGGER.error("Error at sendErrResponse method:" + errMsg + "Exception name:" + e.getMessage());
// Endpoint /listallwhitelist: returns the whitelist topics of one mirror maker
// that belong to the namespace given in the request.
// msg: request body; expected to be a JSON object with exactly the two entries
//      "name" and "namespace".
595 @SuppressWarnings("unchecked")
597 @Produces("application/json")
598 @Path("/listallwhitelist")
599 public void listWhiteList(InputStream msg) {
601 DMaaPContext ctx = getDmaapContext();
// First gate: the caller must hold the Mirror Maker user permission.
602 if (checkMirrorMakerPermission(ctx,
603 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
609 input = IOUtils.toString(msg, UTF_8);
611 if (input != null && input.length() > 0) {
612 input = removeExtraChar(input);
615 // Check if it is correct Json object
616 JSONObject jsonOb = null;
619 jsonOb = new JSONObject(input);
// NOTE(review): jsonOb stays null after this failure, so the validation below
// fails as well - confirm that only one error response is sent.
621 } catch (JSONException ex) {
623 sendErrResponse(ctx, errorMessages.getIncorrectJson());
624 LOGGER.error("JSONException: ", ex);
627 // Check if the request has name and name contains only alpha
629 // and check if the request has namespace and namespace contains
630 // only alpha numeric
// The condition checks the namespace only for being non-blank.
631 if (jsonOb != null && jsonOb.length() == 2 && jsonOb.has("name")
632 && !StringUtils.isBlank(jsonOb.getString("name")) && isAlphaNumeric(jsonOb.getString("name"))
633 && jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))) {
// Second gate: permission string = configured prefix + namespace + "|create".
635 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
636 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
638 // Check if the user have create permission for the
640 if (checkMirrorMakerPermission(ctx, permission)) {
642 JSONObject listAll = new JSONObject();
643 JSONObject emptyObject = new JSONObject();
645 // Create a listAllMirrorMaker Json object
647 listAll.put("listAllMirrorMaker", emptyObject);
649 } catch (JSONException e) {
651 LOGGER.error("JSONException: ", e);
654 // set a random number as messageID
655 String randomStr = getRandomNum();
656 listAll.put("messageID", randomStr);
657 InputStream inStream = null;
659 // convert listAll Json object to InputStream object
661 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
// A conversion failure is only logged; inStream then remains null.
663 } catch (IOException ioe) {
664 LOGGER.error("IOException: ", ioe);
666 // call listAllMirrorMaker
667 mirrorService.pushEvents(ctx, topic, inStream, null, null);
669 // subscribe for listMirrorMaker
670 long startTime = System.currentTimeMillis();
671 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Subscribe again until the reply for randomStr arrives or `timeout`
// milliseconds have passed since startTime.
673 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
674 && (System.currentTimeMillis() - startTime) < timeout) {
675 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Re-check the last message: the loop above also ends on timeout.
678 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
679 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
681 JSONArray listMirrorMaker;
// getListMirrorMaker is defined outside this excerpt.
682 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
// Look up the mirror maker with the requested name and take its whitelist.
684 String whitelist = null;
685 for (int i = 0; i < listMirrorMaker.length(); i++) {
688 mm = listMirrorMaker.getJSONObject(i);
689 String name = mm.getString("name");
691 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
692 whitelist = mm.getString("whitelist");
697 if (!StringUtils.isBlank(whitelist)) {
699 List<String> topicList = new ArrayList<>();
700 List<String> finalTopicList = new ArrayList<>();
701 topicList = Arrays.asList(whitelist.split(","));
// Keep only topics whose namespace (text before the last '.') equals the
// requested namespace. The loop variable shadows the topic field.
703 for (String topic : topicList) {
704 if (topic != null && !topic.equals("null")
705 && getNamespace(topic).equals(jsonOb.getString(NAMESPACE))) {
707 finalTopicList.add(topic);
711 String topicNames = "";
713 if (!finalTopicList.isEmpty()) {
714 topicNames = StringUtils.join(finalTopicList, ",");
// Response: the mirror maker name plus the comma-separated topics that were kept.
717 JSONObject listAllWhiteList = new JSONObject();
718 listAllWhiteList.put("name", jsonOb.getString("name"));
719 listAllWhiteList.put("whitelist", topicNames);
721 DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList);
// Error payload, also sent through respondOk.
726 JSONObject err = new JSONObject();
728 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
729 DMaaPResponseBuilder.respondOk(ctx, err);
733 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
738 sendErrResponse(ctx, "This is not a ListAllWhitelist request. Please try again.");
// Every exception type caught here is logged under the "IOException: " prefix.
741 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
742 | TopicExistsException | missingReqdSetting | UnavailableException e) {
744 LOGGER.error("IOException: ", e);
747 sendErrResponse(ctx, NO_USER_PERMISSION);
// Endpoint /createwhitelist: adds one topic to the whitelist of a mirror maker.
// msg: request body; expected to be a JSON object with exactly the three
//      entries "name", "namespace" and "whitelistTopicName".
751 @SuppressWarnings("unchecked")
753 @Produces("application/json")
754 @Path("/createwhitelist")
755 public void createWhiteList(InputStream msg) {
757 DMaaPContext ctx = getDmaapContext();
// First gate: the caller must hold the Mirror Maker user permission.
758 if (checkMirrorMakerPermission(ctx,
759 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
765 input = IOUtils.toString(msg, UTF_8);
767 if (input != null && input.length() > 0) {
768 input = removeExtraChar(input);
771 // Check if it is correct Json object
772 JSONObject jsonOb = null;
775 jsonOb = new JSONObject(input);
// NOTE(review): jsonOb stays null after this failure, so the validation below
// fails as well - confirm that only one error response is sent.
777 } catch (JSONException ex) {
779 sendErrResponse(ctx, errorMessages.getIncorrectJson());
780 LOGGER.error("JSONException: ", ex);
783 // Check if the request has name and name contains only alpha
785 // check if the request has namespace and
786 // check if the request has whitelistTopicName
787 // check if the topic name contains only alpha numeric
// Only the part of whitelistTopicName after its last '.' is checked for
// alphanumeric characters.
788 if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
789 && !StringUtils.isBlank(jsonOb.getString("name")) && isAlphaNumeric(jsonOb.getString("name"))
790 && jsonOb.has(NAMESPACE) && !StringUtils.isBlank(jsonOb.getString(NAMESPACE))
791 && jsonOb.has("whitelistTopicName")
792 && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
793 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
794 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
795 jsonOb.getString("whitelistTopicName").length()))) {
// Second gate: permission string = configured prefix + namespace + "|create".
797 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
798 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
800 // Check if the user have create permission for the
802 if (checkMirrorMakerPermission(ctx, permission)) {
804 JSONObject listAll = new JSONObject();
805 JSONObject emptyObject = new JSONObject();
807 // Create a listAllMirrorMaker Json object
809 listAll.put("listAllMirrorMaker", emptyObject);
811 } catch (JSONException e) {
813 LOGGER.error("JSONException: ", e);
816 // set a random number as messageID
817 String randomStr = getRandomNum();
818 listAll.put("messageID", randomStr);
819 InputStream inStream = null;
821 // convert listAll Json object to InputStream object
823 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
// A conversion failure is only logged; inStream then remains null.
825 } catch (IOException ioe) {
826 LOGGER.error("IOException: ", ioe);
828 // call listAllMirrorMaker
829 mirrorService.pushEvents(ctx, topic, inStream, null, null);
831 // subscribe for listMirrorMaker
832 long startTime = System.currentTimeMillis();
833 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Subscribe again until the reply for randomStr arrives or `timeout`
// milliseconds have passed since startTime.
835 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
836 && (System.currentTimeMillis() - startTime) < timeout) {
837 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
840 JSONArray listMirrorMaker;
// Re-check the last message: the loop above also ends on timeout.
842 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
843 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
// getListMirrorMaker is defined outside this excerpt.
845 listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
846 String whitelist = null;
// Look up the mirror maker with the requested name and take its current whitelist.
848 for (int i = 0; i < listMirrorMaker.length(); i++) {
850 mm = listMirrorMaker.getJSONObject(i);
851 String name = mm.getString("name");
853 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
854 whitelist = mm.getString("whitelist");
859 List<String> topicList = new ArrayList<>();
860 List<String> finalTopicList = new ArrayList<>();
862 if (whitelist != null) {
863 topicList = Arrays.asList(whitelist.split(","));
// Carry over the existing non-blank topics into the new list.
866 for (String st : topicList) {
867 if (!StringUtils.isBlank(st)) {
868 finalTopicList.add(st);
872 String newTopic = jsonOb.getString("whitelistTopicName");
// Add the topic only when it is not yet whitelisted and its namespace (text
// before the last '.') equals the namespace given in the request.
874 if (!topicList.contains(newTopic)
875 && getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
877 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
878 MirrorMaker mirrorMaker = new MirrorMaker();
879 mirrorMaker.setName(jsonOb.getString("name"));
880 finalTopicList.add(newTopic);
881 String newWhitelist = "";
883 if (!finalTopicList.isEmpty()) {
884 newWhitelist = StringUtils.join(finalTopicList, ",");
887 mirrorMaker.setWhitelist(newWhitelist);
// The update request gets its own messageID, separate from the list request.
889 String newRandom = getRandomNum();
890 updateWhiteList.setMessageID(newRandom);
891 updateWhiteList.setUpdateWhiteList(mirrorMaker);
// NOTE(review): the result of this toJson call is not used; the payload is
// serialized again two lines below.
894 g.toJson(updateWhiteList);
895 InputStream inputStream;
896 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
897 // callPubSub(newRandom, ctx, inputStream);
898 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb.getString(NAMESPACE));
900 } else if (topicList.contains(newTopic)) {
901 sendErrResponse(ctx, "The topic already exist.");
903 } else if (!getNamespace(newTopic).equals(jsonOb.getString(NAMESPACE))) {
905 "The namespace of the topic does not match with the namespace you provided.");
// Error payload, also sent through respondOk.
909 JSONObject err = new JSONObject();
911 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
912 DMaaPResponseBuilder.respondOk(ctx, err);
916 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
921 sendErrResponse(ctx, "This is not a createWhitelist request. Please try again.");
// Every exception type caught here is logged under the "IOException: " prefix.
924 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
925 | TopicExistsException | missingReqdSetting | UnavailableException e) {
927 LOGGER.error("IOException: ", e);
930 // Send error response if user does not provide Authorization
932 sendErrResponse(ctx, NO_USER_PERMISSION);
// Endpoint /deletewhitelist: removes one topic from the whitelist of a mirror maker.
// msg: request body; expected to be a JSON object with exactly the three
//      entries "name", "namespace" and "whitelistTopicName".
936 @SuppressWarnings("unchecked")
938 @Produces("application/json")
939 @Path("/deletewhitelist")
940 public void deleteWhiteList(InputStream msg) {
942 DMaaPContext ctx = getDmaapContext();
// First gate: the caller must hold the Mirror Maker user permission.
943 if (checkMirrorMakerPermission(ctx,
944 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, MIRROR_MAKERUSER))) {
950 input = IOUtils.toString(msg, UTF_8);
952 if (input != null && input.length() > 0) {
953 input = removeExtraChar(input);
956 // Check if it is correct Json object
957 JSONObject jsonOb = null;
960 jsonOb = new JSONObject(input);
// NOTE(review): jsonOb stays null after this failure, so the validation below
// fails as well - confirm that only one error response is sent.
962 } catch (JSONException ex) {
964 sendErrResponse(ctx, errorMessages.getIncorrectJson());
965 LOGGER.error("JSONException: ", ex);
968 // Check if the request has name and name contains only alpha
970 // check if the request has namespace and
971 // check if the request has whitelistTopicName
// Unlike createWhiteList, this condition has no blank checks on name,
// namespace or whitelistTopicName. Only the part of whitelistTopicName after
// its last '.' is checked for alphanumeric characters.
972 if (jsonOb != null && jsonOb.length() == 3 && jsonOb.has("name")
973 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has(NAMESPACE)
974 && jsonOb.has("whitelistTopicName")
975 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
976 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
977 jsonOb.getString("whitelistTopicName").length()))) {
// Second gate: permission string = configured prefix + namespace + "|create".
979 String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
980 "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString(NAMESPACE) + "|create";
982 // Check if the user have create permission for the
984 if (checkMirrorMakerPermission(ctx, permission)) {
986 JSONObject listAll = new JSONObject();
987 JSONObject emptyObject = new JSONObject();
989 // Create a listAllMirrorMaker Json object
991 listAll.put("listAllMirrorMaker", emptyObject);
993 } catch (JSONException e) {
995 LOGGER.error("JSONException: ", e);
998 // set a random number as messageID
999 String randomStr = getRandomNum();
1000 listAll.put("messageID", randomStr);
1001 InputStream inStream = null;
1003 // convert listAll Json object to InputStream object
1005 inStream = IOUtils.toInputStream(listAll.toString(), UTF_8);
// A conversion failure is only logged; inStream then remains null.
1007 } catch (IOException ioe) {
1008 LOGGER.error("IOException: ", ioe);
1010 // call listAllMirrorMaker
1011 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1013 // subscribe for listMirrorMaker
1014 long startTime = System.currentTimeMillis();
1015 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Subscribe again until the reply for randomStr arrives or `timeout`
// milliseconds have passed since startTime.
1017 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1018 && (System.currentTimeMillis() - startTime) < timeout) {
1019 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1023 JSONArray jsonArray;
1024 JSONArray listMirrorMaker = null;
// Re-check the last message: the loop above also ends on timeout.
1026 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1027 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1028 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1029 jsonArray = new JSONArray(msgFrmSubscribe);
// Extract the listMirrorMaker array from the element carrying this messageID.
1031 for (int i = 0; i < jsonArray.length(); i++) {
1032 jsonObj = jsonArray.getJSONObject(i);
// Elements without a "message" member are compared as an empty object.
1034 JSONObject obj = new JSONObject();
1035 if (jsonObj.has(MESSAGE)) {
1036 obj = jsonObj.getJSONObject(MESSAGE);
1038 if (obj.has("messageID") && obj.get("messageID").equals(randomStr)
1039 && obj.has(LISTMIRRORMAKER)) {
1040 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
// Look up the mirror maker with the requested name and take its current whitelist.
1044 String whitelist = null;
1045 if (listMirrorMaker != null) {
1046 for (int i = 0; i < listMirrorMaker.length(); i++) {
1048 JSONObject mm = new JSONObject();
1049 mm = listMirrorMaker.getJSONObject(i);
1050 String name = mm.getString("name");
1052 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
1053 whitelist = mm.getString("whitelist");
1059 List<String> topicList = new ArrayList<>();
1061 if (whitelist != null) {
1062 topicList = Arrays.asList(whitelist.split(","));
1064 boolean removeTopic = false;
1065 String topicToRemove = jsonOb.getString("whitelistTopicName");
// The topic must currently be whitelisted; otherwise an error is returned.
1067 if (topicList.contains(topicToRemove)) {
1070 sendErrResponse(ctx, "The topic does not exist.");
// Build the update request carrying the whitelist without the removed topic.
1074 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
1075 MirrorMaker mirrorMaker = new MirrorMaker();
1077 mirrorMaker.setName(jsonOb.getString("name"));
1078 mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
// The update request gets its own messageID, separate from the list request.
1080 String newRandom = getRandomNum();
1082 updateWhiteList.setMessageID(newRandom);
1083 updateWhiteList.setUpdateWhiteList(mirrorMaker);
1085 Gson g = new Gson();
// NOTE(review): the result of this toJson call is not used; the payload is
// serialized again below.
1086 g.toJson(updateWhiteList);
1088 InputStream inputStream;
1089 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), UTF_8);
// The namespace passed on is derived from the topic name, not taken from the
// request's "namespace" entry.
1090 callPubSubForWhitelist(newRandom, ctx, inputStream, getNamespace(topicToRemove));
// Error payload, also sent through respondOk.
1095 JSONObject err = new JSONObject();
1097 "listWhiteList is not available, please make sure MirrorMakerAgent is running");
1098 DMaaPResponseBuilder.respondOk(ctx, err);
1102 sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
1107 sendErrResponse(ctx, "This is not a DeleteAllWhitelist request. Please try again.");
// Every exception type caught here is logged under the "IOException: " prefix.
1110 } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1111 | TopicExistsException | missingReqdSetting | UnavailableException e) {
1113 LOGGER.error("IOException: ", e);
1116 // Send error response if user does not provide Authorization
1118 sendErrResponse(ctx, NO_USER_PERMISSION);
1122 private String getNamespace(String topic) {
1123 return topic.substring(0, topic.lastIndexOf("."));
1126 private String removeTopic(String whitelist, String topicToRemove) {
1127 List<String> topicList = new ArrayList<>();
1128 List<String> newTopicList = new ArrayList<>();
1130 if (whitelist.contains(",")) {
1131 topicList = Arrays.asList(whitelist.split(","));
1135 if (topicList.contains(topicToRemove)) {
1136 for (String topic : topicList) {
1137 if (!topic.equals(topicToRemove)) {
1138 newTopicList.add(topic);
1143 String newWhitelist = StringUtils.join(newTopicList, ",");
1145 return newWhitelist;
/**
 * Publishes {@code inStream} through {@code mirrorService}, polls the
 * subscription for a reply accepted by {@code isListMirrorMaker} for
 * {@code randomStr}, and answers the request with the mirror-maker list whose
 * whitelists are reduced to the given namespace.
 *
 * NOTE(review): this listing is not contiguous (the embedded line numbers
 * skip), so some declarations, {@code else} lines and closing braces are not
 * visible here. The comments below describe only the visible statements.
 *
 * @param randomStr message id compared against each reply's "messageID"
 * @param ctx       request context passed to the service and response calls
 * @param inStream  payload handed to {@code mirrorService.pushEvents}
 * @param namespace namespace passed to {@code getWhitelistByNamespace}
 */
1148 private void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, String namespace) {
// Publish the payload to `topic`, then start reading replies.
1151 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1152 long startTime = System.currentTimeMillis();
1153 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
// Re-subscribe until isListMirrorMaker accepts the reply or `timeout`
// milliseconds have elapsed since startTime.
1155 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1156 && (System.currentTimeMillis() - startTime) < timeout) {
1157 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1161 JSONArray jsonArray;
1162 JSONArray jsonArrayNamespace = null;
// Only a non-empty reply that isListMirrorMaker accepts is parsed.
1164 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1165 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1166 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1167 jsonArray = new JSONArray(msgFrmSubscribe);
// Scan the reply elements for the one whose "messageID" equals randomStr and
// that carries a LISTMIRRORMAKER array.
1169 for (int i = 0; i < jsonArray.length(); i++) {
1170 jsonObj = jsonArray.getJSONObject(i);
1172 JSONObject obj = new JSONObject();
1173 if (jsonObj.has(MESSAGE)) {
1174 obj = jsonObj.getJSONObject(MESSAGE);
1176 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1177 jsonArrayNamespace = obj.getJSONArray(LISTMIRRORMAKER);
1180 JSONObject finalJasonObj = new JSONObject();
1181 JSONArray finalJsonArray = new JSONArray();
// NOTE(review): jsonArrayNamespace is still null here if no element satisfied
// the condition above; unless isListMirrorMaker guarantees a match, length()
// would throw and end up in the catch below without a response — confirm.
1183 for (int i = 0; i < jsonArrayNamespace.length(); i++) {
1186 mmObj = jsonArrayNamespace.getJSONObject(i);
// Replace the entry's whitelist with the subset returned by
// getWhitelistByNamespace for `namespace`.
1189 if (mmObj.has("whitelist")) {
1190 whitelist = getWhitelistByNamespace(mmObj.getString("whitelist"), namespace);
1192 if (whitelist != null) {
1193 mmObj.remove("whitelist");
1194 mmObj.put("whitelist", whitelist);
// NOTE(review): presumably the else branch (null result) — the "whitelist"
// key is dropped from the entry; the `else` line is not visible here.
1196 mmObj.remove("whitelist");
1199 finalJsonArray.put(mmObj);
1201 finalJasonObj.put(LISTMIRRORMAKER, finalJsonArray);
1203 DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
// NOTE(review): presumably the branch taken when no acceptable reply arrived
// (the `else` line is not visible). The error text is sent via respondOk.
1207 JSONObject err = new JSONObject();
1208 err.append(ERROR, "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
1209 DMaaPResponseBuilder.respondOk(ctx, err);
// Any exception is logged here; no response is written in this handler.
1212 } catch (Exception e) {
1213 LOGGER.error("Exception: ", e);
1217 private String getWhitelistByNamespace(String originalWhitelist, String namespace) {
1219 String whitelist = null;
1220 List<String> resultList = new ArrayList<>();
1221 List<String> whitelistList = new ArrayList<>();
1222 whitelistList = Arrays.asList(originalWhitelist.split(","));
1224 for (String topic : whitelistList) {
1225 if (StringUtils.isNotBlank(originalWhitelist) && getNamespace(topic).equals(namespace)) {
1226 resultList.add(topic);
1229 if (!resultList.isEmpty()) {
1230 whitelist = StringUtils.join(resultList, ",");
/**
 * Extracts the LISTMIRRORMAKER array from a subscription reply.
 *
 * The reply is cleaned with {@code removeExtraChar}, parsed as a JSON array,
 * and each element is checked for a message whose "messageID" equals
 * {@code randomStr} and that has a LISTMIRRORMAKER entry.
 *
 * NOTE(review): this listing is not contiguous (the embedded line numbers
 * skip); the declaration of {@code jsonObj} and the closing braces are not
 * visible here.
 *
 * @param msgFrmSubscribe raw reply text returned by the subscription
 * @param randomStr       message id the reply must carry
 * @return the matching LISTMIRRORMAKER array, or an empty array when no
 *         element matches
 */
1236 private JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
1238 JSONArray jsonArray;
// Start from an empty array so the caller never receives null.
1239 JSONArray listMirrorMaker = new JSONArray();
1241 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1242 jsonArray = new JSONArray(msgFrmSubscribe);
1244 for (int i = 0; i < jsonArray.length(); i++) {
1245 jsonObj = jsonArray.getJSONObject(i);
// Use the element's MESSAGE object when present, otherwise an empty object.
1247 JSONObject obj = new JSONObject();
1248 if (jsonObj.has(MESSAGE)) {
1249 obj = jsonObj.getJSONObject(MESSAGE);
// NOTE(review): no break is visible, so a later matching element appears to
// overwrite an earlier one — confirm against the full source.
1251 if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1252 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
1256 return listMirrorMaker;