Merge "TopicService authorization check refactor"
author sunil unnava <su622b@att.com>
Wed, 10 Jul 2019 17:55:26 +0000 (17:55 +0000)
committer Gerrit Code Review <gerrit@onap.org>
Wed, 10 Jul 2019 17:55:26 +0000 (17:55 +0000)
pom.xml
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
version.properties

diff --git a/pom.xml b/pom.xml
index 2ef8c0c..85b4fe6 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.onap.dmaap.messagerouter.msgrtr</groupId>
        <artifactId>msgrtr</artifactId>
-       <version>1.1.17-SNAPSHOT</version>
+       <version>1.1.18-SNAPSHOT</version>
        <packaging>jar</packaging>
        <name>dmaap-messagerouter-msgrtr</name>
        <description>Message Router - Restful interface built for kafka</description>
@@ -22,7 +22,7 @@
        <parent>
                <groupId>org.onap.oparent</groupId>
                <artifactId>oparent</artifactId>
-               <version>1.2.1</version>
+               <version>2.0.0</version>
        </parent>
 
        <properties>
                        <!-- <phase>package</phase> bind to the packaging phase <goals> <goal>single</goal> 
                                </goals> </execution> </executions> </plugin> -->
                        <!-- -->
+                         <plugin>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.17</version>
+        <dependencies>
+          <dependency>
+            <groupId>org.onap.oparent</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>2.0.0</version>
+          </dependency>
+        </dependencies>
+        <executions>
+          <execution>
+            <id>onap-license</id>
+            <goals>
+              <goal>check</goal>
+            </goals>
+            <phase>process-sources</phase>
+            <configuration>
+              <configLocation>onap-checkstyle/check-license.xml</configLocation>
+              <includeResources>false</includeResources>
+              <includeTestSourceDirectory>true</includeTestSourceDirectory>
+              <includeTestResources>false</includeTestResources>
+              <sourceDirectory>${project.build.sourceDirectory}</sourceDirectory>
+              <excludes>
+              </excludes>
+              <consoleOutput>true</consoleOutput>
+              <failsOnViolation>false</failsOnViolation>
+            </configuration>
+          </execution>
+          <execution>
+            <id>onap-java-style</id>
+            <goals>
+              <goal>check</goal>
+            </goals>
+            <phase>none</phase>
+            <configuration>
+              <!-- Use Google Java Style Guide:
+                   https://github.com/checkstyle/checkstyle/blob/master/src/main/resources/google_checks.xml
+                   with minor changes -->
+              <configLocation>onap-checkstyle/onap-java-style.xml</configLocation>
+              <!-- <sourceDirectory> is needed so that checkstyle ignores the generated sources directory -->
+              <sourceDirectory>${project.build.sourceDirectory}</sourceDirectory>
+              <includeResources>true</includeResources>
+              <includeTestSourceDirectory>true</includeTestSourceDirectory>
+              <includeTestResources>true</includeTestResources>
+              <excludes>
+              </excludes>
+              <consoleOutput>true</consoleOutput>
+              <failsOnViolation>false</failsOnViolation>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-site-plugin</artifactId>
index 4bdd9f3..5f616c7 100644 (file)
@@ -26,21 +26,19 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
-
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.json.JSONException;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.util.StringUtils;
 import org.onap.dmaap.dmf.mr.backends.Publisher;
 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
 import org.onap.dmaap.dmf.mr.utils.Utils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.util.StringUtils;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import kafka.common.FailedToSendMessageException;
 
 
 
