read topic_url via Data streams 50/85050/1
authorpwielebs <piotr.wielebski@nokia.com>
Thu, 11 Apr 2019 11:18:07 +0000 (13:18 +0200)
committerpwielebs <piotr.wielebski@nokia.com>
Thu, 11 Apr 2019 11:24:38 +0000 (13:24 +0200)
Change-Id: Ibabbeb3fa9b327dafbbb81e58980f8978a509d7e
Issue-ID: DCAEGEN2-1360
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java
prh-app-server/src/test/resources/correct_config.json
prh-app-server/src/test/resources/flattened_configuration.json

index e6bcda5..f19eb3e 100644 (file)
@@ -23,8 +23,13 @@ package org.onap.dcaegen2.services.prh.configuration;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
@@ -32,6 +37,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl
 
 import java.util.Map;
 
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/21/18
  */
@@ -40,7 +47,12 @@ class CbsContentParser {
     private static final String SECURITY_TRUST_STORE_PASS_PATH = "security.trustStorePasswordPath";
     private static final String SECURITY_KEY_STORE_PATH = "security.keyStorePath";
     private static final String SECURITY_KEY_STORE_PASS_PATH = "security.keyStorePasswordPath";
+    private static final String SECURITY_ENABLE_DMAAP_CERT_AUTH = "security.enableDmaapCertAuth";
     private static final String CONFIG = "config";
+    private static final String PNF_UPDATE = "pnf-update";
+    private static final String PNF_READY = "pnf-ready";
+    private static final String VES_REG_OUTPUT = "ves-reg-output";
+
     private final JsonObject jsonObject;
 
     CbsContentParser(JsonObject jsonObject) {
@@ -48,8 +60,11 @@ class CbsContentParser {
     }
 
     DmaapPublisherConfiguration getDmaapPublisherConfig() {
+        RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_READY)).get();
+        MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
         return new ImmutableDmaapPublisherConfiguration.Builder()
-            .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.PNF_READY")
+            .endpointUrl(parsedSink.topicUrl())
             .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
             .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
             .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
@@ -62,13 +77,16 @@ class CbsContentParser {
             .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
             .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
             .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
-            .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean())
+            .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean())
             .build();
     }
 
     DmaapPublisherConfiguration getDmaapUpdatePublisherConfig() {
+        RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_UPDATE)).get();
+        MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
         return new ImmutableDmaapPublisherConfiguration.Builder()
-            .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.PNF_READY")
+            .endpointUrl(parsedSink.topicUrl())
             .dmaapTopicName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapTopicName").getAsString())
             .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString())
             .dmaapPortNumber(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapPortNumber").getAsInt())
@@ -81,7 +99,7 @@ class CbsContentParser {
             .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
             .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
             .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
-            .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean())
+            .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean())
             .build();
     }
 
@@ -109,8 +127,11 @@ class CbsContentParser {
     }
 
     DmaapConsumerConfiguration getDmaapConsumerConfig() {
+        RawDataStream<JsonObject> source = DataStreams.namedSources(jsonObject).find(streamWithName(VES_REG_OUTPUT)).get();
+        MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
+
         return new ImmutableDmaapConsumerConfiguration.Builder()
-            .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT")
+            .endpointUrl(parsedSource.topicUrl())
             .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt())
             .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
             .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
@@ -126,7 +147,7 @@ class CbsContentParser {
             .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
             .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
             .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
-            .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean())
+            .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean())
             .build();
     }
 }
\ No newline at end of file
index 35895ab..4b48fa3 100644 (file)
@@ -20,8 +20,6 @@
 
 package org.onap.dcaegen2.services.prh.configuration;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.*;
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
@@ -35,13 +33,8 @@ import org.springframework.core.io.Resource;
 import org.springframework.util.StreamUtils;
 
 import javax.annotation.PostConstruct;
-import javax.validation.constraints.NotNull;
-import java.io.*;
-import java.net.URI;
-import java.net.URL;
+import java.io.IOException;
 import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ServiceLoader;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
@@ -52,13 +45,6 @@ import java.util.ServiceLoader;
 public abstract class PrhAppConfig implements Config {
     private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class);
 
