[DMAAP-MR] Set topic replica & partition from config
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / TopicServiceImpl.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Copyright (C) 2019 Nokia Intellectual Property. All rights reserved.
8  * =================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *  
21  */
22 package org.onap.dmaap.dmf.mr.service.impl;
23
24 import static org.onap.dmaap.util.DMaaPAuthFilter.isUseCustomAcls;
25
26 import com.att.ajsc.beans.PropertiesMapBean;
27 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
30 import com.att.nsa.configs.ConfigDbException;
31 import com.att.nsa.security.NsaAcl;
32 import com.att.nsa.security.NsaApiKey;
33 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
34 import joptsimple.internal.Strings;
35 import org.apache.commons.lang.StringUtils;
36 import org.apache.commons.lang.math.NumberUtils;
37 import org.apache.http.HttpStatus;
38 import org.json.JSONArray;
39 import org.json.JSONException;
40 import org.json.JSONObject;
41 import org.onap.dmaap.dmf.mr.CambriaApiException;
42 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
43 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
44 import org.onap.dmaap.dmf.mr.beans.TopicBean;
45 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
46 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
47 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
48 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
49 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
50 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
51 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
52 import org.onap.dmaap.dmf.mr.metabroker.Topic;
53 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
54 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
55 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
56 import org.onap.dmaap.dmf.mr.service.TopicService;
57 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
58 import org.onap.dmaap.dmf.mr.utils.Utils;
59 import org.springframework.beans.factory.annotation.Autowired;
60 import org.springframework.stereotype.Service;
61
62 import javax.servlet.http.HttpServletRequest;
63 import java.io.IOException;
64 import java.security.Principal;
65
66 /**
67  * @author muzainulhaque.qazi
68  *
69  */
70 @Service
71 public class TopicServiceImpl implements TopicService {
72
73         private static final String TOPIC_CREATE_OP = "create";
74         private static final EELFLogger LOGGER = EELFManager.getLogger(TopicServiceImpl.class);
75         @Autowired
76         private DMaaPErrorMessages errorMessages;
77
78         public DMaaPErrorMessages getErrorMessages() {
79                 return errorMessages;
80         }
81
82         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
83                 this.errorMessages = errorMessages;
84         }
85
86
87   String getPropertyFromAJSCbean(String propertyKey) {
88                 return PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
89         }
90
91         String getPropertyFromAJSCmap(String propertyKey) {
92                 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
93         }
94
95         NsaApiKey getDmaapAuthenticatedUser(DMaaPContext dmaapContext) {
96                 return DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
97         }
98
99         void respondOk(DMaaPContext context, String msg) {
100                 DMaaPResponseBuilder.respondOkWithHtml(context, msg);
101         }
102
103         void respondOk(DMaaPContext context, JSONObject json) throws IOException {
104                 DMaaPResponseBuilder.respondOk(context, json);
105         }
106
107         boolean isCadiEnabled() {
108                 return Utils.isCadiEnabled();
109         }
110         /**
111          * @param dmaapContext
112          * @throws JSONException
113          * @throws ConfigDbException
114          * @throws IOException
115          * 
116          */
117         @Override
118         public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
119                 LOGGER.info("Fetching list of all the topics.");
120                 JSONObject json = new JSONObject();
121                 JSONArray topicsList = new JSONArray();
122                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
123                         topicsList.put(topic.getName());
124                 }
125                 json.put("topics", topicsList);
126                 LOGGER.info("Returning list of all the topics.");
127                 respondOk(dmaapContext, json);
128         }
129
130         /**
131          * @param dmaapContext
132          * @throws JSONException
133          * @throws ConfigDbException
134          * @throws IOException
135          * 
136          */
137         public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
138                 LOGGER.info("Fetching list of all the topics.");
139                 JSONObject json = new JSONObject();
140                 JSONArray topicsList = new JSONArray();
141                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
142                         JSONObject obj = new JSONObject();
143                         obj.put("topicName", topic.getName());
144                         
145                         obj.put("owner", topic.getOwner());
146                         obj.put("txenabled", topic.isTransactionEnabled());
147                         topicsList.put(obj);
148                 }
149                 json.put("topics", topicsList);
150                 LOGGER.info("Returning list of all the topics.");
151                 respondOk(dmaapContext, json);
152         }
153
154         /**
155          * @param dmaapContext
156          * @param topicName
157          * @throws ConfigDbException
158          * @throws IOException
159          * @throws TopicExistsException
160          */
161         @Override
162         public void getTopic(DMaaPContext dmaapContext, String topicName)
163                         throws ConfigDbException, IOException, TopicExistsException {
164                 LOGGER.info("Fetching details of topic " + topicName);
165                 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
166                 if (null == t) {
167                         LOGGER.error("Topic [" + topicName + "] does not exist.");
168                         throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
169                 }
170                 JSONObject o = new JSONObject();
171                 o.put("name", t.getName());
172                 o.put("description", t.getDescription());
173                 if (null != t.getOwners())
174                         o.put("owner", t.getOwners().iterator().next());
175                 if (null != t.getReaderAcl())
176                         o.put("readerAcl", aclToJson(t.getReaderAcl()));
177                 if (null != t.getWriterAcl())
178                         o.put("writerAcl", aclToJson(t.getWriterAcl()));
179                 LOGGER.info("Returning details of topic " + topicName);
180                 respondOk(dmaapContext, o);
181         }
182
183         /**
184          * @param dmaapContext
185          * @param topicBean
186          * @throws CambriaApiException
187          * @throws AccessDeniedException
188          * @throws IOException
189          * @throws TopicExistsException
190          * @throws JSONException
191          * 
192          * 
193          * 
194          */
195         @Override
196         public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) throws CambriaApiException, IOException {
197                 String topicName = topicBean.getTopicName();
198                 LOGGER.info("Creating topic {}",topicName);
199                 String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP);
200                 try {
201                         final int partitions = getIntValueOrDefault("default.partitions");
202                         final int replicas = getIntValueOrDefault("default.replicas");
203                         LOGGER.info("Attempting to create topic {} with replicas={}, partitions={}", topicName, replicas, partitions);
204                         final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(),
205                                 key, partitions, replicas, topicBean.isTransactionEnabled());
206                         LOGGER.info("Topic {} created successfully. Sending response", topicName);
207                         respondOk(dmaapContext, topicToJson(t));
208                 } catch (JSONException ex) {
209                         LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex);
210                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
211                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
212                         LOGGER.info(errRes.toString());
213                         throw new CambriaApiException(errRes);
214                 } catch (ConfigDbException ex) {
215                         LOGGER.error("Failed to create topic "+ topicName +".  Config DB Exception", ex);
216                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
217                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
218                         LOGGER.info(errRes.toString());
219                         throw new CambriaApiException(errRes);
220                 } catch (Broker1.TopicExistsException ex) {
221                         LOGGER.error( "Failed to create topic "+ topicName +".  Topic already exists.",ex);
222                 }
223         }
224
225         private String authorizeClient(DMaaPContext dmaapContext, String topicName, String operation) throws DMaaPAccessDeniedException {
226                 String clientId = Strings.EMPTY;
227                 if(isCadiEnabled() && isTopicWithEnforcedAuthorization(topicName)) {
228                         LOGGER.info("Performing AAF authorization for topic {} creation.", topicName);
229                         String permission = buildPermission(topicName, operation);
230                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
231                         clientId = getAAFclientId(dmaapContext.getRequest());
232
233                         if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
234                                 LOGGER.error("Failed to {} topic {}. Authorization failed for client {} and permission {}",
235                                         operation, topicName, clientId, permission);
236                                 throw new DMaaPAccessDeniedException(new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
237                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
238                                         "Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission));
239                         }
240                 } else if (operation.equals(TOPIC_CREATE_OP)){
241                         final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
242                         clientId = (user != null) ? user.getKey() : Strings.EMPTY;
243                 }
244                 return clientId;
245         }
246
247         private String getAAFclientId(HttpServletRequest request) {
248                 Principal principal = request.getUserPrincipal();
249                 if (principal !=null) {
250                         return principal.getName();
251                 } else {
252                         LOGGER.warn("Performing AAF authorization but user has not been provided in request.");
253                         return null;
254                 }
255         }
256
257         private boolean isTopicWithEnforcedAuthorization(String topicName) {
258                 String enfTopicNamespace = getPropertyFromAJSCbean("enforced.topic.name.AAF");
259                 return enfTopicNamespace != null && topicName.startsWith(enfTopicNamespace);
260         }
261
262         int getIntValueOrDefault(String defaultProperty) {
263                 int returnValue;
264                 String defaultValue = getPropertyFromAJSCmap(defaultProperty);
265                 returnValue = StringUtils.isNotEmpty(defaultValue) ? NumberUtils.toInt(defaultValue) : 1;
266                 returnValue = (returnValue <= 0) ? 1 : returnValue;
267                 return returnValue;
268         }
269
270         private String buildPermission(String topicName, String operation) {
271                 String nameSpace = (topicName.indexOf('.') > 1) ?
272                         topicName.substring(0, topicName.lastIndexOf('.')) : "";
273
274                 String mrFactoryValue = getPropertyFromAJSCmap("msgRtr.topicfactory.aaf");
275                 return mrFactoryValue + nameSpace + "|" + operation;
276         }
277
278         /**
279          * @param dmaapContext
280          * @param topicName
281          * @throws ConfigDbException
282          * @throws IOException
283          * @throws TopicExistsException
284          * @throws CambriaApiException
285          * @throws AccessDeniedException
286          */
287         @Override
288         public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
289                         CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
290                 LOGGER.info(" Deleting topic " + topicName);
291                 authorizeClient(dmaapContext, topicName, "destroy");
292                 final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
293                 if (topic == null) {
294                         LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
295                         throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
296                 }
297                 // metabroker.deleteTopic(topicName);
298                 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
299                 respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully");
300         }
301
302         /**
303          * 
304          * @param dmaapContext
305          * @return
306          */
307         DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
308                 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
309         }
310
311         /**
312          * @param dmaapContext
313          * @param topicName
314          * @throws ConfigDbException
315          * @throws IOException
316          * @throws TopicExistsException
317          * 
318          */
319         @Override
320         public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
321                         throws ConfigDbException, IOException, TopicExistsException {
322                 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
323                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
324                 if (topic == null) {
325                         LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
326                         throw new TopicExistsException(
327                                         "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
328                 }
329                 final NsaAcl acl = topic.getWriterAcl();
330                 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
331                 respondOk(dmaapContext, aclToJson(acl));
332         }
333
334         /**
335          * 
336          * @param acl
337          * @return
338          */
339         private static JSONObject aclToJson(NsaAcl acl) {
340                 final JSONObject o = new JSONObject();
341                 if (acl == null) {
342                         o.put("enabled", false);
343                         o.put("users", new JSONArray());
344                 } else {
345                         o.put("enabled", acl.isActive());
346
347                         final JSONArray a = new JSONArray();
348                         for (String user : acl.getUsers()) {
349                                 a.put(user);
350                         }
351                         o.put("users", a);
352                 }
353                 return o;
354         }
355
356         /**
357          * @param dmaapContext
358          * @param topicName
359          */
360         @Override
361         public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
362                         throws IOException, ConfigDbException, TopicExistsException {
363                 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
364                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
365                 if (topic == null) {
366                         LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
367                         throw new TopicExistsException(
368                                         "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
369                 }
370                 final NsaAcl acl = topic.getReaderAcl();
371                 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
372                 respondOk(dmaapContext, aclToJson(acl));
373
374         }
375
376         /**
377          * 
378          * @param t
379          * @return
380          */
381         static JSONObject topicToJson(Topic t) {
382                 final JSONObject o = new JSONObject();
383
384                 o.put("name", t.getName());
385                 o.put("description", t.getDescription());
386                 o.put("owner", t.getOwner());
387                 o.put("readerAcl", aclToJson(t.getReaderAcl()));
388                 o.put("writerAcl", aclToJson(t.getWriterAcl()));
389
390                 return o;
391         }
392
393         /**
394          * @param dmaapContext
395          *                      @param topicName @param producerId @throws
396          *            ConfigDbException @throws IOException @throws
397          *            TopicExistsException @throws AccessDeniedException @throws
398          * 
399          */
400         @Override
401         public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
402                         throws AccessDeniedException, ConfigDbException, TopicExistsException {
403                 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
404                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
405                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
406                 if (null == topic) {
407                         LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
408                                         + "] does not exist.");
409                         throw new TopicExistsException("Failed to permit write access to producer [" + producerId
410                                         + "] for topic. Topic [" + topicName + "] does not exist.");
411                 }
412                 if (isUseCustomAcls()) {
413                         topic.permitWritesFromUser(producerId, user);
414                         LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
415                                 + "]. Sending response.");
416                 } else {
417                         LOGGER.info("Ignoring acl update");
418                 }
419                 respondOk(dmaapContext, "Write access has been granted to publisher.");
420         }
421
422         /**
423          * @param dmaapContext
424          * @param topicName
425          * @param producerId
426          * @throws ConfigDbException
427          * @throws IOException
428          * @throws TopicExistsException
429          * @throws AccessDeniedException
430          * @throws DMaaPAccessDeniedException
431          * 
432          */
433         @Override
434         public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
435                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
436                         DMaaPAccessDeniedException {
437                 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
438                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
439                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
440                 if (null == topic) {
441                         LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
442                                         + "] does not exist.");
443                         throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
444                                         + "] for topic. Topic [" + topicName + "] does not exist.");
445                 }
446                 if (isUseCustomAcls()) {
447                         topic.denyWritesFromUser(producerId, user);
448                         LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
449                                 + "]. Sending response.");
450                 } else {
451                         LOGGER.info("Ignoring acl update");
452                 }
453                 respondOk(dmaapContext, "Write access has been revoked for publisher.");
454         }
455
456         /**
457          * @param dmaapContext
458          * @param topicName
459          * @param consumerId
460          * @throws DMaaPAccessDeniedException
461          */
462         @Override
463         public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
464                         throws AccessDeniedException, ConfigDbException, TopicExistsException {
465
466                 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
467                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
468                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
469                 if (null == topic) {
470                         LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
471                                         + "] does not exist.");
472                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
473                                         + "] for topic. Topic [" + topicName + "] does not exist.");
474                 }
475                 if (isUseCustomAcls()) {
476                         topic.permitReadsByUser(consumerId, user);
477                         LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
478                                 + "]. Sending response.");
479                 } else {
480                         LOGGER.info("Ignoring acl update");
481                 }
482                 respondOk(dmaapContext,
483                                 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
484         }
485
486         /**
487          * @param dmaapContext
488          * @param topicName
489          * @param consumerId
490          * @throws DMaaPAccessDeniedException
491          */
492         @Override
493         public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
494                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
495                         DMaaPAccessDeniedException {
496
497                 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
498                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
499                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
500                 if (null == topic) {
501                         LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
502                                         + "] does not exist.");
503                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
504                                         + "] for topic. Topic [" + topicName + "] does not exist.");
505                 }
506                 if (isUseCustomAcls()) {
507                         topic.denyReadsByUser(consumerId, user);
508                         LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
509                                 + "]. Sending response.");
510                 } else {
511                         LOGGER.info("Ignoring acl update");
512                 }
513                 respondOk(dmaapContext,
514                                 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
515
516         }
517
518 }