update MM agent API
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / service / MMRestService.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.service;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
29
30 import javax.servlet.http.HttpServletRequest;
31 import javax.servlet.http.HttpServletResponse;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import org.json.JSONObject;
37 import org.apache.commons.io.IOUtils;
38 import org.apache.commons.lang.StringUtils;
39 import org.apache.http.HttpStatus;
40
41 import com.att.eelf.configuration.EELFLogger;
42 import com.att.eelf.configuration.EELFManager;
43 import org.springframework.beans.factory.annotation.Autowired;
44 import org.springframework.beans.factory.annotation.Qualifier;
45 import org.springframework.stereotype.Component;
46
47 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
48 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
49 import org.onap.dmaap.dmf.mr.utils.Utils;
50 import com.att.nsa.configs.ConfigDbException;
51 import org.onap.dmaap.mmagent.*;
52 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
53 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
54 import com.google.gson.Gson;
55 import com.google.gson.JsonSyntaxException;
56
57 import edu.emory.mathcs.backport.java.util.Arrays;
58
59 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
60 import org.onap.dmaap.dmf.mr.CambriaApiException;
61 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
62
63 import org.json.JSONArray;
64 import org.json.JSONException;
65 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
66 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
67 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
68 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
69 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
70 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
71 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
72 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
73 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
74 import org.onap.dmaap.dmf.mr.service.MMService;
75
76 /**
77  * Rest Service class for Mirror Maker proxy Rest Services
78  * 
79  * @author <a href="mailto:"></a>
80  *
81  * @since May 25, 2016
82  */
83
84 @Component
85 public class MMRestService {
86
87         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class);
           // Error texts, property keys and JSON keys. In the portion of this class reviewed here only
           // NO_ADMIN_PERMISSION, NAME_DOES_NOT_MEET_REQUIREMENT, MESSAGE and LISTMIRRORMAKER are
           // referenced; the request handlers pass the "msgRtr.mirrormakeradmin.aaf" and "UTF-8"
           // literals directly instead of MIRROR_MAKERADMIN / UTF_8.
88         private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission.";
89         private static final String NO_USER_PERMISSION = "No Mirror Maker User permission.";
90         private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission.";
91         private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric";
92         private static final String INVALID_IPPORT = "This is not a valid IP:Port";
93         private static final String MIRROR_MAKERADMIN = "msgRtr.mirrormakeradmin.aaf";
94         private static final String MIRROR_MAKERUSER = "msgRtr.mirrormakeruser.aaf";
95         private static final String UTF_8 = "UTF-8";
96         private static final String MESSAGE = "message";
97         private static final String LISTMIRRORMAKER = "listMirrorMaker";
98         private static final String ERROR = "error";
99         private static final String NAMESPACE = "namespace";
100
            // Mirror Maker messaging settings. They have no initializer and are assigned by
            // loadProperty() from the msgRtr.mirrormaker.{topic,timeout,consumergroup,consumerid}
            // properties.
101         private String topic;
102         private int timeout;
103         private String consumergroup;
104         private String consumerid;
105
106         @Autowired
107         @Qualifier("configurationReader")
108         private ConfigurationReader configReader;
109
            // Servlet request/response injected through @Context; getDmaapContext() copies them
            // into each DMaaPContext it builds.
110         @Context
111         private HttpServletRequest request;
112
113         @Context
114         private HttpServletResponse response;
115
116         @Autowired
117         private MMService mirrorService;
118
119         @Autowired
120         private DMaaPErrorMessages errorMessages;
121
            // Bad-request (SC_BAD_REQUEST / INCORRECT_JSON) error object reused by the handlers: they
            // overwrite its message with setErrorMessage(...) and throw this same instance inside a
            // CambriaApiException. The initializer runs while the object is being constructed, i.e.
            // before loadProperty() has assigned `topic`, so the topic argument passed here is null
            // and the date is the construction time.
            // NOTE(review): being instance state, this object is shared by every call made on this
            // instance - confirm the component is never used by concurrent requests.
122         private ErrorResponse errResJson = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
123                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "", null, Utils.getFormattedDate(new Date()), topic,
124                         null, null, "mirrorMakerAgent", null);
125
            // Authenticator queried by checkMirrorMakerPermission().