-    private static final String CONFIG = "configs";
-    private static final String AAI = "aai";
-    private static final String DMAAP = "dmaap";
-    private static final String AAI_CONFIG = "aaiClientConfiguration";
-    private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
-    private static final String SECURITY = "security";
-
     AaiClientConfiguration aaiClientConfiguration;
 
     DmaapConsumerConfiguration dmaapConsumerConfiguration;
@@ -99,51 +85,4 @@ public abstract class PrhAppConfig implements Config {
     public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() {
         return dmaapUpdatePublisherConfiguration;
     }
-
-
-    private DmaapPublisherConfiguration deserializeDmaapPublisherConfiguration(
-            final String dmaapProducerType,
-            final GsonBuilder gsonBuilder,
-            final JsonElement rootElement) {
-         return deserializeType(gsonBuilder, concatenateJsonObjects(
-                rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
-                        .getAsJsonObject(dmaapProducerType),
-                rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
-                DmaapPublisherConfiguration.class);
-    }
-
-    private void deserializeDmaapConsumerConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) {
-        dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
-                rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
-                        .getAsJsonObject(DMAAP_CONSUMER),
-                rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
-                DmaapConsumerConfiguration.class);
-    }
-
-    private void deserializeAaiConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) {
-        aaiClientConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
-                rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(AAI).getAsJsonObject(AAI_CONFIG),
-                rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
-                AaiClientConfiguration.class);
-    }
-
-    JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
-        return parser.parse(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
-    }
-
-    private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
-        source.entrySet()
-                .forEach(entry -> target.add(entry.getKey(), entry.getValue()));
-        return target;
-    }
-
-    private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
-                                  @NotNull Class<T> type) {
-        try {
-            return gsonBuilder.create().fromJson(jsonObject, type);
-        } catch (JsonSyntaxException e) {
-            LOGGER.warn("Failed to parse JSON={}", jsonObject, e);
-            return null;
-        }
-    }
 }
index 8a6fbf0..4bb5a31 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import org.onap.dcaegen2.services.prh.configuration.Config;
 import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl;
 import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
 import org.slf4j.Logger;
@@ -39,7 +33,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
-import reactor.netty.http.client.HttpClientResponse;
 
 
 /**
index dec783f..f1b900a 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.prh.tasks;
 import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
 import reactor.core.publisher.Mono;
 
+@FunctionalInterface
 public interface AaiQueryTask {
     Mono<Boolean> execute(final AaiModel aaiModel);
 }
index d0b8187..12e1314 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.prh.tasks;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import reactor.core.publisher.Mono;
 
+@FunctionalInterface
 public interface BbsActionsTask {
     Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel);
 }
index ec3ebee..7ec2f74 100644 (file)
     "security.enableDmaapCertAuth":false,
     "streams_publishes":{
       "pnf-update":{
+        "type": "message_router",
         "dmaap_info":{
           "topic_url":"http://dmaap-mr:2222/events/unauthenticated.PNF_UPDATE"
         }
       },
       "pnf-ready":{
+        "type": "message_router",
         "dmaap_info":{
           "topic_url":"http://dmaap-mr:2222/events/unauthenticated.PNF_READY"
         }
@@ -65,6 +67,7 @@
     },
     "streams_subscribes":{
       "ves-reg-output":{
+        "type": "message_router",
         "dmaap_info":{
           "topic_url":"http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT"
         }
index 7eb140b..9997f58 100644 (file)
     "security.enableDmaapCertAuth":false,
     "streams_publishes":{
       "pnf-update":{
+        "type": "message_router",
         "dmaap_info":{
           "topic_url":"http://dmaap-mr:2222/events/unauthenticated.PNF_UPDATE"
         }
       },
       "pnf-ready":{
+        "type": "message_router",
         "dmaap_info":{
           "topic_url":"http://dmaap-mr:2222/events/unauthenticated.PNF_READY"
         }
@@ -67,6 +69,7 @@
     },
     "streams_subscribes":{
       "ves-reg-output":{
+        "type": "message_router",
         "dmaap_info":{
           "topic_url":"http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT"
         }