/******************************************************************************* * ============LICENSE_START======================================================= * org.onap.dmaap * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ package org.onap.dmaap.service; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Date; import java.util.List; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import org.json.JSONObject; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.http.HttpStatus; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder; import org.onap.dmaap.dmf.mr.utils.Utils; import com.att.nsa.configs.ConfigDbException; import org.onap.dmaap.mmagent.*; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import edu.emory.mathcs.backport.java.util.Arrays; import com.att.ajsc.filemonitor.AJSCPropertiesMap; import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException; import org.json.JSONArray; import org.json.JSONException; import org.onap.dmaap.dmf.mr.beans.DMaaPContext; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode; import org.onap.dmaap.dmf.mr.exception.ErrorResponse; import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException; import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator; import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl; import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl; import org.onap.dmaap.dmf.mr.service.MMService; /** * Rest Service class for Mirror Maker proxy Rest Services * * @author * * @since May 25, 2016 */ @Component public class MMRestService { private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class); private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission."; private static final String NO_USER_PERMISSION = "No Mirror Maker User permission."; private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission."; private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric"; private static final String INVALID_IPPORT = "This is not a valid IP:Port"; private static final String MIRROR_MAKERADMIN = "msgRtr.mirrormakeradmin.aaf"; private static final String MIRROR_MAKERUSER = "msgRtr.mirrormakeruser.aaf"; private static final String UTF_8 = "UTF-8"; private static final String MESSAGE = "message"; private static final String LISTMIRRORMAKER = "listMirrorMaker"; private static final String ERROR = "error"; private static final String NAMESPACE = "namespace"; private String topic; private int timeout; private String consumergroup; private String consumerid; @Autowired @Qualifier("configurationReader") private ConfigurationReader configReader; @Context private HttpServletRequest request; @Context private HttpServletResponse response; @Autowired private MMService mirrorService; @Autowired private DMaaPErrorMessages errorMessages; private ErrorResponse errResJson = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "", null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", null); private DMaaPAAFAuthenticator dmaapAAFauthenticator = new DMaaPAAFAuthenticatorImpl(); /** * This method is used for taking Configuration Object,HttpServletRequest * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession * Object. * * @return DMaaPContext object from where user can get Configuration * Object,HttpServlet Object * */ private DMaaPContext getDmaapContext() { DMaaPContext dmaapContext = new DMaaPContext(); dmaapContext.setRequest(request); dmaapContext.setResponse(response); dmaapContext.setConfigReader(configReader); dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); return dmaapContext; } @POST @Produces("application/json") @Path("/create") public void callCreateMirrorMaker(InputStream msg) throws Exception { DMaaPContext ctx = getDmaapContext(); if (checkMirrorMakerPermission(ctx, AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) { loadProperty(); String input = null; String randomStr = getRandomNum(); InputStream inStream = null; Gson gson = new Gson(); CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker(); LOGGER.info("Starting Create MirrorMaker"); try { input = IOUtils.toString(msg, "UTF-8"); if (input != null && input.length() > 0) { input = removeExtraChar(input); } // Check if the request has CreateMirrorMaker try { createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class); } catch (JsonSyntaxException ex) { errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // send error message if it is not a CreateMirrorMaker request. if (createMirrorMaker.getCreateMirrorMaker() == null) { errResJson.setErrorMessage("This is not a CreateMirrorMaker request. Please try again."); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } else { createMirrorMaker.validateJSON(); } String name = createMirrorMaker.getCreateMirrorMaker().getName(); // if empty, blank name is entered if (StringUtils.isBlank(name)) { errResJson.setErrorMessage("Name can not be empty or blank."); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // Check if the name contains only Alpha Numeric else if (!isAlphaNumeric(name)) { errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } else { if (null == createMirrorMaker.getMessageID() || createMirrorMaker.getMessageID().isEmpty()) { createMirrorMaker.setMessageID(randomStr); } inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), "UTF-8"); JSONObject existMirrorMaker = validateMMExists(ctx, name); if (!(boolean) existMirrorMaker.get("exists")) { JSONObject finalJsonObj = callPubSub(createMirrorMaker.getMessageID(), ctx, inStream, name, false); DMaaPResponseBuilder.respondOk(ctx, finalJsonObj); } else { errResJson.setErrorMessage("MirrorMaker " + name + " already exists"); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } } } catch (IOException e) { throw e; } } // Send error response if user does not provide Authorization else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } @POST @Produces("application/json") @Path("/listall") public void callListAllMirrorMaker(InputStream msg) throws Exception { DMaaPContext ctx = getDmaapContext(); if (checkMirrorMakerPermission(ctx, AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) { loadProperty(); String input = null; Gson gson = new Gson(); try { input = IOUtils.toString(msg, "UTF-8"); if (input != null && input.length() > 0) { input = removeExtraChar(input); } String randomStr = getRandomNum(); JSONObject jsonOb = null; try { jsonOb = new JSONObject(input); } catch (JSONException ex) { errResJson.setErrorMessage(errorMessages.getIncorrectJson()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // Check if request has listAllMirrorMaker and // listAllMirrorMaker is empty if (jsonOb.has("listAllMirrorMaker") && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0) { if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) { jsonOb.put("messageID", randomStr); } InputStream inStream = null; MirrorMaker mirrormaker = gson.fromJson(input, MirrorMaker.class); inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8"); JSONObject responseJson = callPubSub(jsonOb.getString("messageID"), ctx, inStream, mirrormaker.name, true); DMaaPResponseBuilder.respondOk(ctx, responseJson); } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), "This is not a ListAllMirrorMaker request. Please try again.", null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } catch (IOException ioe) { throw ioe; } } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } @POST @Produces("application/json") @Path("/update") public void callUpdateMirrorMaker(InputStream msg) throws Exception { DMaaPContext ctx = getDmaapContext(); if (checkMirrorMakerPermission(ctx, AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) { loadProperty(); String input = null; String randomStr = getRandomNum(); InputStream inStream = null; Gson gson = new Gson(); UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker(); JSONObject jsonOb, jsonObInput = null; try { input = IOUtils.toString(msg, "UTF-8"); if (input != null && input.length() > 0) { input = removeExtraChar(input); } // Check if the request has UpdateMirrorMaker try { updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class); jsonOb = new JSONObject(input); jsonObInput = (JSONObject) jsonOb.get("updateMirrorMaker"); } catch (JsonSyntaxException ex) { errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // send error message if it is not a UpdateMirrorMaker request. if (updateMirrorMaker.getUpdateMirrorMaker() == null) { errResJson.setErrorMessage("This is not a UpdateMirrorMaker request. Please try again."); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } else { updateMirrorMaker.validateJSON(jsonObInput); } String name = updateMirrorMaker.getUpdateMirrorMaker().getName(); // if empty, blank name is entered if (StringUtils.isBlank(name)) { errResJson.setErrorMessage("Name can not be empty or blank."); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // Check if the name contains only Alpha Numeric else if (!isAlphaNumeric(name)) { errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // Set a random number as messageID, convert Json Object to // InputStream and finally call publisher and subscriber else { if (null == updateMirrorMaker.getMessageID() || updateMirrorMaker.getMessageID().isEmpty()) { updateMirrorMaker.setMessageID(randomStr); } JSONObject existMirrorMaker = validateMMExists(ctx, name); if ((boolean) existMirrorMaker.get("exists")) { JSONObject existMM = (JSONObject) existMirrorMaker.get("listMirrorMaker"); if (!jsonObInput.has("numStreams")) { updateMirrorMaker.getUpdateMirrorMaker().setNumStreams(existMM.getInt("numStreams")); } if (!jsonObInput.has("enablelogCheck")) { updateMirrorMaker.getUpdateMirrorMaker() .setEnablelogCheck(existMM.getBoolean("enablelogCheck")); } inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), "UTF-8"); JSONObject finalJsonObj = callPubSub(updateMirrorMaker.getMessageID(), ctx, inStream, name, false); DMaaPResponseBuilder.respondOk(ctx, finalJsonObj); } else { errResJson.setErrorMessage("MirrorMaker " + name + " does not exist"); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } } } catch (IOException e) { LOGGER.error("Error in callUpdateMirrorMaker:", e); } } // Send error response if user does not provide Authorization else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } @POST @Produces("application/json") @Path("/delete") public void callDeleteMirrorMaker(InputStream msg) throws JSONException, Exception { DMaaPContext ctx = getDmaapContext(); if (checkMirrorMakerPermission(ctx, AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) { loadProperty(); String input = null; Gson gson = new Gson(); MirrorMaker mirrormaker = new MirrorMaker(); try { input = IOUtils.toString(msg, "UTF-8"); if (input != null && input.length() > 0) { input = removeExtraChar(input); } String randomStr = getRandomNum(); JSONObject jsonOb = null; try { jsonOb = new JSONObject(input); mirrormaker = gson.fromJson(input, MirrorMaker.class); } catch (JSONException ex) { errResJson.setErrorMessage(errorMessages.getIncorrectJson()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // Check if request has DeleteMirrorMaker and // DeleteMirrorMaker has MirrorMaker object with name variable // and check if the name contain only alpha numeric if (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1 && jsonOb.getJSONObject("deleteMirrorMaker").has("name") && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name")) && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))) { if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) { jsonOb.put("messageID", randomStr); } InputStream inStream = null; inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8"); JSONObject deleteMM = jsonOb.getJSONObject("deleteMirrorMaker"); JSONObject existMirrorMaker = validateMMExists(ctx, deleteMM.getString("name")); if ((boolean) existMirrorMaker.get("exists")) { JSONObject finalJsonObj = callPubSub(jsonOb.getString("messageID"), ctx, inStream, mirrormaker.name, false); DMaaPResponseBuilder.respondOk(ctx, finalJsonObj); } else { errResJson.setErrorMessage("MirrorMaker " + deleteMM.getString("name") + " does not exist"); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } } else { errResJson.setErrorMessage("This is not a DeleteMirrorMaker request. Please try again."); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } } catch (IOException ioe) { LOGGER.error("Error in callDeleteMirrorMaker:", ioe); throw ioe; } } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } public boolean isListMirrorMaker(String msg, String messageID) { String topicmsg = msg; topicmsg = removeExtraChar(topicmsg); JSONObject jObj = new JSONObject(); JSONArray jArray = null; boolean exist = false; if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) { jArray = new JSONArray(topicmsg); for (int i = 0; i < jArray.length(); i++) { jObj = jArray.getJSONObject(i); if (jObj.has("messageID") && jObj.get("messageID").equals(messageID) && jObj.has("listMirrorMaker")) { exist = true; break; } } } return exist; } private void loadProperty() { this.timeout = Integer.parseInt( AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim()); this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim(); this.consumergroup = AJSCPropertiesMap .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim(); this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid") .trim(); } private String removeExtraChar(String message) { String str = message; str = checkJsonFormate(str); if (str != null && str.length() > 0) { str = str.replace("\\", ""); str = str.replace("\"{", "{"); str = str.replace("}\"", "}"); } return str; } private String getRandomNum() { long random = Math.round(Math.random() * 89999) + 10000; String strLong = Long.toString(random); return strLong; } private boolean isAlphaNumeric(String name) { String pattern = "^[a-zA-Z0-9]*$"; if (name.matches(pattern)) { return true; } return false; } private String checkJsonFormate(String jsonStr) { String json = jsonStr; if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) { json = json + "]"; } return json; } private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) { boolean hasPermission = false; if (dmaapAAFauthenticator.aafAuthentication(ctx.getRequest(), permission)) { hasPermission = true; } return hasPermission; } public JSONObject callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream, String name, boolean listAll) throws Exception { loadProperty(); JSONObject jsonObj = new JSONObject(); JSONObject finalJsonObj = new JSONObject(); JSONArray jsonArray = null; try { String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); mirrorService.pushEvents(ctx, topic, inStream, null, null); long startTime = System.currentTimeMillis(); while (!isListMirrorMaker(msgFrmSubscribe, randomstr) && ((System.currentTimeMillis() - startTime) < timeout)) { msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); } if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0 && isListMirrorMaker(msgFrmSubscribe, randomstr)) { msgFrmSubscribe = removeExtraChar(msgFrmSubscribe); jsonArray = new JSONArray(msgFrmSubscribe); jsonObj = jsonArray.getJSONObject(0); if (jsonObj.has("listMirrorMaker")) { jsonArray = (JSONArray) jsonObj.get("listMirrorMaker"); if (true == listAll) { return jsonObj; } else { for (int i = 0; i < jsonArray.length(); i++) { jsonObj = jsonArray.getJSONObject(i); if (null != name && !name.isEmpty()) { if (jsonObj.getString("name").equals(name)) { finalJsonObj.put("listMirrorMaker", jsonObj); break; } } else { finalJsonObj.put("listMirrorMaker", jsonObj); } } } } return finalJsonObj; } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } catch (Exception e) { LOGGER.error("Error in callPubSub", e); throw e; } } private void sendErrResponse(DMaaPContext ctx, String errMsg) { JSONObject err = new JSONObject(); err.append(ERROR, errMsg); try { DMaaPResponseBuilder.respondOk(ctx, err); LOGGER.error(errMsg); } catch (JSONException | IOException e) { LOGGER.error("Error at sendErrResponse method:" + errMsg + "Exception name:" + e); } } @SuppressWarnings("unchecked") @POST @Produces("application/json") @Path("/listallwhitelist") public void listWhiteList(InputStream msg) throws Exception { DMaaPContext ctx = getDmaapContext(); if (checkMirrorMakerPermission(ctx, AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) { loadProperty(); String input = null; try { input = IOUtils.toString(msg, "UTF-8"); if (input != null && input.length() > 0) { input = removeExtraChar(input); } // Check if it is correct Json object JSONObject jsonOb = null; try { jsonOb = new JSONObject(input); } catch (JSONException ex) { errResJson.setErrorMessage(errorMessages.getIncorrectJson()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // Check if the request has name and name contains only alpha // numeric // and check if the request has namespace and namespace contains // only alpha numeric if (jsonOb.length() == 2 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name")) && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace") && !StringUtils.isBlank(jsonOb.getString("namespace"))) { String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create"; // Check if the user have create permission for the // namespace if (checkMirrorMakerPermission(ctx, permission)) { JSONObject listAll = new JSONObject(); JSONObject emptyObject = new JSONObject(); // Create a listAllMirrorMaker Json object try { listAll.put("listAllMirrorMaker", emptyObject); } catch (JSONException e) { errResJson.setErrorMessage(errorMessages.getIncorrectJson()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // set a random number as messageID String randomStr = getRandomNum(); listAll.put("messageID", randomStr); InputStream inStream = null; // convert listAll Json object to InputStream object inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8"); JSONObject listMirrorMaker = new JSONObject(); listMirrorMaker = callPubSub(randomStr, ctx, inStream, null, true); String whitelist = null; JSONArray listMMArray = new JSONArray(); if (listMirrorMaker.has("listMirrorMaker")) { listMMArray = (JSONArray) listMirrorMaker.get("listMirrorMaker"); for (int i = 0; i < listMMArray.length(); i++) { JSONObject mm = new JSONObject(); mm = listMMArray.getJSONObject(i); String name = mm.getString("name"); if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) { whitelist = mm.getString("whitelist"); break; } } if (!StringUtils.isBlank(whitelist)) { List topicList = new ArrayList(); List finalTopicList = new ArrayList(); topicList = Arrays.asList(whitelist.split(",")); for (String topic : topicList) { if (topic != null && !topic.equals("null") && getNamespace(topic).equals(jsonOb.getString("namespace"))) { finalTopicList.add(topic); } } String topicNames = ""; if (finalTopicList.size() > 0) { topicNames = StringUtils.join(finalTopicList, ","); } JSONObject listAllWhiteList = new JSONObject(); listAllWhiteList.put("name", jsonOb.getString("name")); listAllWhiteList.put("whitelist", topicNames); DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList); } } else { errResJson.setErrorMessage( "listWhiteList is not available, please make sure MirrorMakerAgent is running"); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION, null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } else { errResJson.setErrorMessage("This is not a ListAllWhitelist request. Please try again."); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } } catch (IOException e) { LOGGER.error("Error in listWhiteList", e); } } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } @SuppressWarnings("unchecked") @POST @Produces("application/json") @Path("/createwhitelist") public void createWhiteList(InputStream msg) throws Exception { DMaaPContext ctx = getDmaapContext(); if (checkMirrorMakerPermission(ctx, AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) { loadProperty(); String input = null; try { input = IOUtils.toString(msg, "UTF-8"); if (input != null && input.length() > 0) { input = removeExtraChar(input); } // Check if it is correct Json object JSONObject jsonOb = null; try { jsonOb = new JSONObject(input); } catch (JSONException ex) { errResJson.setErrorMessage(errorMessages.getIncorrectJson()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // Check if the request has name and name contains only alpha // numeric, // check if the request has namespace and // check if the request has whitelistTopicName // check if the topic name contains only alpha numeric if (jsonOb.length() == 3 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name")) && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace") && !StringUtils.isBlank(jsonOb.getString("namespace")) && jsonOb.has("whitelistTopicName") && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName")) && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring( jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1, jsonOb .getString("whitelistTopicName").length()))) { String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create"; // Check if the user have create permission for the // namespace if (checkMirrorMakerPermission(ctx, permission)) { JSONObject listAll = new JSONObject(); JSONObject emptyObject = new JSONObject(); // Create a listAllMirrorMaker Json object try { listAll.put("listAllMirrorMaker", emptyObject); } catch (JSONException e) { errResJson.setErrorMessage(errorMessages.getIncorrectJson()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // set a random number as messageID String randomStr = getRandomNum(); listAll.put("messageID", randomStr); InputStream inStream = null; // convert listAll Json object to InputStream object inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8"); String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); // call listAllMirrorMaker mirrorService.pushEvents(ctx, topic, inStream, null, null); // subscribe for listMirrorMaker long startTime = System.currentTimeMillis(); while (!isListMirrorMaker(msgFrmSubscribe, randomStr) && (System.currentTimeMillis() - startTime) < timeout) { msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); } JSONArray listMirrorMaker = null; if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0 && isListMirrorMaker(msgFrmSubscribe, randomStr)) { listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr); String whitelist = null; for (int i = 0; i < listMirrorMaker.length(); i++) { JSONObject mm = new JSONObject(); mm = listMirrorMaker.getJSONObject(i); String name = mm.getString("name"); if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) { whitelist = mm.getString("whitelist"); break; } } List topicList = new ArrayList(); List finalTopicList = new ArrayList(); if (whitelist != null) { topicList = Arrays.asList(whitelist.split(",")); } for (String st : topicList) { if (!StringUtils.isBlank(st)) { finalTopicList.add(st); } } String newTopic = jsonOb.getString("whitelistTopicName"); if (!topicList.contains(newTopic) && getNamespace(newTopic).equals(jsonOb.getString("namespace"))) { UpdateWhiteList updateWhiteList = new UpdateWhiteList(); MirrorMaker mirrorMaker = new MirrorMaker(); mirrorMaker.setName(jsonOb.getString("name")); finalTopicList.add(newTopic); String newWhitelist = ""; if (finalTopicList.size() > 0) { newWhitelist = StringUtils.join(finalTopicList, ","); } mirrorMaker.setWhitelist(newWhitelist); String newRandom = getRandomNum(); updateWhiteList.setMessageID(newRandom); updateWhiteList.setUpdateWhiteList(mirrorMaker); Gson g = new Gson(); g.toJson(updateWhiteList); InputStream inputStream = null; inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8"); // callPubSub(newRandom, ctx, inputStream); callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb); } else if (topicList.contains(newTopic)) { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "The topic already exist.", null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } else if (!getNamespace(newTopic).equals(jsonOb.getString("namespace"))) { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "The namespace of the topic does not match with the namespace you provided.", null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), "listWhiteList is not available, please make sure MirrorMakerAgent is running", null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_CREATE_PERMISSION, null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } else { errResJson.setErrorMessage("This is not a createWhitelist request. Please try again."); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | missingReqdSetting | UnavailableException e) { throw e; } } // Send error response if user does not provide Authorization else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_PERMISSION, null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } @SuppressWarnings("unchecked") @POST @Produces("application/json") @Path("/deletewhitelist") public void deleteWhiteList(InputStream msg) throws Exception { DMaaPContext ctx = getDmaapContext(); if (checkMirrorMakerPermission(ctx, AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) { loadProperty(); String input = null; try { input = IOUtils.toString(msg, "UTF-8"); if (input != null && input.length() > 0) { input = removeExtraChar(input); } // Check if it is correct Json object JSONObject jsonOb = null; try { jsonOb = new JSONObject(input); } catch (JSONException ex) { errResJson.setErrorMessage(errorMessages.getIncorrectJson()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // Check if the request has name and name contains only alpha // numeric, // check if the request has namespace and // check if the request has whitelistTopicName if (jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace") && jsonOb.has("whitelistTopicName") && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring( jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1, jsonOb.getString("whitelistTopicName").length()))) { String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create"; // Check if the user have create permission for the // namespace if (checkMirrorMakerPermission(ctx, permission)) { JSONObject listAll = new JSONObject(); JSONObject emptyObject = new JSONObject(); // Create a listAllMirrorMaker Json object try { listAll.put("listAllMirrorMaker", emptyObject); } catch (JSONException e) { errResJson.setErrorMessage(errorMessages.getIncorrectJson()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } // set a random number as messageID String randomStr = getRandomNum(); listAll.put("messageID", randomStr); InputStream inStream = null; // convert listAll Json object to InputStream object inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8"); // call listAllMirrorMaker mirrorService.pushEvents(ctx, topic, inStream, null, null); // subscribe for listMirrorMaker long startTime = System.currentTimeMillis(); String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); while (!isListMirrorMaker(msgFrmSubscribe, randomStr) && (System.currentTimeMillis() - startTime) < timeout) { msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); } JSONObject jsonObj = new JSONObject(); JSONArray jsonArray = null; JSONArray listMirrorMaker = null; if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0 && isListMirrorMaker(msgFrmSubscribe, randomStr)) { msgFrmSubscribe = removeExtraChar(msgFrmSubscribe); jsonArray = new JSONArray(msgFrmSubscribe); for (int i = 0; i < jsonArray.length(); i++) { jsonObj = jsonArray.getJSONObject(i); if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr) && jsonObj.has("listMirrorMaker")) { listMirrorMaker = jsonObj.getJSONArray("listMirrorMaker"); break; } } String whitelist = null; for (int i = 0; i < listMirrorMaker.length(); i++) { JSONObject mm = new JSONObject(); mm = listMirrorMaker.getJSONObject(i); String name = mm.getString("name"); if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) { whitelist = mm.getString("whitelist"); break; } } List topicList = new ArrayList(); if (whitelist != null) { topicList = Arrays.asList(whitelist.split(",")); } boolean removeTopic = false; String topicToRemove = jsonOb.getString("whitelistTopicName"); if (topicList.contains(topicToRemove)) { removeTopic = true; } else { errResJson.setErrorMessage(errorMessages.getTopicNotExist()); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } if (removeTopic) { UpdateWhiteList updateWhiteList = new UpdateWhiteList(); MirrorMaker mirrorMaker = new MirrorMaker(); mirrorMaker.setName(jsonOb.getString("name")); if (StringUtils.isNotBlank((removeTopic(whitelist, topicToRemove)))) { mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove)); } String newRandom = getRandomNum(); updateWhiteList.setMessageID(newRandom); updateWhiteList.setUpdateWhiteList(mirrorMaker); Gson g = new Gson(); g.toJson(updateWhiteList); InputStream inputStream = null; inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8"); callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb); // mmAgentUtil.getNamespace(topicToRemove)); } } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), "listWhiteList is not available, please make sure MirrorMakerAgent is running", null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION, null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } else { errResJson.setErrorMessage("This is not a DeleteAllWhitelist request. Please try again."); LOGGER.info(errResJson.toString()); throw new CambriaApiException(errResJson); } } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | missingReqdSetting | UnavailableException e) { throw e; } } // Send error response if user does not provide Authorization else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } private String getNamespace(String topic) { return topic.substring(0, topic.lastIndexOf(".")); } private String removeTopic(String whitelist, String topicToRemove) { List topicList = new ArrayList<>(); List newTopicList = new ArrayList<>(); if (whitelist.contains(",")) { topicList = Arrays.asList(whitelist.split(",")); } if (topicList.contains(topicToRemove)) { for (String topic : topicList) { if (!topic.equals(topicToRemove)) { newTopicList.add(topic); } } } String newWhitelist = StringUtils.join(newTopicList, ","); return newWhitelist; } public void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, JSONObject jsonOb) { loadProperty(); try { String namespace = jsonOb.getString("namespace"); String mmName = jsonOb.getString("name"); String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); mirrorService.pushEvents(ctx, topic, inStream, null, null); long startTime = System.currentTimeMillis(); while (!isListMirrorMaker(msgFrmSubscribe, randomStr) && (System.currentTimeMillis() - startTime) < timeout) { msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); } JSONObject jsonObj = new JSONObject(); JSONArray jsonArray = null; JSONArray jsonArrayNamespace = null; if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0 && isListMirrorMaker(msgFrmSubscribe, randomStr)) { msgFrmSubscribe = removeExtraChar(msgFrmSubscribe); jsonArray = new JSONArray(msgFrmSubscribe); for (int i = 0; i < jsonArray.length(); i++) { jsonObj = jsonArray.getJSONObject(i); if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr) && jsonObj.has("listMirrorMaker")) { jsonArrayNamespace = jsonObj.getJSONArray("listMirrorMaker"); } } JSONObject finalJasonObj = new JSONObject(); JSONArray finalJsonArray = new JSONArray(); if (jsonArrayNamespace != null) { for (int i = 0; i < jsonArrayNamespace.length(); i++) { JSONObject mmObj = new JSONObject(); mmObj = jsonArrayNamespace.getJSONObject(i); if (mmObj.has("name") && mmName.equals(mmObj.getString("name"))) { finalJsonArray.put(mmObj); } } } finalJasonObj.put("listMirrorMaker", finalJsonArray); DMaaPResponseBuilder.respondOk(ctx, finalJasonObj); } else { ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent", ctx.getRequest().getRemoteHost()); LOGGER.info(errRes.toString()); throw new CambriaApiException(errRes); } } catch (Exception e) { LOGGER.error("Error in callPubSubForWhitelist:", e); } } public JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) { JSONObject jsonObj = new JSONObject(); JSONArray jsonArray = new JSONArray(); JSONArray listMirrorMaker = new JSONArray(); msgFrmSubscribe = removeExtraChar(msgFrmSubscribe); jsonArray = new JSONArray(msgFrmSubscribe); jsonObj = jsonArray.getJSONObject(0); for (int i = 0; i < jsonArray.length(); i++) { jsonObj = jsonArray.getJSONObject(i); if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr) && jsonObj.has("listMirrorMaker")) { listMirrorMaker = jsonObj.getJSONArray("listMirrorMaker"); break; } } return listMirrorMaker; } public JSONObject validateMMExists(DMaaPContext ctx, String name) throws Exception { // Create a listAllMirrorMaker Json object JSONObject listAll = new JSONObject(); try { listAll.put("listAllMirrorMaker", new JSONObject()); } catch (JSONException e) { LOGGER.error("Error while creating a listAllMirrorMaker Json object:", e); } // set a random number as messageID String randomStr = getRandomNum(); listAll.put("messageID", randomStr); InputStream inStream = null; // convert listAll Json object to InputStream object inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8"); JSONObject listMirrorMaker = new JSONObject(); listMirrorMaker = callPubSub(randomStr, ctx, inStream, name, false); if (null != listMirrorMaker && listMirrorMaker.length() > 0) { listMirrorMaker.put("exists", true); return listMirrorMaker; } if(null != listMirrorMaker) { listMirrorMaker.put("exists", false); } return listMirrorMaker; } }