126         private DMaaPAAFAuthenticator dmaapAAFauthenticator = new DMaaPAAFAuthenticatorImpl();
127
128         /**
129          * This method is used for taking Configuration Object,HttpServletRequest
130          * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
131          * Object.
132          * 
133          * @return DMaaPContext object from where user can get Configuration
134          *         Object,HttpServlet Object
135          * 
136          */
137         private DMaaPContext getDmaapContext() {
138                 DMaaPContext dmaapContext = new DMaaPContext();
139                 dmaapContext.setRequest(request);
140                 dmaapContext.setResponse(response);
141                 dmaapContext.setConfigReader(configReader);
142                 dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
143
144                 return dmaapContext;
145         }
146
147         @POST
148         @Produces("application/json")
149         @Path("/create")
150         public void callCreateMirrorMaker(InputStream msg) throws Exception {
151
152                 DMaaPContext ctx = getDmaapContext();
153                 if (checkMirrorMakerPermission(ctx,
154                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
155
156                         loadProperty();
157                         String input = null;
158                         String randomStr = getRandomNum();
159
160                         InputStream inStream = null;
161                         Gson gson = new Gson();
162                         CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
163                         LOGGER.info("Starting Create MirrorMaker");
164                         try {
165                                 input = IOUtils.toString(msg, "UTF-8");
166
167                                 if (input != null && input.length() > 0) {
168                                         input = removeExtraChar(input);
169                                 }
170
171                                 // Check if the request has CreateMirrorMaker
172                                 try {
173                                         createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class);
174
175                                 } catch (JsonSyntaxException ex) {
176
177                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage());
178                                         LOGGER.info(errResJson.toString());
179                                         throw new CambriaApiException(errResJson);
180                                 }
181
182                                 // send error message if it is not a CreateMirrorMaker request.
183                                 if (createMirrorMaker.getCreateMirrorMaker() == null) {
184
185                                         errResJson.setErrorMessage("This is not a CreateMirrorMaker request. Please try again.");
186                                         LOGGER.info(errResJson.toString());
187                                         throw new CambriaApiException(errResJson);
188                                 } else {
189                                         createMirrorMaker.validateJSON();
190                                 }
191
192                                 String name = createMirrorMaker.getCreateMirrorMaker().getName();
193
194                                 // if empty, blank name is entered
195                                 if (StringUtils.isBlank(name)) {
196
197                                         errResJson.setErrorMessage("Name can not be empty or blank.");
198                                         LOGGER.info(errResJson.toString());
199                                         throw new CambriaApiException(errResJson);
200                                 }
201
202                                 // Check if the name contains only Alpha Numeric
203                                 else if (!isAlphaNumeric(name)) {
204
205                                         errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT);
206                                         LOGGER.info(errResJson.toString());
207                                         throw new CambriaApiException(errResJson);
208
209                                 } else {
210
211                                         if (null == createMirrorMaker.getMessageID() || createMirrorMaker.getMessageID().isEmpty()) {
212                                                 createMirrorMaker.setMessageID(randomStr);
213                                         }
214                                         inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), "UTF-8");
215                                         JSONObject existMirrorMaker = validateMMExists(ctx, name);
216                                         if (!(boolean) existMirrorMaker.get("exists")) {
217                                                 JSONObject finalJsonObj = callPubSub(createMirrorMaker.getMessageID(), ctx, inStream, name,
218                                                                 false);
219                                                 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
220                                         } else {
221
222                                                 errResJson.setErrorMessage("MirrorMaker " + name + " already exists");
223                                                 LOGGER.info(errResJson.toString());
224                                                 throw new CambriaApiException(errResJson);
225
226                                         }
227                                 }
228
229                         } catch (IOException e) {
230
231                                 throw e;
232                         }
233                 }
234                 // Send error response if user does not provide Authorization
235                 else {
236                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
237                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
238                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
239                                         ctx.getRequest().getRemoteHost());
240                         LOGGER.info(errRes.toString());
241                         throw new CambriaApiException(errRes);
242                 }
243
244         }
245
246         @POST
247         @Produces("application/json")
248         @Path("/listall")
249         public void callListAllMirrorMaker(InputStream msg) throws Exception {
250
251                 DMaaPContext ctx = getDmaapContext();
252
253                 if (checkMirrorMakerPermission(ctx,
254                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
255
256                         loadProperty();
257
258                         String input = null;
259                         Gson gson = new Gson();
260
261                         try {
262                                 input = IOUtils.toString(msg, "UTF-8");
263
264                                 if (input != null && input.length() > 0) {
265                                         input = removeExtraChar(input);
266                                 }
267
268                                 String randomStr = getRandomNum();
269                                 JSONObject jsonOb = null;
270
271                                 try {
272                                         jsonOb = new JSONObject(input);
273
274                                 } catch (JSONException ex) {
275                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
276                                         LOGGER.info(errResJson.toString());
277                                         throw new CambriaApiException(errResJson);
278                                 }
279
280                                 // Check if request has listAllMirrorMaker and
281                                 // listAllMirrorMaker is empty
282                                 if (jsonOb.has("listAllMirrorMaker") && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0) {
283
284                                         if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) {
285                                                 jsonOb.put("messageID", randomStr);
286                                         }
287
288                                         InputStream inStream = null;
289                                         MirrorMaker mirrormaker = gson.fromJson(input, MirrorMaker.class);
290
291                                         try {
292                                                 inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
293
294                                         } catch (IOException ioe) {
295                                                 throw ioe;
296                                         }
297
298                                         JSONObject responseJson = callPubSub(jsonOb.getString("messageID"), ctx, inStream, mirrormaker.name,
299                                                         true);
300                                         DMaaPResponseBuilder.respondOk(ctx, responseJson);
301
302                                 } else {
303
304                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
305                                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
306                                                         "This is not a ListAllMirrorMaker request. Please try again.", null,
307                                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
308                                                         ctx.getRequest().getRemoteHost());
309                                         LOGGER.info(errRes.toString());
310                                         throw new CambriaApiException(errRes);
311                                 }
312
313                         } catch (IOException ioe) {
314
315                                 throw ioe;
316                         }
317
318                 } else {
319
320                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
321                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
322                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
323                                         ctx.getRequest().getRemoteHost());
324                         LOGGER.info(errRes.toString());
325                         throw new CambriaApiException(errRes);
326                 }
327         }
328
329         @POST
330         @Produces("application/json")
331         @Path("/update")
332         public void callUpdateMirrorMaker(InputStream msg) throws Exception {
333                 DMaaPContext ctx = getDmaapContext();
334                 if (checkMirrorMakerPermission(ctx,
335                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
336
337                         loadProperty();
338                         String input = null;
339                         String randomStr = getRandomNum();
340
341                         InputStream inStream = null;
342                         Gson gson = new Gson();
343                         UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
344                         JSONObject jsonOb, jsonObInput = null;
345
346                         try {
347                                 input = IOUtils.toString(msg, "UTF-8");
348
349                                 if (input != null && input.length() > 0) {
350                                         input = removeExtraChar(input);
351                                 }
352
353                                 // Check if the request has UpdateMirrorMaker
354                                 try {
355                                         updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
356                                         jsonOb = new JSONObject(input);
357                                         jsonObInput = (JSONObject) jsonOb.get("updateMirrorMaker");
358
359                                 } catch (JsonSyntaxException ex) {
360
361                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage());
362                                         LOGGER.info(errResJson.toString());
363                                         throw new CambriaApiException(errResJson);
364                                 }
365
366                                 // send error message if it is not a UpdateMirrorMaker request.
367                                 if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
368
369                                         errResJson.setErrorMessage("This is not a UpdateMirrorMaker request. Please try again.");
370                                         LOGGER.info(errResJson.toString());
371                                         throw new CambriaApiException(errResJson);
372                                 } else {
373                                         updateMirrorMaker.validateJSON(jsonObInput);
374                                 }
375
376                                 String name = updateMirrorMaker.getUpdateMirrorMaker().getName();
377                                 // if empty, blank name is entered
378                                 if (StringUtils.isBlank(name)) {
379
380                                         errResJson.setErrorMessage("Name can not be empty or blank.");
381                                         LOGGER.info(errResJson.toString());
382                                         throw new CambriaApiException(errResJson);
383                                 }
384
385                                 // Check if the name contains only Alpha Numeric
386                                 else if (!isAlphaNumeric(name)) {
387                                         errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT);
388                                         LOGGER.info(errResJson.toString());
389                                         throw new CambriaApiException(errResJson);
390
391                                 }
392
393                                 // Set a random number as messageID, convert Json Object to
394                                 // InputStream and finally call publisher and subscriber
395                                 else {
396
397                                         if (null == updateMirrorMaker.getMessageID() || updateMirrorMaker.getMessageID().isEmpty()) {
398                                                 updateMirrorMaker.setMessageID(randomStr);
399                                         }
400
401                                         JSONObject existMirrorMaker = validateMMExists(ctx, name);
402
403                                         if ((boolean) existMirrorMaker.get("exists")) {
404                                                 JSONObject existMM = (JSONObject) existMirrorMaker.get("listMirrorMaker");
405
406                                                 if (!jsonObInput.has("numStreams")) {
407                                                         updateMirrorMaker.getUpdateMirrorMaker().setNumStreams(existMM.getInt("numStreams"));
408                                                 }
409                                                 if (!jsonObInput.has("enablelogCheck")) {
410                                                         updateMirrorMaker.getUpdateMirrorMaker()
411                                                                         .setEnablelogCheck(existMM.getBoolean("enablelogCheck"));
412                                                 }
413                                                 inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), "UTF-8");
414                                                 JSONObject finalJsonObj = callPubSub(updateMirrorMaker.getMessageID(), ctx, inStream, name,
415                                                                 false);
416                                                 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
417                                         } else {
418
419                                                 errResJson.setErrorMessage("MirrorMaker " + name + " does not exist");
420                                                 LOGGER.info(errResJson.toString());
421                                                 throw new CambriaApiException(errResJson);
422
423                                         }
424
425                                 }
426
427                         } catch (IOException e) {
428
429                                 e.printStackTrace();
430                         }
431                 }
432                 // Send error response if user does not provide Authorization
433                 else {
434                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
435                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
436                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
437                                         ctx.getRequest().getRemoteHost());
438                         LOGGER.info(errRes.toString());
439                         throw new CambriaApiException(errRes);
440                 }
441         }
442
443         @POST
444         @Produces("application/json")
445         @Path("/delete")
446         public void callDeleteMirrorMaker(InputStream msg) throws JSONException, Exception {
447                 DMaaPContext ctx = getDmaapContext();
448
449                 if (checkMirrorMakerPermission(ctx,
450                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
451
452                         loadProperty();
453
454                         String input = null;
455                         Gson gson = new Gson();
456                         MirrorMaker mirrormaker = new MirrorMaker();
457
458                         try {
459                                 input = IOUtils.toString(msg, "UTF-8");
460
461                                 if (input != null && input.length() > 0) {
462                                         input = removeExtraChar(input);
463                                 }
464
465                                 String randomStr = getRandomNum();
466                                 JSONObject jsonOb = null;
467
468                                 try {
469                                         jsonOb = new JSONObject(input);
470                                         mirrormaker = gson.fromJson(input, MirrorMaker.class);
471
472                                 } catch (JSONException ex) {
473
474                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
475                                         LOGGER.info(errResJson.toString());
476                                         throw new CambriaApiException(errResJson);
477                                 }
478
479                                 // Check if request has DeleteMirrorMaker and
480                                 // DeleteMirrorMaker has MirrorMaker object with name variable
481                                 // and check if the name contain only alpha numeric
482
483                                 if (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
484                                                 && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
485                                                 && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
486                                                 && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))) {
487
488                                         if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) {
489                                                 jsonOb.put("messageID", randomStr);
490                                         }
491
492                                         InputStream inStream = null;
493
494                                         try {
495                                                 inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
496
497                                         } catch (IOException ioe) {
498                                                 ioe.printStackTrace();
499                                         }
500                                         JSONObject deleteMM = (JSONObject) jsonOb.getJSONObject("deleteMirrorMaker");
501
502                                         JSONObject existMirrorMaker = validateMMExists(ctx, deleteMM.getString("name"));
503
504                                         if ((boolean) existMirrorMaker.get("exists")) {
505
506                                                 JSONObject finalJsonObj = callPubSub(jsonOb.getString("messageID"), ctx, inStream,
507                                                                 mirrormaker.name, false);
508                                                 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
509                                         } else {
510
511                                                 errResJson.setErrorMessage("MirrorMaker " + deleteMM.getString("name") + " does not exist");
512                                                 LOGGER.info(errResJson.toString());
513                                                 throw new CambriaApiException(errResJson);
514
515                                         }
516
517                                 } else {
518
519                                         errResJson.setErrorMessage("This is not a DeleteMirrorMaker request. Please try again.");
520                                         LOGGER.info(errResJson.toString());
521                                         throw new CambriaApiException(errResJson);
522
523                                 }
524
525                         } catch (IOException ioe) {
526
527                                 throw ioe;
528                         }
529
530                 } else {
531
532                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
533                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
534                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
535                                         ctx.getRequest().getRemoteHost());
536                         LOGGER.info(errRes.toString());
537                         throw new CambriaApiException(errRes);
538                 }
539         }
540
541         private boolean isListMirrorMaker(String msg, String messageID) {
542                 String topicmsg = msg;
543                 topicmsg = removeExtraChar(topicmsg);
544
545                 JSONObject jObj;
546                 JSONArray jArray;
547                 boolean exist = false;
548
549                 if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) {
550                         jArray = new JSONArray(topicmsg);
551
552                         for (int i = 0; i < jArray.length(); i++) {
553                                 jObj = jArray.getJSONObject(i);
554
555                                 JSONObject obj = new JSONObject();
556                                 if (jObj.has(MESSAGE)) {
557                                         obj = jObj.getJSONObject(MESSAGE);
558                                 }
559                                 if (obj.has("messageID") && obj.get("messageID").equals(messageID) && obj.has(LISTMIRRORMAKER)) {
560                                         exist = true;
561                                         break;
562                                 }
563                         }
564                 }
565                 return exist;
566         }
567
568         private void loadProperty() {
569
570                 this.timeout = Integer.parseInt(
571                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim());
572                 this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim();
573                 this.consumergroup = AJSCPropertiesMap
574                                 .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim();
575                 this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid")
576                                 .trim();
577         }
578
579         private String removeExtraChar(String message) {
580                 String str = message;
581                 str = checkJsonFormate(str);
582
583                 if (str != null && str.length() > 0) {
584                         str = str.replace("\\", "");
585                         str = str.replace("\"{", "{");
586                         str = str.replace("}\"", "}");
587                 }
588                 return str;
589         }
590
591         private String getRandomNum() {
592                 long random = Math.round(Math.random() * 89999) + 10000;
593                 String strLong = Long.toString(random);
594                 return strLong;
595         }
596
597         private boolean isAlphaNumeric(String name) {
598                 String pattern = "^[a-zA-Z0-9]*$";
599                 if (name.matches(pattern)) {
600                         return true;
601                 }
602                 return false;
603         }
604
605         private String checkJsonFormate(String jsonStr) {
606
607                 String json = jsonStr;
608                 if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) {
609                         json = json + "]";
610                 }
611                 return json;
612         }
613
614         private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) {
615
616                 boolean hasPermission = false;
617
618                 if (dmaapAAFauthenticator.aafAuthentication(ctx.getRequest(), permission)) {
619                         hasPermission = true;
620                 }
621                 return hasPermission;
622         }
623
624         public JSONObject callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream, String name, boolean listAll)
625                         throws Exception {
626                 loadProperty();
627                 JSONObject jsonObj = new JSONObject();
628                 JSONObject finalJsonObj = new JSONObject();
629                 JSONArray jsonArray = null;
630                 try {
631                         String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
632                         mirrorService.pushEvents(ctx, topic, inStream, null, null);
633                         long startTime = System.currentTimeMillis();
634
635                         while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
636                                         && ((System.currentTimeMillis() - startTime) < timeout)) {
637                                 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
638
639                         }
640
641                         if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
642                                         && isListMirrorMaker(msgFrmSubscribe, randomstr)) {
643                                 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
644
645                                 jsonArray = new JSONArray(msgFrmSubscribe);
646                                 jsonObj = jsonArray.getJSONObject(0);
647                                 if (jsonObj.has("listMirrorMaker")) {
648                                         jsonArray = (JSONArray) jsonObj.get("listMirrorMaker");
649                                         if (true == listAll) {
650                                                 return jsonObj;
651                                         } else {
652                                                 for (int i = 0; i < jsonArray.length(); i++) {
653                                                         jsonObj = jsonArray.getJSONObject(i);
654                                                         if (null != name && !name.isEmpty()) {
655                                                                 if (jsonObj.getString("name").equals(name)) {
656                                                                         finalJsonObj.put("listMirrorMaker", jsonObj);
657                                                                         break;
658                                                                 }
659                                                         } else {
660                                                                 finalJsonObj.put("listMirrorMaker", jsonObj);
661                                                         }
662
663                                                 }
664                                         }
665                                 }
666                                 return finalJsonObj;
667
668                         } else {
669
670                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
671                                                 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
672                                                 "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null,
673                                                 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
674                                                 ctx.getRequest().getRemoteHost());
675                                 LOGGER.info(errRes.toString());
676                                 throw new CambriaApiException(errRes);
677
678                         }
679
680                 } catch (Exception e) {
681
682                         throw e;
683                 }
684
685         }
686
687         private void sendErrResponse(DMaaPContext ctx, String errMsg) {
688                 JSONObject err = new JSONObject();
689                 err.append(ERROR, errMsg);
690
691                 try {
692                         DMaaPResponseBuilder.respondOk(ctx, err);
693                         LOGGER.error(errMsg);
694
695                 } catch (JSONException | IOException e) {
696                         LOGGER.error("Error at sendErrResponse method:" + errMsg + "Exception name:" + e);
697                 }
698         }
699
700         @SuppressWarnings("unchecked")
701         @POST
702         @Produces("application/json")
703         @Path("/listallwhitelist")
704         public void listWhiteList(InputStream msg) throws Exception {
705
706                 DMaaPContext ctx = getDmaapContext();
707                 if (checkMirrorMakerPermission(ctx,
708                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
709
710                         loadProperty();
711                         String input = null;
712
713                         try {
714                                 input = IOUtils.toString(msg, "UTF-8");
715
716                                 if (input != null && input.length() > 0) {
717                                         input = removeExtraChar(input);
718                                 }
719
720                                 // Check if it is correct Json object
721                                 JSONObject jsonOb = null;
722
723                                 try {
724                                         jsonOb = new JSONObject(input);
725
726                                 } catch (JSONException ex) {
727
728                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
729                                         LOGGER.info(errResJson.toString());
730                                         throw new CambriaApiException(errResJson);
731
732                                 }
733
734                                 // Check if the request has name and name contains only alpha
735                                 // numeric
736                                 // and check if the request has namespace and namespace contains
737                                 // only alpha numeric
738                                 if (jsonOb.length() == 2 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
739                                                 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
740                                                 && !StringUtils.isBlank(jsonOb.getString("namespace"))) {
741
742                                         String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
743                                                         "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
744
745                                         // Check if the user have create permission for the
746                                         // namespace
747                                         if (checkMirrorMakerPermission(ctx, permission)) {
748
749                                                 JSONObject listAll = new JSONObject();
750                                                 JSONObject emptyObject = new JSONObject();
751
752                                                 // Create a listAllMirrorMaker Json object
753                                                 try {
754                                                         listAll.put("listAllMirrorMaker", emptyObject);
755
756                                                 } catch (JSONException e) {
757
758                                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
759                                                         LOGGER.info(errResJson.toString());
760                                                         throw new CambriaApiException(errResJson);
761                                                 }
762
763                                                 // set a random number as messageID
764                                                 String randomStr = getRandomNum();
765                                                 listAll.put("messageID", randomStr);
766                                                 InputStream inStream = null;
767
768                                                 // convert listAll Json object to InputStream object
769                                                 try {
770                                                         inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
771
772                                                 } catch (IOException ioe) {
773                                                         ioe.printStackTrace();
774                                                 }
775                                                 JSONObject listMirrorMaker = new JSONObject();
776                                                 listMirrorMaker = callPubSub(randomStr, ctx, inStream, null, true);
777
778                                                 String whitelist = null;
779                                                 JSONArray listMMArray = new JSONArray();
780                                                 if (listMirrorMaker.has("listMirrorMaker")) {
781                                                         listMMArray = (JSONArray) listMirrorMaker.get("listMirrorMaker");
782                                                         for (int i = 0; i < listMMArray.length(); i++) {
783
784                                                                 JSONObject mm = new JSONObject();
785                                                                 mm = listMMArray.getJSONObject(i);
786                                                                 String name = mm.getString("name");
787
788                                                                 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
789                                                                         whitelist = mm.getString("whitelist");
790                                                                         break;
791                                                                 }
792                                                         }
793
794                                                         if (!StringUtils.isBlank(whitelist)) {
795
796                                                                 List<String> topicList = new ArrayList<String>();
797                                                                 List<String> finalTopicList = new ArrayList<String>();
798                                                                 topicList = Arrays.asList(whitelist.split(","));
799
800                                                                 for (String topic : topicList) {
801                                                                         if (topic != null && !topic.equals("null")
802                                                                                         && getNamespace(topic).equals(jsonOb.getString("namespace"))) {
803
804                                                                                 finalTopicList.add(topic);
805                                                                         }
806                                                                 }
807
808                                                                 String topicNames = "";
809
810                                                                 if (finalTopicList.size() > 0) {
811                                                                         topicNames = StringUtils.join(finalTopicList, ",");
812                                                                 }
813
814                                                                 JSONObject listAllWhiteList = new JSONObject();
815                                                                 listAllWhiteList.put("name", jsonOb.getString("name"));
816                                                                 listAllWhiteList.put("whitelist", topicNames);
817
818                                                                 DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList);
819                                                         }
820
821                                                 } else {
822
823                                                         errResJson.setErrorMessage(
824                                                                         "listWhiteList is not available, please make sure MirrorMakerAgent is running");
825                                                         LOGGER.info(errResJson.toString());
826                                                         throw new CambriaApiException(errResJson);
827
828                                                 }
829
830                                         } else {
831                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
832                                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION,
833                                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
834                                                                 ctx.getRequest().getRemoteHost());
835                                                 LOGGER.info(errRes.toString());
836                                                 throw new CambriaApiException(errRes);
837                                         }
838
839                                 } else {
840
841                                         errResJson.setErrorMessage("This is not a ListAllWhitelist request. Please try again.");
842                                         LOGGER.info(errResJson.toString());
843                                         throw new CambriaApiException(errResJson);
844                                 }
845
846                         } catch (IOException e) {
847
848                                 e.printStackTrace();
849                         }
850                 } else {
851                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
852                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null,
853                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
854                                         ctx.getRequest().getRemoteHost());
855                         LOGGER.info(errRes.toString());
856                         throw new CambriaApiException(errRes);
857                 }
858         }
859
860         @SuppressWarnings("unchecked")
861         @POST
862         @Produces("application/json")
863         @Path("/createwhitelist")
864         public void createWhiteList(InputStream msg) throws Exception {
865
866                 DMaaPContext ctx = getDmaapContext();
867                 if (checkMirrorMakerPermission(ctx,
868                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
869
870                         loadProperty();
871                         String input = null;
872
873                         try {
874                                 input = IOUtils.toString(msg, "UTF-8");
875
876                                 if (input != null && input.length() > 0) {
877                                         input = removeExtraChar(input);
878                                 }
879
880                                 // Check if it is correct Json object
881                                 JSONObject jsonOb = null;
882
883                                 try {
884                                         jsonOb = new JSONObject(input);
885
886                                 } catch (JSONException ex) {
887                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
888                                         LOGGER.info(errResJson.toString());
889                                         throw new CambriaApiException(errResJson);
890                                 }
891
892                                 // Check if the request has name and name contains only alpha
893                                 // numeric,
894                                 // check if the request has namespace and
895                                 // check if the request has whitelistTopicName
896                                 // check if the topic name contains only alpha numeric
897                                 if (jsonOb.length() == 3 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
898                                                 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
899                                                 && !StringUtils.isBlank(jsonOb.getString("namespace")) && jsonOb.has("whitelistTopicName")
900                                                 && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
901                                                 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
902                                                                 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1, jsonOb
903                                                                                 .getString("whitelistTopicName").length()))) {
904
905                                         String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
906                                                         "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
907
908                                         // Check if the user have create permission for the
909                                         // namespace
910                                         if (checkMirrorMakerPermission(ctx, permission)) {
911
912                                                 JSONObject listAll = new JSONObject();
913                                                 JSONObject emptyObject = new JSONObject();
914
915                                                 // Create a listAllMirrorMaker Json object
916                                                 try {
917                                                         listAll.put("listAllMirrorMaker", emptyObject);
918
919                                                 } catch (JSONException e) {
920
921                                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
922                                                         LOGGER.info(errResJson.toString());
923                                                         throw new CambriaApiException(errResJson);
924                                                 }
925
926                                                 // set a random number as messageID
927                                                 String randomStr = getRandomNum();
928                                                 listAll.put("messageID", randomStr);
929                                                 InputStream inStream = null;
930
931                                                 // convert listAll Json object to InputStream object
932                                                 try {
933                                                         inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
934
935                                                 } catch (IOException ioe) {
936                                                         ioe.printStackTrace();
937                                                 }
938                                                 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
939                                                 // call listAllMirrorMaker
940                                                 mirrorService.pushEvents(ctx, topic, inStream, null, null);
941
942                                                 // subscribe for listMirrorMaker
943                                                 long startTime = System.currentTimeMillis();
944
945                                                 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
946                                                                 && (System.currentTimeMillis() - startTime) < timeout) {
947                                                         msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
948                                                 }
949
950                                                 JSONArray listMirrorMaker = null;
951
952                                                 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
953                                                                 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
954
955                                                         listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
956                                                         String whitelist = null;
957
958                                                         for (int i = 0; i < listMirrorMaker.length(); i++) {
959                                                                 JSONObject mm = new JSONObject();
960                                                                 mm = listMirrorMaker.getJSONObject(i);
961                                                                 String name = mm.getString("name");
962
963                                                                 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
964                                                                         whitelist = mm.getString("whitelist");
965                                                                         break;
966                                                                 }
967                                                         }
968
969                                                         List<String> topicList = new ArrayList<String>();
970                                                         List<String> finalTopicList = new ArrayList<String>();
971
972                                                         if (whitelist != null) {
973                                                                 topicList = Arrays.asList(whitelist.split(","));
974                                                         }
975
976                                                         for (String st : topicList) {
977                                                                 if (!StringUtils.isBlank(st)) {
978                                                                         finalTopicList.add(st);
979                                                                 }
980                                                         }
981
982                                                         String newTopic = jsonOb.getString("whitelistTopicName");
983
984                                                         if (!topicList.contains(newTopic)
985                                                                         && getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
986
987                                                                 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
988                                                                 MirrorMaker mirrorMaker = new MirrorMaker();
989                                                                 mirrorMaker.setName(jsonOb.getString("name"));
990                                                                 finalTopicList.add(newTopic);
991                                                                 String newWhitelist = "";
992
993                                                                 if (finalTopicList.size() > 0) {
994                                                                         newWhitelist = StringUtils.join(finalTopicList, ",");
995                                                                 }
996
997                                                                 mirrorMaker.setWhitelist(newWhitelist);
998
999                                                                 String newRandom = getRandomNum();
1000                                                                 updateWhiteList.setMessageID(newRandom);
1001                                                                 updateWhiteList.setUpdateWhiteList(mirrorMaker);
1002
1003                                                                 Gson g = new Gson();
1004                                                                 g.toJson(updateWhiteList);
1005                                                                 InputStream inputStream = null;
1006                                                                 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
1007                                                                 // callPubSub(newRandom, ctx, inputStream);
1008                                                                 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb);
1009
1010                                                         } else if (topicList.contains(newTopic)) {
1011                                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR,
1012                                                                                 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "The topic already exist.",
1013                                                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1014                                                                                 ctx.getRequest().getRemoteHost());
1015                                                                 LOGGER.info(errRes.toString());
1016                                                                 throw new CambriaApiException(errRes);
1017
1018                                                         } else if (!getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
1019                                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR,
1020                                                                                 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(),
1021                                                                                 "The namespace of the topic does not match with the namespace you provided.",
1022                                                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1023                                                                                 ctx.getRequest().getRemoteHost());
1024                                                                 LOGGER.info(errRes.toString());
1025                                                                 throw new CambriaApiException(errRes);
1026                                                         }
1027                                                 } else {
1028
1029                                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1030                                                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
1031                                                                         "listWhiteList is not available, please make sure MirrorMakerAgent is running",
1032                                                                         null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1033                                                                         ctx.getRequest().getRemoteHost());
1034                                                         LOGGER.info(errRes.toString());
1035                                                         throw new CambriaApiException(errRes);
1036                                                 }
1037
1038                                         } else {
1039                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1040                                                                 DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_CREATE_PERMISSION,
1041                                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1042                                                                 ctx.getRequest().getRemoteHost());
1043                                                 LOGGER.info(errRes.toString());
1044                                                 throw new CambriaApiException(errRes);
1045                                         }
1046
1047                                 } else {
1048
1049                                         errResJson.setErrorMessage("This is not a createWhitelist request. Please try again.");
1050                                         LOGGER.info(errResJson.toString());
1051                                         throw new CambriaApiException(errResJson);
1052                                 }
1053
1054                         } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1055                                         | TopicExistsException | missingReqdSetting | UnavailableException e) {
1056
1057                                 throw e;
1058                         }
1059                 }
1060                 // Send error response if user does not provide Authorization
1061                 else {
1062                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1063                                         DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_PERMISSION, null,
1064                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1065                                         ctx.getRequest().getRemoteHost());
1066                         LOGGER.info(errRes.toString());
1067                         throw new CambriaApiException(errRes);
1068
1069                 }
1070         }
1071
1072         @SuppressWarnings("unchecked")
1073         @POST
1074         @Produces("application/json")
1075         @Path("/deletewhitelist")
1076         public void deleteWhiteList(InputStream msg) throws Exception {
1077
1078                 DMaaPContext ctx = getDmaapContext();
1079                 if (checkMirrorMakerPermission(ctx,
1080                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
1081
1082                         loadProperty();
1083                         String input = null;
1084
1085                         try {
1086                                 input = IOUtils.toString(msg, "UTF-8");
1087
1088                                 if (input != null && input.length() > 0) {
1089                                         input = removeExtraChar(input);
1090                                 }
1091
1092                                 // Check if it is correct Json object
1093                                 JSONObject jsonOb = null;
1094
1095                                 try {
1096                                         jsonOb = new JSONObject(input);
1097
1098                                 } catch (JSONException ex) {
1099
1100                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
1101                                         LOGGER.info(errResJson.toString());
1102                                         throw new CambriaApiException(errResJson);
1103
1104                                 }
1105
1106                                 // Check if the request has name and name contains only alpha
1107                                 // numeric,
1108                                 // check if the request has namespace and
1109                                 // check if the request has whitelistTopicName
1110                                 if (jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name"))
1111                                                 && jsonOb.has("namespace") && jsonOb.has("whitelistTopicName")
1112                                                 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
1113                                                                 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
1114                                                                 jsonOb.getString("whitelistTopicName").length()))) {
1115
1116                                         String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
1117                                                         "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
1118
1119                                         // Check if the user have create permission for the
1120                                         // namespace
1121                                         if (checkMirrorMakerPermission(ctx, permission)) {
1122
1123                                                 JSONObject listAll = new JSONObject();
1124                                                 JSONObject emptyObject = new JSONObject();
1125
1126                                                 // Create a listAllMirrorMaker Json object
1127                                                 try {
1128                                                         listAll.put("listAllMirrorMaker", emptyObject);
1129
1130                                                 } catch (JSONException e) {
1131
1132                                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
1133                                                         LOGGER.info(errResJson.toString());
1134                                                         throw new CambriaApiException(errResJson);
1135                                                 }
1136
1137                                                 // set a random number as messageID
1138                                                 String randomStr = getRandomNum();
1139                                                 listAll.put("messageID", randomStr);
1140                                                 InputStream inStream = null;
1141
1142                                                 // convert listAll Json object to InputStream object
1143                                                 try {
1144                                                         inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
1145
1146                                                 } catch (IOException ioe) {
1147                                                         ioe.printStackTrace();
1148                                                 }
1149                                                 // call listAllMirrorMaker
1150                                                 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1151
1152                                                 // subscribe for listMirrorMaker
1153                                                 long startTime = System.currentTimeMillis();
1154                                                 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1155
1156                                                 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1157                                                                 && (System.currentTimeMillis() - startTime) < timeout) {
1158                                                         msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1159                                                 }
1160
1161                                                 JSONObject jsonObj = new JSONObject();
1162                                                 JSONArray jsonArray = null;
1163                                                 JSONArray listMirrorMaker = null;
1164
1165                                                 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1166                                                                 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1167                                                         msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1168                                                         jsonArray = new JSONArray(msgFrmSubscribe);
1169
1170                                                         for (int i = 0; i < jsonArray.length(); i++) {
1171                                                                 jsonObj = jsonArray.getJSONObject(i);
1172
1173                                                                 if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr)
1174                                                                                 && jsonObj.has("listMirrorMaker")) {
1175                                                                         listMirrorMaker = jsonObj.getJSONArray("listMirrorMaker");
1176                                                                         break;
1177                                                                 }
1178                                                         }
1179                                                         String whitelist = null;
1180                                                         for (int i = 0; i < listMirrorMaker.length(); i++) {
1181
1182                                                                 JSONObject mm = new JSONObject();
1183                                                                 mm = listMirrorMaker.getJSONObject(i);
1184                                                                 String name = mm.getString("name");
1185
1186                                                                 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
1187                                                                         whitelist = mm.getString("whitelist");
1188                                                                         break;
1189                                                                 }
1190                                                         }
1191
1192                                                         List<String> topicList = new ArrayList<String>();
1193
1194                                                         if (whitelist != null) {
1195                                                                 topicList = Arrays.asList(whitelist.split(","));
1196                                                         }
1197                                                         boolean removeTopic = false;
1198                                                         String topicToRemove = jsonOb.getString("whitelistTopicName");
1199
1200                                                         if (topicList.contains(topicToRemove)) {
1201                                                                 removeTopic = true;
1202                                                         } else {
1203                                                                 errResJson.setErrorMessage(errorMessages.getTopicNotExist());
1204                                                                 LOGGER.info(errResJson.toString());
1205                                                                 throw new CambriaApiException(errResJson);
1206                                                         }
1207
1208                                                         if (removeTopic) {
1209                                                                 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
1210                                                                 MirrorMaker mirrorMaker = new MirrorMaker();
1211
1212                                                                 mirrorMaker.setName(jsonOb.getString("name"));
1213
1214                                                                 if (StringUtils.isNotBlank((removeTopic(whitelist, topicToRemove)))) {
1215                                                                         mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
1216                                                                 }
1217
1218                                                                 String newRandom = getRandomNum();
1219
1220                                                                 updateWhiteList.setMessageID(newRandom);
1221                                                                 updateWhiteList.setUpdateWhiteList(mirrorMaker);
1222
1223                                                                 Gson g = new Gson();
1224                                                                 g.toJson(updateWhiteList);
1225
1226                                                                 InputStream inputStream = null;
1227                                                                 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
1228                                                                 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb);
1229                                                                 // mmAgentUtil.getNamespace(topicToRemove));
1230                                                         }
1231
1232                                                 } else {
1233
1234                                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1235                                                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
1236                                                                         "listWhiteList is not available, please make sure MirrorMakerAgent is running",
1237                                                                         null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1238                                                                         ctx.getRequest().getRemoteHost());
1239                                                         LOGGER.info(errRes.toString());
1240                                                         throw new CambriaApiException(errRes);
1241                                                 }
1242
1243                                         } else {
1244                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1245                                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION,
1246                                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1247                                                                 ctx.getRequest().getRemoteHost());
1248                                                 LOGGER.info(errRes.toString());
1249                                                 throw new CambriaApiException(errRes);
1250                                         }
1251
1252                                 } else {
1253
1254                                         errResJson.setErrorMessage("This is not a DeleteAllWhitelist request. Please try again.");
1255                                         LOGGER.info(errResJson.toString());
1256                                         throw new CambriaApiException(errResJson);
1257
1258                                 }
1259
1260                         } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1261                                         | TopicExistsException | missingReqdSetting | UnavailableException e) {
1262
1263                                 throw e;
1264                         }
1265                 }
1266                 // Send error response if user does not provide Authorization
1267                 else {
1268                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1269                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null,
1270                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1271                                         ctx.getRequest().getRemoteHost());
1272                         LOGGER.info(errRes.toString());
1273                         throw new CambriaApiException(errRes);
1274
1275                 }
1276         }
1277
1278         private String getNamespace(String topic) {
1279                 return topic.substring(0, topic.lastIndexOf("."));
1280         }
1281
1282         private String removeTopic(String whitelist, String topicToRemove) {
1283                 List<String> topicList = new ArrayList<>();
1284                 List<String> newTopicList = new ArrayList<>();
1285
1286                 if (whitelist.contains(",")) {
1287                         topicList = Arrays.asList(whitelist.split(","));
1288
1289                 }
1290
1291                 if (topicList.contains(topicToRemove)) {
1292                         for (String topic : topicList) {
1293                                 if (!topic.equals(topicToRemove)) {
1294                                         newTopicList.add(topic);
1295                                 }
1296                         }
1297                 }
1298
1299                 String newWhitelist = StringUtils.join(newTopicList, ",");
1300
1301                 return newWhitelist;
1302         }
1303
1304         public void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, JSONObject jsonOb) {
1305
1306                 loadProperty();
1307                 try {
1308                         String namespace = jsonOb.getString("namespace");
1309                         String mmName = jsonOb.getString("name");
1310
1311                         String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1312                         mirrorService.pushEvents(ctx, topic, inStream, null, null);
1313                         long startTime = System.currentTimeMillis();
1314
1315                         while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1316                                         && (System.currentTimeMillis() - startTime) < timeout) {
1317                                 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1318                         }
1319
1320                         JSONObject jsonObj = new JSONObject();
1321                         JSONArray jsonArray = null;
1322                         JSONArray jsonArrayNamespace = null;
1323
1324                         if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1325                                         && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1326                                 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1327                                 jsonArray = new JSONArray(msgFrmSubscribe);
1328
1329                                 for (int i = 0; i < jsonArray.length(); i++) {
1330                                         jsonObj = jsonArray.getJSONObject(i);
1331
1332                                         if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr)
1333                                                         && jsonObj.has("listMirrorMaker")) {
1334                                                 jsonArrayNamespace = jsonObj.getJSONArray("listMirrorMaker");
1335                                         }
1336                                 }
1337                                 JSONObject finalJasonObj = new JSONObject();
1338                                 JSONArray finalJsonArray = new JSONArray();
1339
1340                                 for (int i = 0; i < jsonArrayNamespace.length(); i++) {
1341
1342                                         JSONObject mmObj = new JSONObject();
1343                                         mmObj = jsonArrayNamespace.getJSONObject(i);
1344                                         if (mmObj.has("name") && mmName.equals(mmObj.getString("name"))) {
1345
1346                                                 finalJsonArray.put(mmObj);
1347                                         }
1348
1349                                 }
1350                                 finalJasonObj.put("listMirrorMaker", finalJsonArray);
1351
1352                                 DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
1353
1354                         } else {
1355
1356                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1357                                                 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
1358                                                 "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null,
1359                                                 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1360                                                 ctx.getRequest().getRemoteHost());
1361                                 LOGGER.info(errRes.toString());
1362                                 throw new CambriaApiException(errRes);
1363                         }
1364
1365                 } catch (Exception e) {
1366                         e.printStackTrace();
1367                 }
1368         }
1369
1370         private String getWhitelistByNamespace(String originalWhitelist, String namespace) {
1371
1372                 String whitelist = null;
1373                 List<String> resultList = new ArrayList<>();
1374                 List<String> whitelistList = new ArrayList<>();
1375                 whitelistList = Arrays.asList(originalWhitelist.split(","));
1376
1377                 for (String topic : whitelistList) {
1378                         if (StringUtils.isNotBlank(originalWhitelist) && getNamespace(topic).equals(namespace)) {
1379                                 resultList.add(topic);
1380                         }
1381                 }
1382                 if (!resultList.isEmpty()) {
1383                         whitelist = StringUtils.join(resultList, ",");
1384                 }
1385
1386                 return whitelist;
1387         }
1388
1389         private JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
1390                 JSONObject jsonObj;
1391                 JSONArray jsonArray;
1392                 JSONArray listMirrorMaker = new JSONArray();
1393
1394                 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1395                 jsonArray = new JSONArray(msgFrmSubscribe);
1396
1397                 for (int i = 0; i < jsonArray.length(); i++) {
1398                         jsonObj = jsonArray.getJSONObject(i);
1399
1400                         JSONObject obj = new JSONObject();
1401                         if (jsonObj.has(MESSAGE)) {
1402                                 obj = jsonObj.getJSONObject(MESSAGE);
1403                         }
1404                         if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1405                                 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
1406                                 break;
1407                         }
1408                 }
1409                 return listMirrorMaker;
1410         }
1411
1412         public JSONObject validateMMExists(DMaaPContext ctx, String name) throws Exception {
1413                 // Create a listAllMirrorMaker Json object
1414                 JSONObject listAll = new JSONObject();
1415                 try {
1416                         listAll.put("listAllMirrorMaker", new JSONObject());
1417
1418                 } catch (JSONException e) {
1419
1420                         e.printStackTrace();
1421                 }
1422
1423                 // set a random number as messageID
1424                 String randomStr = getRandomNum();
1425                 listAll.put("messageID", randomStr);
1426                 InputStream inStream = null;
1427
1428                 // convert listAll Json object to InputStream object
1429                 try {
1430                         inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
1431
1432                 } catch (IOException ioe) {
1433                         ioe.printStackTrace();
1434                 }
1435                 JSONObject listMirrorMaker = new JSONObject();
1436                 listMirrorMaker = callPubSub(randomStr, ctx, inStream, name, false);
1437                 if (null != listMirrorMaker && listMirrorMaker.length() > 0) {
1438                         listMirrorMaker.put("exists", true);
1439                         return listMirrorMaker;
1440
1441                 }
1442                 listMirrorMaker.put("exists", false);
1443                 return listMirrorMaker;
1444
1445         }
1446 }