update MM agent API
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / service / MMRestService.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.service;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
29
30 import javax.servlet.http.HttpServletRequest;
31 import javax.servlet.http.HttpServletResponse;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import org.json.JSONObject;
37 import org.apache.commons.io.IOUtils;
38 import org.apache.commons.lang.StringUtils;
39 import org.apache.http.HttpStatus;
40
41 import com.att.eelf.configuration.EELFLogger;
42 import com.att.eelf.configuration.EELFManager;
43 import org.springframework.beans.factory.annotation.Autowired;
44 import org.springframework.beans.factory.annotation.Qualifier;
45 import org.springframework.stereotype.Component;
46
47 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
48 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
49 import org.onap.dmaap.dmf.mr.utils.Utils;
50 import com.att.nsa.configs.ConfigDbException;
51 import org.onap.dmaap.mmagent.*;
52 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
53 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
54 import com.google.gson.Gson;
55 import com.google.gson.JsonSyntaxException;
56
57 import edu.emory.mathcs.backport.java.util.Arrays;
58
59 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
60 import org.onap.dmaap.dmf.mr.CambriaApiException;
61 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
62
63 import org.json.JSONArray;
64 import org.json.JSONException;
65 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
66 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
67 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
68 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
69 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
70 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
71 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
72 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
73 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
74 import org.onap.dmaap.dmf.mr.service.MMService;
75
76 /**
77  * Rest Service class for Mirror Maker proxy Rest Services
78  * 
79  * @author <a href="mailto:"></a>
80  *
81  * @since May 25, 2016
82  */
83
84 @Component
85 public class MMRestService {
86
87         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class);
88         private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission.";
89         private static final String NO_USER_PERMISSION = "No Mirror Maker User permission.";
90         private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission.";
91         private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric";
92         private static final String INVALID_IPPORT = "This is not a valid IP:Port";
93         private static final String MIRROR_MAKERADMIN = "msgRtr.mirrormakeradmin.aaf";
94         private static final String MIRROR_MAKERUSER = "msgRtr.mirrormakeruser.aaf";
95         private static final String UTF_8 = "UTF-8";
96         private static final String MESSAGE = "message";
97         private static final String LISTMIRRORMAKER = "listMirrorMaker";
98         private static final String ERROR = "error";
99         private static final String NAMESPACE = "namespace";
100
101         private String topic;
102         private int timeout;
103         private String consumergroup;
104         private String consumerid;
105
106         @Autowired
107         @Qualifier("configurationReader")
108         private ConfigurationReader configReader;
109
110         @Context
111         private HttpServletRequest request;
112
113         @Context
114         private HttpServletResponse response;
115
116         @Autowired
117         private MMService mirrorService;
118
119         @Autowired
120         private DMaaPErrorMessages errorMessages;
121
122         private ErrorResponse errResJson = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
123                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "", null, Utils.getFormattedDate(new Date()), topic,
124                         null, null, "mirrorMakerAgent", null);
125
126         private DMaaPAAFAuthenticator dmaapAAFauthenticator = new DMaaPAAFAuthenticatorImpl();
127
128         /**
129          * This method is used for taking Configuration Object,HttpServletRequest
130          * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
131          * Object.
132          * 
133          * @return DMaaPContext object from where user can get Configuration
134          *         Object,HttpServlet Object
135          * 
136          */
137         private DMaaPContext getDmaapContext() {
138                 DMaaPContext dmaapContext = new DMaaPContext();
139                 dmaapContext.setRequest(request);
140                 dmaapContext.setResponse(response);
141                 dmaapContext.setConfigReader(configReader);
142                 dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
143
144                 return dmaapContext;
145         }
146
147         @POST
148         @Produces("application/json")
149         @Path("/create")
150         public void callCreateMirrorMaker(InputStream msg) throws Exception {
151
152                 DMaaPContext ctx = getDmaapContext();
153                 if (checkMirrorMakerPermission(ctx,
154                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
155
156                         loadProperty();
157                         String input = null;
158                         String randomStr = getRandomNum();
159
160                         InputStream inStream = null;
161                         Gson gson = new Gson();
162                         CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
163                         LOGGER.info("Starting Create MirrorMaker");
164                         try {
165                                 input = IOUtils.toString(msg, "UTF-8");
166
167                                 if (input != null && input.length() > 0) {
168                                         input = removeExtraChar(input);
169                                 }
170
171                                 // Check if the request has CreateMirrorMaker
172                                 try {
173                                         createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class);
174
175                                 } catch (JsonSyntaxException ex) {
176
177                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage());
178                                         LOGGER.info(errResJson.toString());
179                                         throw new CambriaApiException(errResJson);
180                                 }
181
182                                 // send error message if it is not a CreateMirrorMaker request.
183                                 if (createMirrorMaker.getCreateMirrorMaker() == null) {
184
185                                         errResJson.setErrorMessage("This is not a CreateMirrorMaker request. Please try again.");
186                                         LOGGER.info(errResJson.toString());
187                                         throw new CambriaApiException(errResJson);
188                                 } else {
189                                         createMirrorMaker.validateJSON();
190                                 }
191
192                                 String name = createMirrorMaker.getCreateMirrorMaker().getName();
193
194                                 // if empty, blank name is entered
195                                 if (StringUtils.isBlank(name)) {
196
197                                         errResJson.setErrorMessage("Name can not be empty or blank.");
198                                         LOGGER.info(errResJson.toString());
199                                         throw new CambriaApiException(errResJson);
200                                 }
201
202                                 // Check if the name contains only Alpha Numeric
203                                 else if (!isAlphaNumeric(name)) {
204
205                                         errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT);
206                                         LOGGER.info(errResJson.toString());
207                                         throw new CambriaApiException(errResJson);
208
209                                 } else {
210
211                                         if (null == createMirrorMaker.getMessageID() || createMirrorMaker.getMessageID().isEmpty()) {
212                                                 createMirrorMaker.setMessageID(randomStr);
213                                         }
214                                         inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), "UTF-8");
215                                         JSONObject existMirrorMaker = validateMMExists(ctx, name);
216                                         if (!(boolean) existMirrorMaker.get("exists")) {
217                                                 JSONObject finalJsonObj = callPubSub(createMirrorMaker.getMessageID(), ctx, inStream, name,
218                                                                 false);
219                                                 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
220                                         } else {
221
222                                                 errResJson.setErrorMessage("MirrorMaker " + name + " already exists");
223                                                 LOGGER.info(errResJson.toString());
224                                                 throw new CambriaApiException(errResJson);
225
226                                         }
227                                 }
228
229                         } catch (IOException e) {
230
231                                 throw e;
232                         }
233                 }
234                 // Send error response if user does not provide Authorization
235                 else {
236                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
237                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
238                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
239                                         ctx.getRequest().getRemoteHost());
240                         LOGGER.info(errRes.toString());
241                         throw new CambriaApiException(errRes);
242                 }
243
244         }
245
246         @POST
247         @Produces("application/json")
248         @Path("/listall")
249         public void callListAllMirrorMaker(InputStream msg) throws Exception {
250
251                 DMaaPContext ctx = getDmaapContext();
252
253                 if (checkMirrorMakerPermission(ctx,
254                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
255
256                         loadProperty();
257
258                         String input = null;
259                         Gson gson = new Gson();
260
261                         try {
262                                 input = IOUtils.toString(msg, "UTF-8");
263
264                                 if (input != null && input.length() > 0) {
265                                         input = removeExtraChar(input);
266                                 }
267
268                                 String randomStr = getRandomNum();
269                                 JSONObject jsonOb = null;
270
271                                 try {
272                                         jsonOb = new JSONObject(input);
273
274                                 } catch (JSONException ex) {
275                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
276                                         LOGGER.info(errResJson.toString());
277                                         throw new CambriaApiException(errResJson);
278                                 }
279
280                                 // Check if request has listAllMirrorMaker and
281                                 // listAllMirrorMaker is empty
282                                 if (jsonOb.has("listAllMirrorMaker") && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0) {
283
284                                         if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) {
285                                                 jsonOb.put("messageID", randomStr);
286                                         }
287
288                                         InputStream inStream = null;
289                                         MirrorMaker mirrormaker = gson.fromJson(input, MirrorMaker.class);
290
291                                         try {
292                                                 inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
293
294                                         } catch (IOException ioe) {
295                                                 throw ioe;
296                                         }
297
298                                         JSONObject responseJson = callPubSub(jsonOb.getString("messageID"), ctx, inStream, mirrormaker.name,
299                                                         true);
300                                         DMaaPResponseBuilder.respondOk(ctx, responseJson);
301
302                                 } else {
303
304                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
305                                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
306                                                         "This is not a ListAllMirrorMaker request. Please try again.", null,
307                                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
308                                                         ctx.getRequest().getRemoteHost());
309                                         LOGGER.info(errRes.toString());
310                                         throw new CambriaApiException(errRes);
311                                 }
312
313                         } catch (IOException ioe) {
314
315                                 throw ioe;
316                         }
317
318                 } else {
319
320                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
321                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
322                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
323                                         ctx.getRequest().getRemoteHost());
324                         LOGGER.info(errRes.toString());
325                         throw new CambriaApiException(errRes);
326                 }
327         }
328
329         @POST
330         @Produces("application/json")
331         @Path("/update")
332         public void callUpdateMirrorMaker(InputStream msg) throws Exception {
333                 DMaaPContext ctx = getDmaapContext();
334                 if (checkMirrorMakerPermission(ctx,
335                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
336
337                         loadProperty();
338                         String input = null;
339                         String randomStr = getRandomNum();
340
341                         InputStream inStream = null;
342                         Gson gson = new Gson();
343                         UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
344                         JSONObject jsonOb, jsonObInput = null;
345
346                         try {
347                                 input = IOUtils.toString(msg, "UTF-8");
348
349                                 if (input != null && input.length() > 0) {
350                                         input = removeExtraChar(input);
351                                 }
352
353                                 // Check if the request has UpdateMirrorMaker
354                                 try {
355                                         updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
356                                         jsonOb = new JSONObject(input);
357                                         jsonObInput = (JSONObject) jsonOb.get("updateMirrorMaker");
358
359                                 } catch (JsonSyntaxException ex) {
360
361                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson() + ex.getMessage());
362                                         LOGGER.info(errResJson.toString());
363                                         throw new CambriaApiException(errResJson);
364                                 }
365
366                                 // send error message if it is not a UpdateMirrorMaker request.
367                                 if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
368
369                                         errResJson.setErrorMessage("This is not a UpdateMirrorMaker request. Please try again.");
370                                         LOGGER.info(errResJson.toString());
371                                         throw new CambriaApiException(errResJson);
372                                 } else {
373                                         updateMirrorMaker.validateJSON(jsonObInput);
374                                 }
375
376                                 String name = updateMirrorMaker.getUpdateMirrorMaker().getName();
377                                 // if empty, blank name is entered
378                                 if (StringUtils.isBlank(name)) {
379
380                                         errResJson.setErrorMessage("Name can not be empty or blank.");
381                                         LOGGER.info(errResJson.toString());
382                                         throw new CambriaApiException(errResJson);
383                                 }
384
385                                 // Check if the name contains only Alpha Numeric
386                                 else if (!isAlphaNumeric(name)) {
387                                         errResJson.setErrorMessage(NAME_DOES_NOT_MEET_REQUIREMENT);
388                                         LOGGER.info(errResJson.toString());
389                                         throw new CambriaApiException(errResJson);
390
391                                 }
392
393                                 // Set a random number as messageID, convert Json Object to
394                                 // InputStream and finally call publisher and subscriber
395                                 else {
396
397                                         if (null == updateMirrorMaker.getMessageID() || updateMirrorMaker.getMessageID().isEmpty()) {
398                                                 updateMirrorMaker.setMessageID(randomStr);
399                                         }
400
401                                         JSONObject existMirrorMaker = validateMMExists(ctx, name);
402
403                                         if ((boolean) existMirrorMaker.get("exists")) {
404                                                 JSONObject existMM = (JSONObject) existMirrorMaker.get("listMirrorMaker");
405
406                                                 if (!jsonObInput.has("numStreams")) {
407                                                         updateMirrorMaker.getUpdateMirrorMaker().setNumStreams(existMM.getInt("numStreams"));
408                                                 }
409                                                 if (!jsonObInput.has("enablelogCheck")) {
410                                                         updateMirrorMaker.getUpdateMirrorMaker()
411                                                                         .setEnablelogCheck(existMM.getBoolean("enablelogCheck"));
412                                                 }
413                                                 inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), "UTF-8");
414                                                 JSONObject finalJsonObj = callPubSub(updateMirrorMaker.getMessageID(), ctx, inStream, name,
415                                                                 false);
416                                                 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
417                                         } else {
418
419                                                 errResJson.setErrorMessage("MirrorMaker " + name + " does not exist");
420                                                 LOGGER.info(errResJson.toString());
421                                                 throw new CambriaApiException(errResJson);
422
423                                         }
424
425                                 }
426
427                         } catch (IOException e) {
428
429                                 e.printStackTrace();
430                         }
431                 }
432                 // Send error response if user does not provide Authorization
433                 else {
434                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
435                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
436                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
437                                         ctx.getRequest().getRemoteHost());
438                         LOGGER.info(errRes.toString());
439                         throw new CambriaApiException(errRes);
440                 }
441         }
442
443         @POST
444         @Produces("application/json")
445         @Path("/delete")
446         public void callDeleteMirrorMaker(InputStream msg) throws JSONException, Exception {
447                 DMaaPContext ctx = getDmaapContext();
448
449                 if (checkMirrorMakerPermission(ctx,
450                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
451
452                         loadProperty();
453
454                         String input = null;
455                         Gson gson = new Gson();
456                         MirrorMaker mirrormaker = new MirrorMaker();
457
458                         try {
459                                 input = IOUtils.toString(msg, "UTF-8");
460
461                                 if (input != null && input.length() > 0) {
462                                         input = removeExtraChar(input);
463                                 }
464
465                                 String randomStr = getRandomNum();
466                                 JSONObject jsonOb = null;
467
468                                 try {
469                                         jsonOb = new JSONObject(input);
470                                         mirrormaker = gson.fromJson(input, MirrorMaker.class);
471
472                                 } catch (JSONException ex) {
473
474                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
475                                         LOGGER.info(errResJson.toString());
476                                         throw new CambriaApiException(errResJson);
477                                 }
478
479                                 // Check if request has DeleteMirrorMaker and
480                                 // DeleteMirrorMaker has MirrorMaker object with name variable
481                                 // and check if the name contain only alpha numeric
482
483                                 if (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
484                                                 && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
485                                                 && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
486                                                 && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))) {
487
488                                         if (!jsonOb.has("messageID") || jsonOb.isNull("messageID")) {
489                                                 jsonOb.put("messageID", randomStr);
490                                         }
491
492                                         InputStream inStream = null;
493
494                                         try {
495                                                 inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
496
497                                         } catch (IOException ioe) {
498                                                 ioe.printStackTrace();
499                                         }
500                                         JSONObject deleteMM = (JSONObject) jsonOb.getJSONObject("deleteMirrorMaker");
501
502                                         JSONObject existMirrorMaker = validateMMExists(ctx, deleteMM.getString("name"));
503
504                                         if ((boolean) existMirrorMaker.get("exists")) {
505
506                                                 JSONObject finalJsonObj = callPubSub(jsonOb.getString("messageID"), ctx, inStream,
507                                                                 mirrormaker.name, false);
508                                                 DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
509                                         } else {
510
511                                                 errResJson.setErrorMessage("MirrorMaker " + deleteMM.getString("name") + " does not exist");
512                                                 LOGGER.info(errResJson.toString());
513                                                 throw new CambriaApiException(errResJson);
514
515                                         }
516
517                                 } else {
518
519                                         errResJson.setErrorMessage("This is not a DeleteMirrorMaker request. Please try again.");
520                                         LOGGER.info(errResJson.toString());
521                                         throw new CambriaApiException(errResJson);
522
523                                 }
524
525                         } catch (IOException ioe) {
526
527                                 throw ioe;
528                         }
529
530                 } else {
531
532                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
533                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_ADMIN_PERMISSION, null,
534                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
535                                         ctx.getRequest().getRemoteHost());
536                         LOGGER.info(errRes.toString());
537                         throw new CambriaApiException(errRes);
538                 }
539         }
540
541         public boolean isListMirrorMaker(String msg, String messageID) {
542                 String topicmsg = msg;
543                 topicmsg = removeExtraChar(topicmsg);
544                 JSONObject jObj = new JSONObject();
545                 JSONArray jArray = null;
546                 boolean exist = false;
547
548                 if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) {
549                         jArray = new JSONArray(topicmsg);
550
551                         for (int i = 0; i < jArray.length(); i++) {
552                                 jObj = jArray.getJSONObject(i);
553
554                                 if (jObj.has("messageID") && jObj.get("messageID").equals(messageID) && jObj.has("listMirrorMaker")) {
555                                         exist = true;
556                                         break;
557                                 }
558                         }
559                 }
560                 return exist;
561         }
562
563         private void loadProperty() {
564
565                 this.timeout = Integer.parseInt(
566                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim());
567                 this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim();
568                 this.consumergroup = AJSCPropertiesMap
569                                 .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim();
570                 this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid")
571                                 .trim();
572         }
573
574         private String removeExtraChar(String message) {
575                 String str = message;
576                 str = checkJsonFormate(str);
577
578                 if (str != null && str.length() > 0) {
579                         str = str.replace("\\", "");
580                         str = str.replace("\"{", "{");
581                         str = str.replace("}\"", "}");
582                 }
583                 return str;
584         }
585
586         private String getRandomNum() {
587                 long random = Math.round(Math.random() * 89999) + 10000;
588                 String strLong = Long.toString(random);
589                 return strLong;
590         }
591
592         private boolean isAlphaNumeric(String name) {
593                 String pattern = "^[a-zA-Z0-9]*$";
594                 if (name.matches(pattern)) {
595                         return true;
596                 }
597                 return false;
598         }
599
600         private String checkJsonFormate(String jsonStr) {
601
602                 String json = jsonStr;
603                 if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) {
604                         json = json + "]";
605                 }
606                 return json;
607         }
608
609         private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) {
610
611                 boolean hasPermission = false;
612
613                 if (dmaapAAFauthenticator.aafAuthentication(ctx.getRequest(), permission)) {
614                         hasPermission = true;
615                 }
616                 return hasPermission;
617         }
618
619         public JSONObject callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream, String name, boolean listAll)
620                         throws Exception {
621                 loadProperty();
622                 JSONObject jsonObj = new JSONObject();
623                 JSONObject finalJsonObj = new JSONObject();
624                 JSONArray jsonArray = null;
625                 try {
626                         String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
627                         mirrorService.pushEvents(ctx, topic, inStream, null, null);
628                         long startTime = System.currentTimeMillis();
629
630                         while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
631                                         && ((System.currentTimeMillis() - startTime) < timeout)) {
632                                 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
633
634                         }
635
636                         if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
637                                         && isListMirrorMaker(msgFrmSubscribe, randomstr)) {
638                                 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
639
640                                 jsonArray = new JSONArray(msgFrmSubscribe);
641                                 jsonObj = jsonArray.getJSONObject(0);
642                                 if (jsonObj.has("listMirrorMaker")) {
643                                         jsonArray = (JSONArray) jsonObj.get("listMirrorMaker");
644                                         if (true == listAll) {
645                                                 return jsonObj;
646                                         } else {
647                                                 for (int i = 0; i < jsonArray.length(); i++) {
648                                                         jsonObj = jsonArray.getJSONObject(i);
649                                                         if (null != name && !name.isEmpty()) {
650                                                                 if (jsonObj.getString("name").equals(name)) {
651                                                                         finalJsonObj.put("listMirrorMaker", jsonObj);
652                                                                         break;
653                                                                 }
654                                                         } else {
655                                                                 finalJsonObj.put("listMirrorMaker", jsonObj);
656                                                         }
657
658                                                 }
659                                         }
660                                 }
661                                 return finalJsonObj;
662
663                         } else {
664
665                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
666                                                 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
667                                                 "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null,
668                                                 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
669                                                 ctx.getRequest().getRemoteHost());
670                                 LOGGER.info(errRes.toString());
671                                 throw new CambriaApiException(errRes);
672
673                         }
674
675                 } catch (Exception e) {
676
677                         throw e;
678                 }
679
680         }
681
682         private void sendErrResponse(DMaaPContext ctx, String errMsg) {
683                 JSONObject err = new JSONObject();
684                 err.append(ERROR, errMsg);
685
686                 try {
687                         DMaaPResponseBuilder.respondOk(ctx, err);
688                         LOGGER.error(errMsg);
689
690                 } catch (JSONException | IOException e) {
691                         LOGGER.error("Error at sendErrResponse method:" + errMsg + "Exception name:" + e);
692                 }
693         }
694
695         @SuppressWarnings("unchecked")
696         @POST
697         @Produces("application/json")
698         @Path("/listallwhitelist")
699         public void listWhiteList(InputStream msg) throws Exception {
700
701                 DMaaPContext ctx = getDmaapContext();
702                 if (checkMirrorMakerPermission(ctx,
703                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
704
705                         loadProperty();
706                         String input = null;
707
708                         try {
709                                 input = IOUtils.toString(msg, "UTF-8");
710
711                                 if (input != null && input.length() > 0) {
712                                         input = removeExtraChar(input);
713                                 }
714
715                                 // Check if it is correct Json object
716                                 JSONObject jsonOb = null;
717
718                                 try {
719                                         jsonOb = new JSONObject(input);
720
721                                 } catch (JSONException ex) {
722
723                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
724                                         LOGGER.info(errResJson.toString());
725                                         throw new CambriaApiException(errResJson);
726
727                                 }
728
729                                 // Check if the request has name and name contains only alpha
730                                 // numeric
731                                 // and check if the request has namespace and namespace contains
732                                 // only alpha numeric
733                                 if (jsonOb.length() == 2 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
734                                                 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
735                                                 && !StringUtils.isBlank(jsonOb.getString("namespace"))) {
736
737                                         String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
738                                                         "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
739
740                                         // Check if the user have create permission for the
741                                         // namespace
742                                         if (checkMirrorMakerPermission(ctx, permission)) {
743
744                                                 JSONObject listAll = new JSONObject();
745                                                 JSONObject emptyObject = new JSONObject();
746
747                                                 // Create a listAllMirrorMaker Json object
748                                                 try {
749                                                         listAll.put("listAllMirrorMaker", emptyObject);
750
751                                                 } catch (JSONException e) {
752
753                                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
754                                                         LOGGER.info(errResJson.toString());
755                                                         throw new CambriaApiException(errResJson);
756                                                 }
757
758                                                 // set a random number as messageID
759                                                 String randomStr = getRandomNum();
760                                                 listAll.put("messageID", randomStr);
761                                                 InputStream inStream = null;
762
763                                                 // convert listAll Json object to InputStream object
764                                                 try {
765                                                         inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
766
767                                                 } catch (IOException ioe) {
768                                                         ioe.printStackTrace();
769                                                 }
770                                                 JSONObject listMirrorMaker = new JSONObject();
771                                                 listMirrorMaker = callPubSub(randomStr, ctx, inStream, null, true);
772
773                                                 String whitelist = null;
774                                                 JSONArray listMMArray = new JSONArray();
775                                                 if (listMirrorMaker.has("listMirrorMaker")) {
776                                                         listMMArray = (JSONArray) listMirrorMaker.get("listMirrorMaker");
777                                                         for (int i = 0; i < listMMArray.length(); i++) {
778
779                                                                 JSONObject mm = new JSONObject();
780                                                                 mm = listMMArray.getJSONObject(i);
781                                                                 String name = mm.getString("name");
782
783                                                                 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
784                                                                         whitelist = mm.getString("whitelist");
785                                                                         break;
786                                                                 }
787                                                         }
788
789                                                         if (!StringUtils.isBlank(whitelist)) {
790
791                                                                 List<String> topicList = new ArrayList<String>();
792                                                                 List<String> finalTopicList = new ArrayList<String>();
793                                                                 topicList = Arrays.asList(whitelist.split(","));
794
795                                                                 for (String topic : topicList) {
796                                                                         if (topic != null && !topic.equals("null")
797                                                                                         && getNamespace(topic).equals(jsonOb.getString("namespace"))) {
798
799                                                                                 finalTopicList.add(topic);
800                                                                         }
801                                                                 }
802
803                                                                 String topicNames = "";
804
805                                                                 if (finalTopicList.size() > 0) {
806                                                                         topicNames = StringUtils.join(finalTopicList, ",");
807                                                                 }
808
809                                                                 JSONObject listAllWhiteList = new JSONObject();
810                                                                 listAllWhiteList.put("name", jsonOb.getString("name"));
811                                                                 listAllWhiteList.put("whitelist", topicNames);
812
813                                                                 DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList);
814                                                         }
815
816                                                 } else {
817
818                                                         errResJson.setErrorMessage(
819                                                                         "listWhiteList is not available, please make sure MirrorMakerAgent is running");
820                                                         LOGGER.info(errResJson.toString());
821                                                         throw new CambriaApiException(errResJson);
822
823                                                 }
824
825                                         } else {
826                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
827                                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION,
828                                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
829                                                                 ctx.getRequest().getRemoteHost());
830                                                 LOGGER.info(errRes.toString());
831                                                 throw new CambriaApiException(errRes);
832                                         }
833
834                                 } else {
835
836                                         errResJson.setErrorMessage("This is not a ListAllWhitelist request. Please try again.");
837                                         LOGGER.info(errResJson.toString());
838                                         throw new CambriaApiException(errResJson);
839                                 }
840
841                         } catch (IOException e) {
842
843                                 e.printStackTrace();
844                         }
845                 } else {
846                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
847                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null,
848                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
849                                         ctx.getRequest().getRemoteHost());
850                         LOGGER.info(errRes.toString());
851                         throw new CambriaApiException(errRes);
852                 }
853         }
854
855         @SuppressWarnings("unchecked")
856         @POST
857         @Produces("application/json")
858         @Path("/createwhitelist")
859         public void createWhiteList(InputStream msg) throws Exception {
860
861                 DMaaPContext ctx = getDmaapContext();
862                 if (checkMirrorMakerPermission(ctx,
863                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
864
865                         loadProperty();
866                         String input = null;
867
868                         try {
869                                 input = IOUtils.toString(msg, "UTF-8");
870
871                                 if (input != null && input.length() > 0) {
872                                         input = removeExtraChar(input);
873                                 }
874
875                                 // Check if it is correct Json object
876                                 JSONObject jsonOb = null;
877
878                                 try {
879                                         jsonOb = new JSONObject(input);
880
881                                 } catch (JSONException ex) {
882                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
883                                         LOGGER.info(errResJson.toString());
884                                         throw new CambriaApiException(errResJson);
885                                 }
886
887                                 // Check if the request has name and name contains only alpha
888                                 // numeric,
889                                 // check if the request has namespace and
890                                 // check if the request has whitelistTopicName
891                                 // check if the topic name contains only alpha numeric
892                                 if (jsonOb.length() == 3 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
893                                                 && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
894                                                 && !StringUtils.isBlank(jsonOb.getString("namespace")) && jsonOb.has("whitelistTopicName")
895                                                 && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
896                                                 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
897                                                                 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1, jsonOb
898                                                                                 .getString("whitelistTopicName").length()))) {
899
900                                         String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
901                                                         "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
902
903                                         // Check if the user have create permission for the
904                                         // namespace
905                                         if (checkMirrorMakerPermission(ctx, permission)) {
906
907                                                 JSONObject listAll = new JSONObject();
908                                                 JSONObject emptyObject = new JSONObject();
909
910                                                 // Create a listAllMirrorMaker Json object
911                                                 try {
912                                                         listAll.put("listAllMirrorMaker", emptyObject);
913
914                                                 } catch (JSONException e) {
915
916                                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
917                                                         LOGGER.info(errResJson.toString());
918                                                         throw new CambriaApiException(errResJson);
919                                                 }
920
921                                                 // set a random number as messageID
922                                                 String randomStr = getRandomNum();
923                                                 listAll.put("messageID", randomStr);
924                                                 InputStream inStream = null;
925
926                                                 // convert listAll Json object to InputStream object
927                                                 try {
928                                                         inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
929
930                                                 } catch (IOException ioe) {
931                                                         ioe.printStackTrace();
932                                                 }
933                                                 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
934                                                 // call listAllMirrorMaker
935                                                 mirrorService.pushEvents(ctx, topic, inStream, null, null);
936
937                                                 // subscribe for listMirrorMaker
938                                                 long startTime = System.currentTimeMillis();
939
940                                                 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
941                                                                 && (System.currentTimeMillis() - startTime) < timeout) {
942                                                         msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
943                                                 }
944
945                                                 JSONArray listMirrorMaker = null;
946
947                                                 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
948                                                                 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
949
950                                                         listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
951                                                         String whitelist = null;
952
953                                                         for (int i = 0; i < listMirrorMaker.length(); i++) {
954                                                                 JSONObject mm = new JSONObject();
955                                                                 mm = listMirrorMaker.getJSONObject(i);
956                                                                 String name = mm.getString("name");
957
958                                                                 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
959                                                                         whitelist = mm.getString("whitelist");
960                                                                         break;
961                                                                 }
962                                                         }
963
964                                                         List<String> topicList = new ArrayList<String>();
965                                                         List<String> finalTopicList = new ArrayList<String>();
966
967                                                         if (whitelist != null) {
968                                                                 topicList = Arrays.asList(whitelist.split(","));
969                                                         }
970
971                                                         for (String st : topicList) {
972                                                                 if (!StringUtils.isBlank(st)) {
973                                                                         finalTopicList.add(st);
974                                                                 }
975                                                         }
976
977                                                         String newTopic = jsonOb.getString("whitelistTopicName");
978
979                                                         if (!topicList.contains(newTopic)
980                                                                         && getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
981
982                                                                 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
983                                                                 MirrorMaker mirrorMaker = new MirrorMaker();
984                                                                 mirrorMaker.setName(jsonOb.getString("name"));
985                                                                 finalTopicList.add(newTopic);
986                                                                 String newWhitelist = "";
987
988                                                                 if (finalTopicList.size() > 0) {
989                                                                         newWhitelist = StringUtils.join(finalTopicList, ",");
990                                                                 }
991
992                                                                 mirrorMaker.setWhitelist(newWhitelist);
993
994                                                                 String newRandom = getRandomNum();
995                                                                 updateWhiteList.setMessageID(newRandom);
996                                                                 updateWhiteList.setUpdateWhiteList(mirrorMaker);
997
998                                                                 Gson g = new Gson();
999                                                                 g.toJson(updateWhiteList);
1000                                                                 InputStream inputStream = null;
1001                                                                 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
1002                                                                 // callPubSub(newRandom, ctx, inputStream);
1003                                                                 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb);
1004
1005                                                         } else if (topicList.contains(newTopic)) {
1006                                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR,
1007                                                                                 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), "The topic already exist.",
1008                                                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1009                                                                                 ctx.getRequest().getRemoteHost());
1010                                                                 LOGGER.info(errRes.toString());
1011                                                                 throw new CambriaApiException(errRes);
1012
1013                                                         } else if (!getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
1014                                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR,
1015                                                                                 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(),
1016                                                                                 "The namespace of the topic does not match with the namespace you provided.",
1017                                                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1018                                                                                 ctx.getRequest().getRemoteHost());
1019                                                                 LOGGER.info(errRes.toString());
1020                                                                 throw new CambriaApiException(errRes);
1021                                                         }
1022                                                 } else {
1023
1024                                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1025                                                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
1026                                                                         "listWhiteList is not available, please make sure MirrorMakerAgent is running",
1027                                                                         null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1028                                                                         ctx.getRequest().getRemoteHost());
1029                                                         LOGGER.info(errRes.toString());
1030                                                         throw new CambriaApiException(errRes);
1031                                                 }
1032
1033                                         } else {
1034                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1035                                                                 DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_CREATE_PERMISSION,
1036                                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1037                                                                 ctx.getRequest().getRemoteHost());
1038                                                 LOGGER.info(errRes.toString());
1039                                                 throw new CambriaApiException(errRes);
1040                                         }
1041
1042                                 } else {
1043
1044                                         errResJson.setErrorMessage("This is not a createWhitelist request. Please try again.");
1045                                         LOGGER.info(errResJson.toString());
1046                                         throw new CambriaApiException(errResJson);
1047                                 }
1048
1049                         } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1050                                         | TopicExistsException | missingReqdSetting | UnavailableException e) {
1051
1052                                 throw e;
1053                         }
1054                 }
1055                 // Send error response if user does not provide Authorization
1056                 else {
1057                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1058                                         DMaaPResponseCode.UNABLE_TO_AUTHORIZE.getResponseCode(), NO_USER_PERMISSION, null,
1059                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1060                                         ctx.getRequest().getRemoteHost());
1061                         LOGGER.info(errRes.toString());
1062                         throw new CambriaApiException(errRes);
1063
1064                 }
1065         }
1066
1067         @SuppressWarnings("unchecked")
1068         @POST
1069         @Produces("application/json")
1070         @Path("/deletewhitelist")
1071         public void deleteWhiteList(InputStream msg) throws Exception {
1072
1073                 DMaaPContext ctx = getDmaapContext();
1074                 if (checkMirrorMakerPermission(ctx,
1075                                 AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
1076
1077                         loadProperty();
1078                         String input = null;
1079
1080                         try {
1081                                 input = IOUtils.toString(msg, "UTF-8");
1082
1083                                 if (input != null && input.length() > 0) {
1084                                         input = removeExtraChar(input);
1085                                 }
1086
1087                                 // Check if it is correct Json object
1088                                 JSONObject jsonOb = null;
1089
1090                                 try {
1091                                         jsonOb = new JSONObject(input);
1092
1093                                 } catch (JSONException ex) {
1094
1095                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
1096                                         LOGGER.info(errResJson.toString());
1097                                         throw new CambriaApiException(errResJson);
1098
1099                                 }
1100
1101                                 // Check if the request has name and name contains only alpha
1102                                 // numeric,
1103                                 // check if the request has namespace and
1104                                 // check if the request has whitelistTopicName
1105                                 if (jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name"))
1106                                                 && jsonOb.has("namespace") && jsonOb.has("whitelistTopicName")
1107                                                 && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(
1108                                                                 jsonOb.getString("whitelistTopicName").lastIndexOf(".") + 1,
1109                                                                 jsonOb.getString("whitelistTopicName").length()))) {
1110
1111                                         String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
1112                                                         "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
1113
1114                                         // Check if the user have create permission for the
1115                                         // namespace
1116                                         if (checkMirrorMakerPermission(ctx, permission)) {
1117
1118                                                 JSONObject listAll = new JSONObject();
1119                                                 JSONObject emptyObject = new JSONObject();
1120
1121                                                 // Create a listAllMirrorMaker Json object
1122                                                 try {
1123                                                         listAll.put("listAllMirrorMaker", emptyObject);
1124
1125                                                 } catch (JSONException e) {
1126
1127                                                         errResJson.setErrorMessage(errorMessages.getIncorrectJson());
1128                                                         LOGGER.info(errResJson.toString());
1129                                                         throw new CambriaApiException(errResJson);
1130                                                 }
1131
1132                                                 // set a random number as messageID
1133                                                 String randomStr = getRandomNum();
1134                                                 listAll.put("messageID", randomStr);
1135                                                 InputStream inStream = null;
1136
1137                                                 // convert listAll Json object to InputStream object
1138                                                 try {
1139                                                         inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
1140
1141                                                 } catch (IOException ioe) {
1142                                                         ioe.printStackTrace();
1143                                                 }
1144                                                 // call listAllMirrorMaker
1145                                                 mirrorService.pushEvents(ctx, topic, inStream, null, null);
1146
1147                                                 // subscribe for listMirrorMaker
1148                                                 long startTime = System.currentTimeMillis();
1149                                                 String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1150
1151                                                 while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1152                                                                 && (System.currentTimeMillis() - startTime) < timeout) {
1153                                                         msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1154                                                 }
1155
1156                                                 JSONObject jsonObj = new JSONObject();
1157                                                 JSONArray jsonArray = null;
1158                                                 JSONArray listMirrorMaker = null;
1159
1160                                                 if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1161                                                                 && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1162                                                         msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1163                                                         jsonArray = new JSONArray(msgFrmSubscribe);
1164
1165                                                         for (int i = 0; i < jsonArray.length(); i++) {
1166                                                                 jsonObj = jsonArray.getJSONObject(i);
1167
1168                                                                 if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr)
1169                                                                                 && jsonObj.has("listMirrorMaker")) {
1170                                                                         listMirrorMaker = jsonObj.getJSONArray("listMirrorMaker");
1171                                                                         break;
1172                                                                 }
1173                                                         }
1174                                                         String whitelist = null;
1175                                                         for (int i = 0; i < listMirrorMaker.length(); i++) {
1176
1177                                                                 JSONObject mm = new JSONObject();
1178                                                                 mm = listMirrorMaker.getJSONObject(i);
1179                                                                 String name = mm.getString("name");
1180
1181                                                                 if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
1182                                                                         whitelist = mm.getString("whitelist");
1183                                                                         break;
1184                                                                 }
1185                                                         }
1186
1187                                                         List<String> topicList = new ArrayList<String>();
1188
1189                                                         if (whitelist != null) {
1190                                                                 topicList = Arrays.asList(whitelist.split(","));
1191                                                         }
1192                                                         boolean removeTopic = false;
1193                                                         String topicToRemove = jsonOb.getString("whitelistTopicName");
1194
1195                                                         if (topicList.contains(topicToRemove)) {
1196                                                                 removeTopic = true;
1197                                                         } else {
1198                                                                 errResJson.setErrorMessage(errorMessages.getTopicNotExist());
1199                                                                 LOGGER.info(errResJson.toString());
1200                                                                 throw new CambriaApiException(errResJson);
1201                                                         }
1202
1203                                                         if (removeTopic) {
1204                                                                 UpdateWhiteList updateWhiteList = new UpdateWhiteList();
1205                                                                 MirrorMaker mirrorMaker = new MirrorMaker();
1206
1207                                                                 mirrorMaker.setName(jsonOb.getString("name"));
1208
1209                                                                 if (StringUtils.isNotBlank((removeTopic(whitelist, topicToRemove)))) {
1210                                                                         mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
1211                                                                 }
1212
1213                                                                 String newRandom = getRandomNum();
1214
1215                                                                 updateWhiteList.setMessageID(newRandom);
1216                                                                 updateWhiteList.setUpdateWhiteList(mirrorMaker);
1217
1218                                                                 Gson g = new Gson();
1219                                                                 g.toJson(updateWhiteList);
1220
1221                                                                 InputStream inputStream = null;
1222                                                                 inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
1223                                                                 callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb);
1224                                                                 // mmAgentUtil.getNamespace(topicToRemove));
1225                                                         }
1226
1227                                                 } else {
1228
1229                                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1230                                                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
1231                                                                         "listWhiteList is not available, please make sure MirrorMakerAgent is running",
1232                                                                         null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1233                                                                         ctx.getRequest().getRemoteHost());
1234                                                         LOGGER.info(errRes.toString());
1235                                                         throw new CambriaApiException(errRes);
1236                                                 }
1237
1238                                         } else {
1239                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1240                                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_CREATE_PERMISSION,
1241                                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1242                                                                 ctx.getRequest().getRemoteHost());
1243                                                 LOGGER.info(errRes.toString());
1244                                                 throw new CambriaApiException(errRes);
1245                                         }
1246
1247                                 } else {
1248
1249                                         errResJson.setErrorMessage("This is not a DeleteAllWhitelist request. Please try again.");
1250                                         LOGGER.info(errResJson.toString());
1251                                         throw new CambriaApiException(errResJson);
1252
1253                                 }
1254
1255                         } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
1256                                         | TopicExistsException | missingReqdSetting | UnavailableException e) {
1257
1258                                 throw e;
1259                         }
1260                 }
1261                 // Send error response if user does not provide Authorization
1262                 else {
1263                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
1264                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), NO_USER_PERMISSION, null,
1265                                         Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1266                                         ctx.getRequest().getRemoteHost());
1267                         LOGGER.info(errRes.toString());
1268                         throw new CambriaApiException(errRes);
1269
1270                 }
1271         }
1272
1273         private String getNamespace(String topic) {
1274                 return topic.substring(0, topic.lastIndexOf("."));
1275         }
1276
1277         private String removeTopic(String whitelist, String topicToRemove) {
1278                 List<String> topicList = new ArrayList<>();
1279                 List<String> newTopicList = new ArrayList<>();
1280
1281                 if (whitelist.contains(",")) {
1282                         topicList = Arrays.asList(whitelist.split(","));
1283
1284                 }
1285
1286                 if (topicList.contains(topicToRemove)) {
1287                         for (String topic : topicList) {
1288                                 if (!topic.equals(topicToRemove)) {
1289                                         newTopicList.add(topic);
1290                                 }
1291                         }
1292                 }
1293
1294                 String newWhitelist = StringUtils.join(newTopicList, ",");
1295
1296                 return newWhitelist;
1297         }
1298
1299         public void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, JSONObject jsonOb) {
1300
1301                 loadProperty();
1302                 try {
1303                         String namespace = jsonOb.getString("namespace");
1304                         String mmName = jsonOb.getString("name");
1305
1306                         String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1307                         mirrorService.pushEvents(ctx, topic, inStream, null, null);
1308                         long startTime = System.currentTimeMillis();
1309
1310                         while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
1311                                         && (System.currentTimeMillis() - startTime) < timeout) {
1312                                 msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
1313                         }
1314
1315                         JSONObject jsonObj = new JSONObject();
1316                         JSONArray jsonArray = null;
1317                         JSONArray jsonArrayNamespace = null;
1318
1319                         if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
1320                                         && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
1321                                 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1322                                 jsonArray = new JSONArray(msgFrmSubscribe);
1323
1324                                 for (int i = 0; i < jsonArray.length(); i++) {
1325                                         jsonObj = jsonArray.getJSONObject(i);
1326
1327                                         if (jsonObj.has("messageID") && jsonObj.get("messageID").equals(randomStr)
1328                                                         && jsonObj.has("listMirrorMaker")) {
1329                                                 jsonArrayNamespace = jsonObj.getJSONArray("listMirrorMaker");
1330                                         }
1331                                 }
1332                                 JSONObject finalJasonObj = new JSONObject();
1333                                 JSONArray finalJsonArray = new JSONArray();
1334
1335                                 for (int i = 0; i < jsonArrayNamespace.length(); i++) {
1336
1337                                         JSONObject mmObj = new JSONObject();
1338                                         mmObj = jsonArrayNamespace.getJSONObject(i);
1339                                         if (mmObj.has("name") && mmName.equals(mmObj.getString("name"))) {
1340
1341                                                 finalJsonArray.put(mmObj);
1342                                         }
1343
1344                                 }
1345                                 finalJasonObj.put("listMirrorMaker", finalJsonArray);
1346
1347                                 DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
1348
1349                         } else {
1350
1351                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
1352                                                 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
1353                                                 "listMirrorMaker is not available, please make sure MirrorMakerAgent is running", null,
1354                                                 Utils.getFormattedDate(new Date()), topic, null, null, "mirrorMakerAgent",
1355                                                 ctx.getRequest().getRemoteHost());
1356                                 LOGGER.info(errRes.toString());
1357                                 throw new CambriaApiException(errRes);
1358                         }
1359
1360                 } catch (Exception e) {
1361                         e.printStackTrace();
1362                 }
1363         }
1364
1365         private String getWhitelistByNamespace(String originalWhitelist, String namespace) {
1366
1367                 String whitelist = null;
1368                 List<String> resultList = new ArrayList<>();
1369                 List<String> whitelistList = new ArrayList<>();
1370                 whitelistList = Arrays.asList(originalWhitelist.split(","));
1371
1372                 for (String topic : whitelistList) {
1373                         if (StringUtils.isNotBlank(originalWhitelist) && getNamespace(topic).equals(namespace)) {
1374                                 resultList.add(topic);
1375                         }
1376                 }
1377                 if (!resultList.isEmpty()) {
1378                         whitelist = StringUtils.join(resultList, ",");
1379                 }
1380
1381                 return whitelist;
1382         }
1383
1384         private JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
1385                 JSONObject jsonObj;
1386                 JSONArray jsonArray;
1387                 JSONArray listMirrorMaker = new JSONArray();
1388
1389                 msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
1390                 jsonArray = new JSONArray(msgFrmSubscribe);
1391
1392                 for (int i = 0; i < jsonArray.length(); i++) {
1393                         jsonObj = jsonArray.getJSONObject(i);
1394
1395                         JSONObject obj = new JSONObject();
1396                         if (jsonObj.has(MESSAGE)) {
1397                                 obj = jsonObj.getJSONObject(MESSAGE);
1398                         }
1399                         if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has(LISTMIRRORMAKER)) {
1400                                 listMirrorMaker = obj.getJSONArray(LISTMIRRORMAKER);
1401                                 break;
1402                         }
1403                 }
1404                 return listMirrorMaker;
1405         }
1406
1407         public JSONObject validateMMExists(DMaaPContext ctx, String name) throws Exception {
1408                 // Create a listAllMirrorMaker Json object
1409                 JSONObject listAll = new JSONObject();
1410                 try {
1411                         listAll.put("listAllMirrorMaker", new JSONObject());
1412
1413                 } catch (JSONException e) {
1414
1415                         e.printStackTrace();
1416                 }
1417
1418                 // set a random number as messageID
1419                 String randomStr = getRandomNum();
1420                 listAll.put("messageID", randomStr);
1421                 InputStream inStream = null;
1422
1423                 // convert listAll Json object to InputStream object
1424                 try {
1425                         inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
1426
1427                 } catch (IOException ioe) {
1428                         ioe.printStackTrace();
1429                 }
1430                 JSONObject listMirrorMaker = new JSONObject();
1431                 listMirrorMaker = callPubSub(randomStr, ctx, inStream, name, false);
1432                 if (null != listMirrorMaker && listMirrorMaker.length() > 0) {
1433                         listMirrorMaker.put("exists", true);
1434                         return listMirrorMaker;
1435
1436                 }
1437                 listMirrorMaker.put("exists", false);
1438                 return listMirrorMaker;
1439
1440         }
1441 }