[PIE-991]Create Audit Topics in CtxAggr 97/73297/3
authorLeigh, Phillip (pl876u) <phillip.leigh@amdocs.com>
Wed, 21 Nov 2018 21:43:06 +0000 (16:43 -0500)
committerLeigh, Phillip (pl876u) <phillip.leigh@amdocs.com>
Thu, 22 Nov 2018 16:47:03 +0000 (11:47 -0500)
Issue-ID: LOG-811
Change-Id: I763cfb90b399514c5930539a1b71e762cb43bc6d
Signed-off-by: Leigh, Phillip (pl876u) <phillip.leigh@amdocs.com>
config/application.properties
src/main/docker/Dockerfile
src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java
src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java
src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
src/test/java/org/onap/pomba/contextaggregator/config/TransportConfigTest.java

index 8418dce..9dd9da8 100644 (file)
@@ -14,6 +14,9 @@ transport.consume.timeout=15000
 transport.consume.msglimit=1000
 transport.consume.batchsize=8
 transport.consume.type=HTTPAUTH
+transport.message-router.apiKey=apiKeyTemp123
+transport.message-router.apiSecret=apiSecretTemp123
+transport.message-router.requiredPombaTopics=POA-AUDIT-INIT,POA-AUDIT-RESULT,POA-RULE-VALIDATION,POA-AUDIT-TEST-ONLY
 
 transport.publish.host=<replace_with_host_or_ip>
 transport.publish.port=<replace>
index ad64f1d..1cd5e65 100644 (file)
@@ -15,6 +15,6 @@ ADD startService.sh $MICROSERVICE_HOME/bin/
 RUN chmod 755 $MICROSERVICE_HOME/config/*
 RUN chmod 755 $MICROSERVICE_HOME/lib/*
 RUN chmod 755 $MICROSERVICE_HOME/bin/*
-RUN apk --no-cache add curl
+RUN apk --no-cache add curl
 
 CMD ["/opt/app/bin/startService.sh"]
index 12f0cc8..1532f43 100644 (file)
  */
 package org.onap.pomba.contextaggregator.config;
 
+import java.util.Collection;
 import java.util.Properties;