@@ -84,7 +82,7 @@ public class KafkaPublisher implements Publisher {
                
                
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+               props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
                
                
@@ -100,7 +98,7 @@ public class KafkaPublisher implements Publisher {
         */
        @Override
        public void sendMessage(String topic, message msg) throws IOException{
-               final List<message> msgs = new LinkedList<message>();
+               final List<message> msgs = new LinkedList<>();
                msgs.add(msg);
                sendMessages(topic, msgs);
        }
@@ -168,29 +166,18 @@ public class KafkaPublisher implements Publisher {
                }
        } */
        @Override
-       public void sendMessagesNew(String topic, List<? extends message> msgs)
-                       throws IOException {
-               log.info("sending " + msgs.size() + " events to [" + topic + "]");
-try{
-               final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size());
-                       for (message o : msgs) {
-                       
-                       final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
-                       
-               
-               try {
-
-                       fProducer.send(data);
-
-               } catch (Exception excp) {
-                       log.error("Failed to send message(s) to topic [" + topic + "].", excp);
-                       throw new Exception(excp.getMessage(), excp);
-               }
-       }
-               
-       }catch(Exception e){}
-}
-       //private final rrNvReadable fSettings;
+    public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
+        log.info("sending " + msgs.size() + " events to [" + topic + "]");
+        try {
+            for (message o : msgs) {
+                final ProducerRecord<String, String> data =
+                        new ProducerRecord<>(topic, o.getKey(), o.toString());
+                fProducer.send(data);
+            }
+        } catch (Exception e) {
+            log.error("Failed to send message(s) to topic [" + topic + "].", e);
+        }
+    }
 
        
        private Producer<String, String> fProducer;
@@ -203,14 +190,11 @@ try{
    * @param defVal
    */
        private void transferSetting(Properties props, String key, String defVal) {
-               String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
-               if (StringUtils.isEmpty(kafka_prop)) kafka_prop=defVal;
-               //props.put(key, settings.getString("kafka." + key, defVal));
-               props.put(key, kafka_prop);
+               String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
+               if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal;
+               props.put(key, kafkaProp);
        }
 
-       //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class);
-
        private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class);
 
        @Override
@@ -218,6 +202,4 @@ try{
                // TODO Auto-generated method stub
                
        }
-
-       
-}
\ No newline at end of file
+}
index 03a1bd5..d7fa28b 100644 (file)
@@ -222,28 +222,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                }
 
                // create via kafka
-               
-                       try
-                       {
-                               final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() );
-                               final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
-                               final KafkaFuture<Void> ctrResult = ctr.all ();
-                               ctrResult.get ();
-                               // underlying Kafka topic created. now setup our API info
-                               return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );
-                       }
-                       catch ( InterruptedException e )
-                       {
-                               
-                               log.warn ( "Execution of describeTopics timed out." );
-                               throw new ConfigDbException ( e );
-                       }
-                       catch ( ExecutionException e )
-                       {
-                               
-                               log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
-                               throw new ConfigDbException ( e.getCause () );
-                       }
+
+        try {
+            final NewTopic topicRequest =
+                    new NewTopic(topic, partitions, (short)replicas);
+            final CreateTopicsResult ctr =
+                    fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+            final KafkaFuture<Void> ctrResult = ctr.all();
+            ctrResult.get();
+            // underlying Kafka topic created. now setup our API info
+            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+        } catch (InterruptedException e) {
+            log.warn("Execution of describeTopics timed out.");
+            throw new ConfigDbException(e);
+        } catch (ExecutionException e) {
+            log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
+            throw new ConfigDbException(e.getCause());
+        }
                
        }
 
@@ -348,11 +343,17 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                                                // owner (or it's empty), null is okay -- this is for existing or implicitly
                                                // created topics.
                                                JSONObject readers = o.optJSONObject ( "readers" );
-                                               if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
+                                               if ( readers == null && fOwner.length () > 0 )
+                                               {
+                                                   readers = kEmptyAcl;
+                                               }
                                                fReaders =  fromJson ( readers );
 
                                                JSONObject writers = o.optJSONObject ( "writers" );
-                                               if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
+                                               if ( writers == null && fOwner.length () > 0 )
+                                               {
+                                                   writers = kEmptyAcl;
+                                               }
                                                fWriters = fromJson ( writers );
                }
                
@@ -445,11 +446,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                                log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
        
                        }
-                       catch ( ConfigDbException x )
-                       {
-                               throw x;
-                       }
-                       catch ( AccessDeniedException x )
+                       catch ( ConfigDbException | AccessDeniedException x )
                        {
                                throw x;
                        }
index 8d21b0b..dba8f85 100644 (file)
@@ -27,7 +27,7 @@
 
 major=1
 minor=1
-patch=17
+patch=18
 
 base_version=${major}.${minor}.${patch}