+
 import org.onap.pomba.contextaggregator.publisher.EventPublisherFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+
 import com.att.nsa.mr.client.MRClientFactory;
 import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.MRTopicManager;
 import com.att.nsa.mr.client.impl.MRConsumerImpl;
 
 @Configuration
 public class TransportConfig {
-
     @Bean
     public MRConsumer consumer(@Value("${transport.consume.host}") String host,
             @Value("${transport.consume.port}") String port, @Value("${transport.consume.topic}") String topic,
@@ -38,6 +41,7 @@ public class TransportConfig {
             @Value("${transport.consume.timeout}") int timeout, @Value("${transport.consume.batchsize}") int batchSize,
             @Value("${transport.consume.msglimit}") int msgLimit, @Value("${transport.consume.type}") String type) {
 
+
         String hostStr = host + ":" + port;
 
         final MRConsumer consumer = MRClientFactory.createConsumer(hostStr, topic, motsid, pass, consumerGroup,
@@ -50,6 +54,25 @@ public class TransportConfig {
         return consumer;
     }
 
+    @Bean
+    public MRTopicManager messageRouterTopicMgr (@Value("${transport.consume.host}") String host,
+            @Value("${transport.consume.port}") String port,
+            @Value("${transport.message-router.apiKey}") String apiKey,
+            @Value("${transport.message-router.apiSecret}") String apiSecret
+            ) {
+
+        String hostStr = host + ":" + port;
+        // Verify if all topics ()
+        Collection<String> hostSet = java.util.Arrays.asList(hostStr);
+        MRTopicManager mgr = MRClientFactory.createTopicManager(hostSet, apiKey, apiSecret);
+        return mgr;
+    }
+
+    @Bean
+    public String messageRouterRequiredPombaTopicList(@Value("${transport.message-router.requiredPombaTopics}") String requiredPombaTopics) {
+        return requiredPombaTopics;
+    }
+
     @Bean
     public EventPublisherFactory publisherFactory(@Value("${transport.publish.host}") String host,
             @Value("${transport.publish.port}") String port, @Value("${transport.publish.topic}") String topic,
@@ -63,4 +86,5 @@ public class TransportConfig {
         return new EventPublisherFactory(hostStr, topic, motsid, pass, batchSize, maxAge, delay, type, partition,
                 retries);
     }
+
 }
index cee1fda..396f684 100644 (file)
@@ -30,7 +30,8 @@ public enum ContextAggregatorError {
     PUBLISHER_SEND_ERROR("CA-106", "Error encountered when publishing messages: {0}"),
     PUBLISHER_CLOSE_ERROR("CA-107", "Error encountered when closing publisher: {0}"),
     FAILED_TO_PUBLISH_RESULT("CA-108", "Failed to publish model data: {0}"),
-    BUILDER_PROPERTIES_NOT_FOUND("CA-109", "No builder properties were found under location(s): {0}");
+    BUILDER_PROPERTIES_NOT_FOUND("CA-109", "No builder properties were found under location(s): {0}"),
+    FAILED_TO_CREATE_POMBA_TOPICS("CA-110", "Failed to create POMBA Topics: {0}");
 
     private String errorId;
     private String message;
index e2758ab..f79bf96 100644 (file)
@@ -20,16 +20,22 @@ package org.onap.pomba.contextaggregator.service;
 import com.att.aft.dme2.internal.gson.Gson;
 import com.att.aft.dme2.internal.gson.GsonBuilder;
 import com.att.aft.dme2.internal.gson.JsonSyntaxException;
+import com.att.nsa.apiClient.http.HttpException;
 import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRClientFactory;
 import com.att.nsa.mr.client.MRConsumer;
 import com.att.nsa.mr.client.MRPublisher;
+import com.att.nsa.mr.client.MRTopicManager;
 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -46,6 +52,8 @@ import org.onap.pomba.contextaggregator.rest.RestRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -59,6 +67,12 @@ public class ContextAggregatorProcessor implements Callable<Void> {
     @Autowired
     private MRConsumer consumer;
 
+    @Autowired
+    private MRTopicManager messageRouterTopicMgr;
+
+    @Autowired
+    private String messageRouterRequiredPombaTopicList;
+
     @Autowired
     private EventPublisherFactory publisherFactory;
 
@@ -110,6 +124,8 @@ public class ContextAggregatorProcessor implements Callable<Void> {
 
     @Override
     public Void call() throws Exception {
+        createPombaTopics();
+
         while (true) {
             for (String event : consumer.fetch()) {
                 executor.execute(() -> {
@@ -233,5 +249,33 @@ public class ContextAggregatorProcessor implements Callable<Void> {
         }
     }
 
+    private List<String> getRequiredTopicList(String messageRouterRequiredPombaTopicList) {
+        List<String> pombaTopicList = new ArrayList<String>();
+        String noSpacePombaTopicList = messageRouterRequiredPombaTopicList.replaceAll("\\s", "");
+        String[] pombaTopicStrSet = noSpacePombaTopicList.split(",");
+        for (int i = 0; i < pombaTopicStrSet.length; i++) {
+                pombaTopicList.add(new String(pombaTopicStrSet[i]));
+        }
+        return pombaTopicList;
+    }
+
+    private void createPombaTopics () {
+
+        List<String> requiredTopicList = getRequiredTopicList(messageRouterRequiredPombaTopicList);
+
+        String topicDescription = "create default topic";
+        int partitionCount = 1;
+        int replicationCount = 1;
+
+        for ( String topic_required : requiredTopicList) {
+                try {
+                    messageRouterTopicMgr.createTopic(topic_required, topicDescription, partitionCount, replicationCount);
+                } catch (HttpException e1) {
+                    log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage()));
+                } catch (IOException e) {
+                    log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage()));
+                }
+         }
+    }
 }
 
index af47ff6..fdb2709 100644 (file)
@@ -25,11 +25,17 @@ import org.junit.runner.RunWith;
 import org.onap.pomba.contextaggregator.publisher.EventPublisherFactory;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
 
 import java.util.Properties;
 
 @RunWith(SpringJUnit4ClassRunner.class)
+@EnableAutoConfiguration(exclude = { DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class })
 @SpringBootTest
+@TestPropertySource(properties = { "transport.consume.host=http://localhost", "transport.consume.port=8080" })
 public class TransportConfigTest
 {
     TransportConfig transportConfig = new TransportConfig();