Update dcae SDK from 1.1.6 to 1.4.2 68/111668/6
authorRemigiusz Janeczek <remigiusz.janeczek@nokia.com>
Tue, 18 Aug 2020 10:48:19 +0000 (12:48 +0200)
committerRemigiusz Janeczek <remigiusz.janeczek@nokia.com>
Tue, 25 Aug 2020 14:24:17 +0000 (16:24 +0200)
Bump project version from 1.4.2 to 1.4.3
Update deprecated calls to JsonParser
Make logs always go to file and console

Issue-ID: DCAEGEN2-2267
Signed-off-by: Remigiusz Janeczek <remigiusz.janeczek@nokia.com>
Change-Id: Ib8d7f82b3daf03ca327581c9a5dc4f6f27a20141

30 files changed:
datafile-app-server/config/application.yaml
datafile-app-server/pom.xml
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java [deleted file]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java [deleted file]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/main/resources/logback-spring.xml
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java [deleted file]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java [deleted file]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
datafile-app-server/src/test/resources/cert.jks [new file with mode: 0755]
datafile-app-server/src/test/resources/datafile_endpoints_test.json
datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
datafile-app-server/src/test/resources/jks.pass [new file with mode: 0755]
datafile-app-server/src/test/resources/trust.jks [new file with mode: 0755]
datafile-app-server/src/test/resources/trust.pass [new file with mode: 0755]
pom.xml
version.properties

index 69771e2..d3cced8 100644 (file)
@@ -21,6 +21,7 @@ logging:
     org.springframework.data: ERROR
     org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
     org.onap.dcaegen2.collectors.datafile: WARN
+    org.onap.dcaegen2: WARN
   file: /var/log/ONAP/application.log
 app:
   filepath: config/datafile_endpoints_test.json
index b06ed82..896f5d2 100644 (file)
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
   ~ ============LICENSE_START=======================================================
-  ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018-2020 Nordix Foundation. All rights reserved.
+  ~ Copyright (C) 2018-2020 NOKIA Intellectual Property, 2018-2020 Nordix Foundation. All rights reserved.
   ~ Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
   ~ ================================================================================ 
   ~ Licensed under the Apache License, Version 2.0 (the "License");
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.collectors</groupId>
     <artifactId>datafile</artifactId>
-    <version>1.4.2-SNAPSHOT</version>
+    <version>1.4.3-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
       <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
       <artifactId>dmaap-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.onap.dcaegen2.services.sdk.security</groupId>
+      <artifactId>ssl</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpasyncclient</artifactId>
       <artifactId>spring-boot-configuration-processor</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.immutables</groupId>
+      <artifactId>value</artifactId>
+    </dependency>
 
     <!-- Actuator dependencies -->
     <dependency>
index c257cee..d933e33 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -41,8 +41,9 @@ import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticConte
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.CbsClientConfigurationException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,12 +72,12 @@ public class AppConfig {
 
     private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
 
+    @Value("#{systemEnvironment}")
+    Properties systemEnvironment;
     private ConsumerConfiguration dmaapConsumerConfiguration;
     private Map<String, PublisherConfiguration> publishingConfigurations;
     private FtpesConfig ftpesConfiguration;
     private SftpConfig sftpConfiguration;
-    @Value("#{systemEnvironment}")
-    Properties systemEnvironment;
     private Disposable refreshConfigTask = null;
 
     @NotEmpty
@@ -102,8 +103,8 @@ public class AppConfig {
     }
 
     Flux<AppConfig> createRefreshTask(Map<String, String> context) {
-        return getEnvironment(systemEnvironment, context) //
-            .flatMap(this::createCbsClient) //
+        return createCbsClientConfiguration()
+            .flatMap(this::createCbsClient)
             .flatMapMany(this::periodicConfigurationUpdates) //
             .map(this::parseCloudConfig) //
             .onErrorResume(this::onErrorResume);
@@ -175,19 +176,25 @@ public class AppConfig {
         return Mono.empty();
     }
 
-    Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) {
-        return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context);
+    Mono<CbsClientConfiguration> createCbsClientConfiguration() {
+        try {
+            return Mono.just(CbsClientConfiguration.fromEnvironment());
+        } catch (CbsClientConfigurationException e) {
+            return Mono.error(e);
+        }
     }
 
-    Mono<CbsClient> createCbsClient(EnvProperties env) {
-        return CbsClientFactory.createCbsClient(env);
+    Mono<CbsClient> createCbsClient(CbsClientConfiguration cbsClientConfiguration) {
+        return CbsClientFactory.createCbsClient(cbsClientConfiguration);
     }
 
     private AppConfig parseCloudConfig(JsonObject configurationObject) {
         try {
-            CloudConfigParser parser = new CloudConfigParser(configurationObject, systemEnvironment);
-            setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(),
-                parser.getFtpesConfig(), parser.getSftpConfig());
+            CloudConfigParser parser =
+                new CloudConfigParser(configurationObject, systemEnvironment);
+            setConfiguration(parser.getConsumerConfiguration(),
+                parser.getDmaapPublisherConfigurations(), parser.getFtpesConfig(),
+                parser.getSftpConfig());
             logConfig();
         } catch (DatafileTaskException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
@@ -207,8 +214,7 @@ public class AppConfig {
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
 
         try (InputStream inputStream = createInputStream(filepath)) {
-            JsonParser parser = new JsonParser();
-            JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
+            JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
             if (rootObject == null) {
                 throw new JsonSyntaxException("Root is not a json object");
             }
@@ -228,8 +234,8 @@ public class AppConfig {
         this.sftpConfiguration = sftpConfig;
     }
 
-    JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
-        return parser.parse(new InputStreamReader(inputStream));
+    JsonElement getJsonElement(InputStream inputStream) {
+        return JsonParser.parseReader(new InputStreamReader(inputStream));
     }
 
     InputStream createInputStream(@NotNull String filepath) throws IOException {
index a86a32b..6ace4aa 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,16 +21,32 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.Set;
 
 import javax.validation.constraints.NotNull;
 
+import io.vavr.collection.Stream;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
 
 /**
  * Parses the cloud configuration.
@@ -51,6 +67,12 @@ public class CloudConfigParser {
     private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING =
         "sftp.security.strictHostKeyChecking";
 
+    private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup";
+    private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId";
+    private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs";
+    private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1;
+    private static final int FIRST_SOURCE_INDEX = 0;
+
     private final Properties systemEnvironment;
 
     private final JsonObject jsonObject;
@@ -78,7 +100,7 @@ public class CloudConfigParser {
 
             PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
                 .publishUrl(getAsString(feedConfig, "publish_url")) //
-                .passWord(getAsString(feedConfig, "password")) //
+                .password(getAsString(feedConfig, "password")) //
                 .userName(getAsString(feedConfig, "username")) //
                 .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
                 .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
@@ -98,24 +120,51 @@ public class CloudConfigParser {
      * Get the consumer configuration.
      *
      * @return the consumer configuration.
-     * @throws DatafileTaskException if a member of the configuration is missing.
+     * @throws DatafileTaskException if the configuration is invalid.
      */
-    public @NotNull ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
-        JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject();
-        Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
-        if (topics.size() != 1) {
-            throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics);
+    public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException {
+        try {
+            MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig();
+            MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest();
+            MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig);
+            return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest);
+        } catch (Exception e) {
+            throw new DatafileTaskException("Could not parse message router consumer configuration", e);
         }
-        JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
-        JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
-        String topicUrl = getAsString(dmaapInfo, "topic_url");
-
-        return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
-            .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))
-            .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
-            .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))
-            .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
-            .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+    }
+
+    private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException {
+        return ImmutableMessageRouterSubscriberConfig.builder()
+            .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null)
+            .build();
+    }
+
+    private SecurityKeys createSecurityKeys() throws DatafileTaskException {
+        return ImmutableSecurityKeys.builder()
+            .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)))
+            .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)))
+            .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)))
+            .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)))
+            .build();
+    }
+
+    private boolean isDmaapCertAuthEnabled(JsonObject config) {
+        return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean();
+    }
+
+    private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException {
+        Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject);
+        if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) {
+            throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources);
+        }
+        RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX);
+        MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
+
+        return ImmutableMessageRouterSubscribeRequest.builder()
+            .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP))
+            .sourceDefinition(parsedSource)
+            .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID))
+            .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong()))
             .build();
     }
 
@@ -176,4 +225,8 @@ public class CloudConfigParser {
         return get(obj, memberName).getAsJsonObject();
     }
 
+    private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException {
+        return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath));
+    }
+
 }
index 4db7963..89fcf1c 100644 (file)
 /*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 NOKIA Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
-import java.net.MalformedURLException;
-import java.net.URL;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+public class ConsumerConfiguration {
 
-@Value.Immutable
-@Value.Style(redactedMask = "####")
-@Gson.TypeAdapters
-public abstract class ConsumerConfiguration {
-    @Value.Redacted
-    public abstract String topicUrl();
+    private final MessageRouterSubscriberConfig messageRouterSubscriberConfig;
+    private final MessageRouterSubscriber messageRouterSubscriber;
+    private final MessageRouterSubscribeRequest messageRouterSubscribeRequest;
 
-    public abstract String trustStorePath();
-
-    public abstract String trustStorePasswordPath();
-
-    public abstract String keyStorePath();
-
-    public abstract String keyStorePasswordPath();
-
-    public abstract Boolean enableDmaapCertAuth();
-
-    /**
-     * Gets the configuration in the SDK version.
-     *
-     * @return a <code>DmaapConsumerConfiguration</code> representing the configuration.
-     *
-     * @throws DatafileTaskException if something is wrong with the topic URL.
-     */
-    public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException {
-        try {
-            URL url = new URL(topicUrl());
-            String passwd = "";
-            String userName = "";
-            if (url.getUserInfo() != null) {
-                String[] userInfo = url.getUserInfo().split(":");
-                userName = userInfo[0];
-                passwd = userInfo[1];
-            }
-            String urlPath = url.getPath();
-            DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
-
-            return new ImmutableDmaapConsumerConfiguration.Builder() //
-                .endpointUrl(topicUrl()) //
-                .dmaapContentType("application/json") //
-                .dmaapPortNumber(url.getPort()) //
-                .dmaapHostName(url.getHost()) //
-                .dmaapTopicName(path.dmaapTopicName) //
-                .dmaapProtocol(url.getProtocol()) //
-                .dmaapUserName(userName) //
-                .dmaapUserPassword(passwd) //
-                .trustStorePath(this.trustStorePath()) //
-                .trustStorePasswordPath(this.trustStorePasswordPath()) //
-                .keyStorePath(this.keyStorePath()) //
-                .keyStorePasswordPath(this.keyStorePasswordPath()) //
-                .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
-                .consumerId(path.consumerId) //
-                .consumerGroup(path.consumerGroup) //
-                .timeoutMs(-1) //
-                .messageLimit(-1) //
-                .build();
-        } catch (MalformedURLException e) {
-            throw new DatafileTaskException("Could not parse the URL", e);
-        }
+    public ConsumerConfiguration(MessageRouterSubscriberConfig messageRouterSubscriberConfig,
+        MessageRouterSubscriber messageRouterSubscriber, MessageRouterSubscribeRequest messageRouterSubscribeRequest) {
+        this.messageRouterSubscriberConfig = messageRouterSubscriberConfig;
+        this.messageRouterSubscriber = messageRouterSubscriber;
+        this.messageRouterSubscribeRequest = messageRouterSubscribeRequest;
     }
 
-    private class DmaapConsumerUrlPath {
-        final String dmaapTopicName;
-        final String consumerGroup;
-        final String consumerId;
-
-        DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
-            this.dmaapTopicName = dmaapTopicName;
-            this.consumerGroup = consumerGroup;
-            this.consumerId = consumerId;
-        }
+    public MessageRouterSubscriber getMessageRouterSubscriber() {
+        return messageRouterSubscriber;
     }
 
-    private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException {
-        String[] tokens = urlPath.split("/"); // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
-        if (tokens.length != 5) {
-            throw new DatafileTaskException("The path has incorrect syntax: " + urlPath);
-        }
+    public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
+        return messageRouterSubscribeRequest;
+    }
 
-        final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
-        final String consumerGroup = tokens[3]; // OpenDcae-c12
-        final String consumerId = tokens[4]; // C12
-        return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+    public MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() {
+        return messageRouterSubscriberConfig;
     }
 
+    @Override
+    public String toString() {
+        return "ConsumerConfiguration{" + "securityKeys=" + messageRouterSubscriberConfig.securityKeys()
+            + ", consumerGroup=" + messageRouterSubscribeRequest.consumerGroup() + ", consumerID="
+            + messageRouterSubscribeRequest.consumerId() + '}';
+    }
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
deleted file mode 100644 (file)
index ad5f648..0000000
+++ /dev/null
@@ -1,91 +0,0 @@
-/*-
- * ============LICENSE_START========================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * =================================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END==========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import reactor.core.publisher.Mono;
-
-/**
- * Handling the Consul connection.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 9/19/18
- */
-class EnvironmentProcessor {
-
-    private static final int DEFAULT_CONSUL_PORT = 8500;
-    private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
-
-    private EnvironmentProcessor() {
-    }
-
-    static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
-        logger.trace("Loading configuration from system environment variables");
-        EnvProperties envProperties;
-        try {
-            envProperties = ImmutableEnvProperties.builder() //
-                .consulHost(getConsulHost(systemEnvironment)) //
-                .consulPort(getConsultPort(systemEnvironment)) //
-                .cbsName(getConfigBindingService(systemEnvironment)) //
-                .appName(getService(systemEnvironment)) //
-                .build();
-        } catch (EnvironmentLoaderException e) {
-            return Mono.error(e);
-        }
-        logger.trace("Evaluated environment system variables {}", envProperties);
-        return Mono.just(envProperties);
-    }
-
-    private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
-        return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
-            .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
-    }
-
-    private static Integer getConsultPort(Properties systemEnvironments) {
-        return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) //
-            .map(Integer::valueOf) //
-            .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
-    }
-
-    private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
-        return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) //
-            .orElseThrow(
-                () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
-    }
-
-    private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
-        return Optional
-            .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
-                .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
-            .orElseThrow(() -> new EnvironmentLoaderException(
-                "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
-    }
-
-    private static Integer getDefaultPortOfConsul() {
-        logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
-        return DEFAULT_CONSUL_PORT;
-    }
-}
index d7451bd..fa7d784 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2020 Nokia. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
-import java.net.MalformedURLException;
-import java.net.URL;
 
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
 
 @Value.Immutable
 @Value.Style(redactedMask = "####")
@@ -36,7 +34,7 @@ public interface PublisherConfiguration {
     String userName();
 
     @Value.Redacted
-    String passWord();
+    String password();
 
     String trustStorePath();
 
@@ -50,30 +48,4 @@ public interface PublisherConfiguration {
 
     String changeIdentifier();
 
-    /**
-     * Get the publisher configuration in SDK format.
-     *
-     * @return a <code>DmaapPublisherConfiguration</code> contining the publisher configuration.
-     * @throws MalformedURLException if the publish URL is malformed.
-     */
-    default DmaapPublisherConfiguration toDmaap() throws MalformedURLException {
-        URL url = new URL(publishUrl());
-        String urlPath = url.getPath();
-
-        return new ImmutableDmaapPublisherConfiguration.Builder() //
-            .endpointUrl(publishUrl()) //
-            .dmaapContentType("application/octet-stream") //
-            .dmaapPortNumber(url.getPort()) //
-            .dmaapHostName(url.getHost()) //
-            .dmaapTopicName(urlPath) //
-            .dmaapProtocol(url.getProtocol()) //
-            .dmaapUserName(this.userName()) //
-            .dmaapUserPassword(this.passWord()) //
-            .trustStorePath(this.trustStorePath()) //
-            .trustStorePasswordPath(this.trustStorePasswordPath()) //
-            .keyStorePath(this.keyStorePath()) //
-            .keyStorePasswordPath(this.keyStorePasswordPath()) //
-            .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
-            .build();
-    }
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
deleted file mode 100644 (file)
index 5a8806b..0000000
+++ /dev/null
@@ -1,91 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service;
-
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.springframework.http.HttpHeaders;
-import org.springframework.web.reactive.function.client.ClientRequest;
-import org.springframework.web.reactive.function.client.ClientResponse;
-import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.Builder;
-import reactor.core.publisher.Mono;
-
-/**
- * Web client for the DMaaP MessageRouter.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 7/4/18
- */
-public class DmaapWebClient {
-
-    private static final Logger logger = LoggerFactory.getLogger(DmaapWebClient.class);
-
-    private String contentType;
-
-    /**
-     * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
-     *
-     * @param dmaapCustomConfig - configuration object
-     * @return DmaapReactiveWebClient
-     */
-    public DmaapWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
-        this.contentType = dmaapCustomConfig.dmaapContentType();
-        return this;
-    }
-
-    /**
-     * Construct Reactive WebClient with appropriate settings.
-     *
-     * @return WebClient
-     */
-    public WebClient build() {
-        Builder webClientBuilder = WebClient.builder() //
-            .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) //
-            .filter(getRequestFilter()) //
-            .filter(getResponseFilter());
-        return webClientBuilder.build();
-    }
-
-    private ExchangeFilterFunction getResponseFilter() {
-        return ExchangeFilterFunction.ofResponseProcessor(this::logResponse);
-    }
-
-    Mono<ClientResponse> logResponse(ClientResponse clientResponse) {
-        MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
-        logger.trace("Response Status {}", clientResponse.statusCode());
-        MDC.remove(RESPONSE_CODE);
-        return Mono.just(clientResponse);
-    }
-
-    private ExchangeFilterFunction getRequestFilter() {
-        return ExchangeFilterFunction.ofRequestProcessor(this::logRequest);
-    }
-
-    Mono<ClientRequest> logRequest(ClientRequest clientRequest) {
-        MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
-        logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
-        logger.trace("HTTP request headers: {}", clientRequest.headers());
-        MDC.remove(SERVICE_NAME);
-        return Mono.just(clientRequest);
-    }
-}
index eed0f0b..708865f 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START========================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ==================================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -96,18 +96,17 @@ public class JsonMessageParser {
      * @return a <code>Flux</code> containing messages.
      */
 
-    public Flux<FileReadyMessage> getMessagesFromJson(Mono<JsonElement> rawMessage) {
-        return rawMessage.flatMapMany(this::createMessageData);
+    public Flux<FileReadyMessage> getMessagesFromJson(Flux<JsonElement> rawMessage) {
+        return rawMessage.flatMap(this::createMessageData);
     }
 
     Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
-        JsonParser jsonParser = new JsonParser();
         if (element.isJsonPrimitive()) {
-            return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject());
+            return Optional.of(JsonParser.parseString(element.getAsString()).getAsJsonObject());
         } else if (element.isJsonObject()) {
             return Optional.of((JsonObject) element);
         } else {
-            return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+            return Optional.of(JsonParser.parseString(element.toString()).getAsJsonObject());
         }
     }
 
@@ -117,9 +116,9 @@ public class JsonMessageParser {
     }
 
     /**
-     * Extract info from string and create a Flux of {@link FileReadyMessage}.
+     * Extract info from jsonElement and create a Flux of {@link FileReadyMessage}.
      *
-     * @param rawMessage - results from DMaaP
+     * @param jsonElement - result from DMaaP
      * @return reactive Flux of FileReadyMessages
      */
     private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
index a46e17b..9de37e8 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -37,10 +37,10 @@ import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.apache.http.ssl.SSLContextBuilder;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
 import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -61,14 +61,14 @@ public class DmaapProducerHttpClient {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    private final DmaapPublisherConfiguration configuration;
+    private final PublisherConfiguration configuration;
 
     /**
      * Constructor DmaapProducerReactiveHttpClient.
      *
      * @param dmaapPublisherConfiguration - DMaaP producer configuration object
      */
-    public DmaapProducerHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+    public DmaapProducerHttpClient(PublisherConfiguration dmaapPublisherConfiguration) {
         this.configuration = dmaapPublisherConfiguration;
     }
 
@@ -131,7 +131,7 @@ public class DmaapProducerHttpClient {
      * @param request the request to add credentials to.
      */
     public void addUserCredentialsToHead(HttpUriRequest request) {
-        String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword();
+        String plainCreds = configuration.userName() + ":" + configuration.password();
         byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
         byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
         String base64Creds = new String(base64CredsBytes);
index 066983a..0780e18 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Copyright (C) 2020 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import com.google.gson.JsonElement;
-
-import java.util.Optional;
-
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 
 /**
  * Component used to get messages from the MessageRouter.
@@ -46,18 +40,14 @@ public class DMaaPMessageConsumer {
     private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class);
     private final AppConfig datafileAppConfig;
     private final JsonMessageParser jsonMessageParser;
-    private final ConsumerReactiveHttpClientFactory httpClientFactory;
 
     public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
-        this(datafileAppConfig, new JsonMessageParser(),
-            new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
+        this(datafileAppConfig, new JsonMessageParser());
     }
 
-    protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser,
-        ConsumerReactiveHttpClientFactory httpClientFactory) {
+    protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser) {
         this.datafileAppConfig = datafileAppConfig;
         this.jsonMessageParser = jsonMessageParser;
-        this.httpClientFactory = httpClientFactory;
     }
 
     /**
@@ -68,21 +58,20 @@ public class DMaaPMessageConsumer {
     public Flux<FileReadyMessage> getMessageRouterResponse() {
         logger.trace("getMessageRouterResponse called");
         try {
-            DMaaPConsumerReactiveHttpClient client = createHttpClient();
-            return consume((client.getDMaaPConsumerResponse(Optional.empty())));
-        } catch (DatafileTaskException e) {
+            ConsumerConfiguration dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration();
+            MessageRouterSubscriber messageRouterSubscriber =
+                dmaapConsumerConfiguration.getMessageRouterSubscriber();
+            Flux<JsonElement> responseElements =
+                messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+            return consume(responseElements);
+        } catch (Exception e) {
             logger.warn("Unable to get response from message router", e);
             return Flux.empty();
         }
     }
 
-    private Flux<FileReadyMessage> consume(Mono<JsonElement> message) {
-        logger.trace("consume called with arg {}", message);
-        return jsonMessageParser.getMessagesFromJson(message);
-    }
-
-    public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException {
-        return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap());
+    private Flux<FileReadyMessage> consume(Flux<JsonElement> messages) {
+        return jsonMessageParser.getMessagesFromJson(messages);
     }
 
 }
index cfaf175..8b86440 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Copyright (C) 2020 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,7 +25,6 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
 
 import java.io.File;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.nio.file.Path;
 import java.time.Duration;
@@ -42,7 +42,6 @@ import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
 import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -113,7 +112,7 @@ public class DataRouterPublisher {
     private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException {
 
         put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
-        JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
+        JsonElement metaData = JsonParser.parseString(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
         put.addHeader(X_DMAAP_DR_META, metaData.toString());
         URI uri = new DefaultUriBuilderFactory(
             datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) //
@@ -155,12 +154,8 @@ public class DataRouterPublisher {
     }
 
     DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException {
-        try {
-            DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap();
-            return new DmaapProducerHttpClient(cfg);
-        } catch (MalformedURLException e) {
-            throw new DatafileTaskException("Cannot resolve producer client", e);
-        }
+        PublisherConfiguration publisherConfiguration = resolveConfiguration(changeIdentifier);
+        return new DmaapProducerHttpClient(publisherConfiguration);
 
     }
 }
index 037803b..a9973cf 100644 (file)
@@ -1,6 +1,7 @@
 /*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2019 Nordix Foundation.
+*  Copyright (C) 2020 Nokia. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -21,7 +22,6 @@
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import java.io.InputStream;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.time.Duration;
@@ -113,12 +113,7 @@ public class PublishedChecker {
         return appConfig.getPublisherConfiguration(changeIdentifier);
     }
 
-    protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig)
-        throws DatafileTaskException {
-        try {
-            return new DmaapProducerHttpClient(publisherConfig.toDmaap());
-        } catch (MalformedURLException e) {
-            throw new DatafileTaskException("Cannot create published checker client", e);
-        }
+    protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) {
+        return new DmaapProducerHttpClient(publisherConfig);
     }
 }
index 42a6fea..eba0a6c 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -302,8 +302,7 @@ public class ScheduledTasks {
 
     private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
         MDC.setContextMap(context);
-        logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
-            this.applicationConfiguration.getDmaapConsumerConfiguration());
+        logger.error("Polling for file ready message failed, exception: {}", exception.toString());
         return Flux.empty();
     }
 
index 1b9818d..89405f2 100644 (file)
     |%thread
     |%n"/>
 
-  <springProfile name="dev">
-    <appender name="CONSOLE" target="SYSTEM_OUT" class="ch.qos.logback.core.ConsoleAppender">
-       <encoder>
-         <pattern>${defaultPattern}</pattern>
-       </encoder>
-    </appender>
+  <appender name="CONSOLE" target="SYSTEM_OUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>${defaultPattern}</pattern>
+    </encoder>
+  </appender>
 
-    <appender name="ROLLING-FILE"
-      class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <encoder>
-        <pattern>${defaultPattern}</pattern>
-      </encoder>
-      <file>${logPath}/${outputFilename}.log</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
-        <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
-        <MaxFileSize>${maxFileSize}</MaxFileSize>
-        <MaxHistory>${maxHistory}</MaxHistory>
-        <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
-      </rollingPolicy>
-    </appender>
-    <root level="ERROR">
-      <appender-ref ref="CONSOLE"/>
-      <appender-ref ref="ROLLING-FILE"/>
-    </root>
-  </springProfile>
+  <appender name="ROLLING-FILE"
+            class="ch.qos.logback.core.rolling.RollingFileAppender">
+    <encoder>
+      <pattern>${defaultPattern}</pattern>
+    </encoder>
+    <file>${logPath}/${outputFilename}.log</file>
+    <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+      <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+      <MaxFileSize>${maxFileSize}</MaxFileSize>
+      <MaxHistory>${maxHistory}</MaxHistory>
+      <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
+    </rollingPolicy>
+  </appender>
+  <root level="ERROR">
+    <appender-ref ref="CONSOLE"/>
+    <appender-ref ref="ROLLING-FILE"/>
+  </root>
 
-  <springProfile name="prod">
-    <appender name="ROLLING-FILE"
-      class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <encoder>
-        <pattern>${defaultPattern}</pattern>
-      </encoder>
-      <file>${logPath}/${outputFilename}.log</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
-        <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
-        <MaxFileSize>${maxFileSize}</MaxFileSize>
-        <MaxHistory>${maxHistory}</MaxHistory>
-        <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
-      </rollingPolicy>
-    </appender>
-
-    <root level="ERROR">
-      <appender-ref ref="ROLLING-FILE"/>
-    </root>
-  </springProfile>
-</configuration>
\ No newline at end of file
+</configuration>
index d0f02d6..dc8a122 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
-
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
 import com.google.gson.JsonElement;
@@ -37,16 +25,6 @@ import com.google.gson.JsonIOException;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonSyntaxException;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.Properties;
-
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -54,15 +32,33 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * Tests the AppConfig.
  *
@@ -73,50 +69,18 @@ public class AppConfigTest {
 
     public static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
 
-    public static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
-        new ImmutableDmaapConsumerConfiguration.Builder() //
-            .endpointUrl(
-                "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
-            .timeoutMs(-1) //
-            .dmaapHostName("localhost") //
-            .dmaapUserName("dradmin") //
-            .dmaapUserPassword("dradmin") //
-            .dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
-            .dmaapPortNumber(2222) //
-            .dmaapContentType("application/json") //
-            .messageLimit(-1) //
-            .dmaapProtocol("http") //
-            .consumerId("C12") //
-            .consumerGroup("OpenDcae-c12") //
-            .trustStorePath("trustStorePath") //
-            .trustStorePasswordPath("trustStorePasswordPath") //
-            .keyStorePath("keyStorePath") //
-            .keyStorePasswordPath("keyStorePasswordPath") //
-            .enableDmaapCertAuth(true) //
-            .build();
-
-    public static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
-        .topicUrl(
-            "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
-        .trustStorePath("trustStorePath") //
-        .trustStorePasswordPath("trustStorePasswordPath") //
-        .keyStorePath("keyStorePath") //
-        .keyStorePasswordPath("keyStorePasswordPath") //
-        .enableDmaapCertAuth(true) //
-        .build();
-
     private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = //
         ImmutablePublisherConfiguration.builder() //
             .publishUrl("https://localhost:3907/publish/1") //
             .logUrl("https://localhost:3907/feedlog/1") //
-            .trustStorePath("trustStorePath") //
-            .trustStorePasswordPath("trustStorePasswordPath") //
-            .keyStorePath("keyStorePath") //
-            .keyStorePasswordPath("keyStorePasswordPath") //
+            .trustStorePath("src/test/resources/trust.jks") //
+            .trustStorePasswordPath("src/test/resources/trust.pass") //
+            .keyStorePath("src/test/resources/cert.jks") //
+            .keyStorePasswordPath("src/test/resources/jks.pass") //
             .enableDmaapCertAuth(true) //
             .changeIdentifier("PM_MEAS_FILES") //
             .userName("CYE9fl40") //
-            .passWord("izBJD8nLjawq0HMG") //
+            .password("izBJD8nLjawq0HMG") //
             .build();
 
     private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
@@ -127,35 +91,10 @@ public class AppConfigTest {
             .trustedCaPasswordPath("/src/test/resources/ftp.jks.pass") //
             .build();
 
-    private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
-        new ImmutableDmaapPublisherConfiguration.Builder() //
-            .endpointUrl("https://localhost:3907/publish/1") //
-            .dmaapTopicName("/publish/1") //
-            .dmaapUserPassword("izBJD8nLjawq0HMG") //
-            .dmaapPortNumber(3907) //
-            .dmaapProtocol("https") //
-            .dmaapContentType("application/octet-stream") //
-            .dmaapHostName("localhost") //
-            .dmaapUserName("CYE9fl40") //
-            .trustStorePath("trustStorePath") //
-            .trustStorePasswordPath("trustStorePasswordPath") //
-            .keyStorePath("keyStorePath") //
-            .keyStorePasswordPath("keyStorePasswordPath") //
-            .enableDmaapCertAuth(true) //
-            .build();
-
-    private static EnvProperties properties() {
-        return ImmutableEnvProperties.builder() //
-            .consulHost("host") //
-            .consulPort(123) //
-            .cbsName("cbsName") //
-            .appName("appName") //
-            .build();
-    }
-
     private AppConfig appConfigUnderTest;
     private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
     CbsClient cbsClient = mock(CbsClient.class);
+    CbsClientConfiguration cbsClientConfiguration = mock(CbsClientConfiguration.class);
 
     @BeforeEach
     void setUp() {
@@ -175,13 +114,11 @@ public class AppConfigTest {
 
         ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration();
         Assertions.assertNotNull(consumerCfg);
-        assertThat(consumerCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
-        assertThat(consumerCfg).isEqualToComparingFieldByField(CORRECT_CONSUMER_CONFIG);
+        assertThat(consumerCfg).satisfies(this::checkCorrectConsumerConfiguration);
 
         PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER);
         Assertions.assertNotNull(publisherCfg);
         assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG);
-        assertThat(publisherCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
 
         FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
         assertThat(ftpesConfig).isNotNull();
@@ -245,7 +182,7 @@ public class AppConfigTest {
         doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
         JsonElement jsonElement = mock(JsonElement.class);
         when(jsonElement.isJsonObject()).thenReturn(false);
-        doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class));
+        doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(InputStream.class));
         appConfigUnderTest.loadConfigurationFromFile();
 
         // Then
@@ -266,15 +203,13 @@ public class AppConfigTest {
             .expectSubscription() //
             .verifyComplete(); //
 
-        assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
+        assertTrue(logAppender.list.toString().contains("CbsClientConfigurationException"));
     }
 
     @Test
     public void whenPeriodicConfigRefreshNoConsul() {
-        EnvProperties props = properties();
-        doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
-
-        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+        doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
+        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
         Flux<JsonObject> err = Flux.error(new IOException());
         doReturn(err).when(cbsClient).updates(any(), any(), any());
 
@@ -292,9 +227,8 @@ public class AppConfigTest {
 
     @Test
     public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
-        EnvProperties props = properties();
-        doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
-        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+        doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
+        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
 
         Flux<JsonObject> json = Flux.just(getJsonRootObject());
         doReturn(json).when(cbsClient).updates(any(), any(), any());
@@ -312,10 +246,8 @@ public class AppConfigTest {
 
     @Test
     public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
-        EnvProperties props = properties();
-        doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
-
-        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+        doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
+        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
 
         Flux<JsonObject> json = Flux.just(getJsonRootObject());
         Flux<JsonObject> err = Flux.error(new IOException()); // no config entry created by the
@@ -334,8 +266,21 @@ public class AppConfigTest {
         Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
     }
 
+    private void checkCorrectConsumerConfiguration(ConsumerConfiguration consumerConfiguration) {
+        MessageRouterSubscribeRequest messageRouterSubscribeRequest =
+                consumerConfiguration.getMessageRouterSubscribeRequest();
+        assertThat(messageRouterSubscribeRequest.consumerGroup()).isEqualTo("OpenDcae-c12");
+        assertThat(messageRouterSubscribeRequest.consumerId()).isEqualTo("C12");
+        assertThat(messageRouterSubscribeRequest.sourceDefinition().topicUrl())
+                .isEqualTo("http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
+        SecurityKeys securityKeys = consumerConfiguration.getMessageRouterSubscriberConfig().securityKeys();
+        assertThat(securityKeys.keyStore().path().toString()).isEqualTo("src/test/resources/cert.jks");
+        assertThat(securityKeys.trustStore().path().toString()).isEqualTo("src/test/resources/trust.jks");
+        assertThat(consumerConfiguration.getMessageRouterSubscriber()).isNotNull();
+    }
+
     private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
-        JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
+        JsonObject rootObject = JsonParser.parseReader(new InputStreamReader(getCorrectJson())).getAsJsonObject();
         return rootObject;
     }
 
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java
deleted file mode 100644 (file)
index bdeb1c1..0000000
+++ /dev/null
@@ -1,101 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-
-public class ConsumerConfigurationTest {
-    @Test
-    public void toDmaapSuccess() throws DatafileTaskException {
-        ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
-            .topicUrl(
-                "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
-            .trustStorePath("") //
-            .trustStorePasswordPath("") //
-            .keyStorePath("") //
-            .keyStorePasswordPath("") //
-            .enableDmaapCertAuth(Boolean.FALSE) //
-            .build();
-
-        DmaapConsumerConfiguration dmaapConsumerConfiguration = configurationUnderTest.toDmaap();
-        assertEquals("http", dmaapConsumerConfiguration.dmaapProtocol());
-        assertEquals("message-router.onap.svc.cluster.local", dmaapConsumerConfiguration.dmaapHostName());
-        assertEquals(Integer.valueOf("2222"), dmaapConsumerConfiguration.dmaapPortNumber());
-        assertEquals("OpenDcae-c12", dmaapConsumerConfiguration.consumerGroup());
-        assertEquals("C12", dmaapConsumerConfiguration.consumerId());
-    }
-
-    @Test
-    public void toDmaapNoUserInfoSuccess() throws DatafileTaskException {
-        ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
-            .topicUrl(
-                "http://message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
-            .trustStorePath("") //
-            .trustStorePasswordPath("") //
-            .keyStorePath("") //
-            .keyStorePasswordPath("") //
-            .enableDmaapCertAuth(Boolean.FALSE) //
-            .build();
-
-        DmaapConsumerConfiguration dmaapConsumerConfiguration = configurationUnderTest.toDmaap();
-        assertEquals("http", dmaapConsumerConfiguration.dmaapProtocol());
-        assertEquals("message-router.onap.svc.cluster.local", dmaapConsumerConfiguration.dmaapHostName());
-        assertEquals(Integer.valueOf("2222"), dmaapConsumerConfiguration.dmaapPortNumber());
-        assertEquals("OpenDcae-c12", dmaapConsumerConfiguration.consumerGroup());
-        assertEquals("C12", dmaapConsumerConfiguration.consumerId());
-    }
-
-    @Test
-    public void toDmaapWhenInvalidUrlThrowException() throws DatafileTaskException {
-        ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
-            .topicUrl("//admin:admin@message-router.onap.svc.cluster.local:2222//events/").trustStorePath("") //
-            .trustStorePasswordPath("") //
-            .keyStorePath("") //
-            .keyStorePasswordPath("") //
-            .enableDmaapCertAuth(Boolean.FALSE) //
-            .build();
-
-        DatafileTaskException exception =
-            assertThrows(DatafileTaskException.class, () -> configurationUnderTest.toDmaap());
-        assertEquals("Could not parse the URL", exception.getMessage());
-    }
-
-    @Test
-    public void toDmaapWhenInvalidPathThrowException() throws DatafileTaskException {
-        ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
-            .topicUrl("http://admin:admin@message-router.onap.svc.cluster.local:2222//events/") //
-            .trustStorePath("") //
-            .trustStorePasswordPath("") //
-            .keyStorePath("") //
-            .keyStorePasswordPath("") //
-            .enableDmaapCertAuth(Boolean.FALSE) //
-            .build();
-
-        DatafileTaskException exception =
-            assertThrows(DatafileTaskException.class, () -> configurationUnderTest.toDmaap());
-        assertEquals("The path has incorrect syntax: //events/", exception.getMessage());
-    }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
deleted file mode 100644 (file)
index d4e060f..0000000
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.client.ClientRequest;
-import org.springframework.web.reactive.function.client.ClientResponse;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
-
-@ExtendWith(MockitoExtension.class)
-class DmaapWebClientTest {
-
-    @Mock
-    private DmaapConsumerConfiguration dmaapConsumerConfigurationMock;
-
-    @Mock
-    private ClientResponse clientResponseMock;
-
-    @Mock
-    private ClientRequest clientRequesteMock;
-
-    @Test
-    void buildsDMaaPReactiveWebClientProperly() {
-        when(dmaapConsumerConfigurationMock.dmaapContentType()).thenReturn("*/*");
-        WebClient dmaapWebClientUndetTest = new DmaapWebClient() //
-            .fromConfiguration(dmaapConsumerConfigurationMock) //
-            .build();
-
-        verify(dmaapConsumerConfigurationMock, times(1)).dmaapContentType();
-        assertNotNull(dmaapWebClientUndetTest);
-    }
-
-    @Test
-    public void logResponseSuccess() {
-        DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient();
-
-        when(clientResponseMock.statusCode()).thenReturn(HttpStatus.OK);
-
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
-        Mono<ClientResponse> logResponse = dmaapWebClientUndetTest.logResponse(clientResponseMock);
-
-        assertEquals(clientResponseMock, logResponse.block());
-
-        assertEquals(Level.TRACE, logAppender.list.get(0).getLevel());
-        assertEquals("Response Status 200 OK", logAppender.list.get(0).getFormattedMessage());
-
-        logAppender.stop();
-    }
-
-    @Test
-    public void logRequestSuccess() throws URISyntaxException {
-        when(clientRequesteMock.url()).thenReturn(new URI("http://test"));
-        when(clientRequesteMock.method()).thenReturn(HttpMethod.GET);
-        HttpHeaders httpHeaders = new HttpHeaders();
-        httpHeaders.add("header", "value");
-        when(clientRequesteMock.headers()).thenReturn(httpHeaders);
-
-        DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient();
-
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
-        Mono<ClientRequest> logRequest = dmaapWebClientUndetTest.logRequest(clientRequesteMock);
-
-        assertEquals(clientRequesteMock, logRequest.block());
-
-        assertEquals(Level.TRACE, logAppender.list.get(0).getLevel());
-        assertEquals("Request: GET http://test", logAppender.list.get(0).getFormattedMessage());
-        assertEquals(Level.TRACE, logAppender.list.get(1).getLevel());
-        assertEquals("HTTP request headers: [header:\"value\"]", logAppender.list.get(1).getFormattedMessage());
-
-        logAppender.stop();
-    }
-}
index 8fb8c36..bfb9b13 100644 (file)
@@ -46,6 +46,7 @@ import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -120,11 +121,11 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNext(expectedMessage).verifyComplete();
     }
 
@@ -173,12 +174,12 @@ class JsonMessageParserTest {
         String parsedString = message.getParsed();
         String messageString = "[" + parsedString + "," + parsedString + "]";
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
-        JsonElement jsonElement1 = new JsonParser().parse(messageString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
+        JsonElement jsonElement1 = JsonParser.parseString(messageString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement1)))
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement1)))
             .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete();
     }
 
@@ -200,12 +201,12 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNextCount(0).verifyComplete();
 
         assertTrue(logAppender.list.toString()
@@ -232,12 +233,12 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNextCount(0).verifyComplete();
 
         assertTrue("Error missing in log",
@@ -293,9 +294,9 @@ class JsonMessageParserTest {
         String parsedString = message.getParsed();
         String messageString = "[{\"event\":{}}," + parsedString + "]";
         JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
-        JsonElement jsonElement = new JsonParser().parse(messageString);
+        JsonElement jsonElement = JsonParser.parseString(messageString);
 
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNext(expectedMessage).verifyComplete();
     }
 
@@ -317,12 +318,12 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectComplete().verify();
 
         assertTrue("Error missing in log",
@@ -348,12 +349,12 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNextCount(0).verifyComplete();
 
         assertTrue("Error missing in log",
@@ -374,12 +375,12 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNextCount(0).verifyComplete();
 
         assertTrue("Error missing in log",
@@ -405,12 +406,12 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNextCount(0).verifyComplete();
 
         assertTrue("Error missing in log",
@@ -438,12 +439,12 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNextCount(0).verifyComplete();
 
         assertTrue("Error missing in log",
@@ -504,11 +505,11 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNext(expectedMessage).verifyComplete();
     }
 
@@ -520,12 +521,12 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectComplete().verify();
 
         assertTrue("Error missing in log",
@@ -538,13 +539,13 @@ class JsonMessageParserTest {
     @Test
     void whenPassingJsonWithNullJsonElement_noFileData() {
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse("{}");
+        JsonElement jsonElement = JsonParser.parseString("{}");
 
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectComplete().verify();
 
         assertTrue("Error missing in log",
@@ -569,12 +570,12 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNextCount(0).expectComplete().verify();
 
         assertTrue("Error missing in log",
@@ -601,11 +602,11 @@ class JsonMessageParserTest {
 
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = JsonParser.parseString(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectComplete().verify();
     }
 }
index d4541ef..1ddb3a5 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -50,10 +50,10 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
 import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 
 /**
  * Test for DmaapProducerHttpClient.
@@ -73,7 +73,7 @@ class DmaapProducerHttpClientTest {
 
     private DmaapProducerHttpClient producerClientUnderTestSpy;
 
-    private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+    private PublisherConfiguration dmaapPublisherConfigurationMock = mock(PublisherConfiguration.class);
 
     private HttpAsyncClientBuilderWrapper clientBuilderMock;
 
@@ -83,11 +83,8 @@ class DmaapProducerHttpClientTest {
 
     @BeforeEach
     void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
-        when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
-        when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
-        when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
-        when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
-        when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
+        when(dmaapPublisherConfigurationMock.userName()).thenReturn("dradmin");
+        when(dmaapPublisherConfigurationMock.password()).thenReturn("dradmin");
 
         producerClientUnderTestSpy = spy(new DmaapProducerHttpClient(dmaapPublisherConfigurationMock));
 
index d4dd89f..f0c8e3b 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG;
-
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -58,13 +40,27 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
 public class DMaaPMessageConsumerTest {
     private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
     private static final String PRODUCT_NAME = "NrRadio";
@@ -90,8 +86,6 @@ public class DMaaPMessageConsumerTest {
     private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>();
     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
 
-    private DMaaPConsumerReactiveHttpClient httpClientMock;
-
     private DMaaPMessageConsumer messageConsumer;
     private static String ftpesMessageString;
     private static JsonElement ftpesMessageJson;
@@ -105,6 +99,7 @@ public class DMaaPMessageConsumerTest {
 
     private static AppConfig appConfig;
     private static ConsumerConfiguration dmaapConsumerConfiguration;
+    private static MessageRouterSubscriber messageRouterSubscriber;
 
     /**
      * Sets up data for the test.
@@ -113,9 +108,6 @@ public class DMaaPMessageConsumerTest {
     public static void setUp() {
 
         appConfig = mock(AppConfig.class);
-        dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG;
-
-        JsonParser jsonParser = new JsonParser();
 
         AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
             .location(FTPES_LOCATION) //
@@ -133,7 +125,7 @@ public class DMaaPMessageConsumerTest {
             .build();
 
         ftpesMessageString = ftpesJsonMessage.toString();
-        ftpesMessageJson = jsonParser.parse(ftpesMessageString);
+        ftpesMessageJson = JsonParser.parseString(ftpesMessageString);
 
         MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
             .productName(PRODUCT_NAME) //
@@ -175,7 +167,7 @@ public class DMaaPMessageConsumerTest {
             .addAdditionalField(sftpAdditionalField) //
             .build();
         sftpMessageString = sftpJsonMessage.toString();
-        sftpMessageJson = jsonParser.parse(sftpMessageString);
+        sftpMessageJson = JsonParser.parseString(sftpMessageString);
         sftpFileData = ImmutableFileData.builder() //
             .name(PM_FILE_NAME) //
             .location(SFTP_LOCATION) //
@@ -220,46 +212,50 @@ public class DMaaPMessageConsumerTest {
             .expectError(DatafileTaskException.class) //
             .verify();
 
-        verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
+        verify(messageRouterSubscriber, times(1))
+                .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
     }
 
     @Test
-    public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
+    public void whenFtpes_ReturnsCorrectResponse() {
         prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage);
 
         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
             .expectNext(expectedFtpesMessage) //
             .verifyComplete();
 
-        verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
-        verifyNoMoreInteractions(httpClientMock);
+
+
+        verify(messageRouterSubscriber, times(1))
+                .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+        verifyNoMoreInteractions(messageRouterSubscriber);
     }
 
     @Test
-    public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
+    public void whenSftp_ReturnsCorrectResponse() {
         prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage);
 
         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
             .expectNext(expectedSftpMessage) //
             .verifyComplete();
 
-        verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
-        verifyNoMoreInteractions(httpClientMock);
+        verify(messageRouterSubscriber, times(1))
+                .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+        verifyNoMoreInteractions(messageRouterSubscriber);
     }
 
     private void prepareMocksForDmaapConsumer(Optional<JsonElement> message,
         FileReadyMessage fileReadyMessageAfterConsume) {
-        Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty();
+        Flux<JsonElement> messageAsMono = message.isPresent() ? Flux.just(message.get()) : Flux.empty();
+
+        messageRouterSubscriber = mock(MessageRouterSubscriber.class);
+        dmaapConsumerConfiguration = new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class),
+                messageRouterSubscriber, mock(MessageRouterSubscribeRequest.class));
+
         JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
-        httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
-        when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono);
+        when(messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()))
+                .thenReturn(messageAsMono);
         when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
-        ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
-        try {
-            doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap());
-        } catch (DatafileTaskException e) {
-            e.printStackTrace();
-        }
 
         if (message.isPresent()) {
             when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume));
@@ -268,7 +264,7 @@ public class DMaaPMessageConsumerTest {
                 .thenReturn(Flux.error(new DatafileTaskException("problemas")));
         }
 
-        messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory));
+        messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock));
     }
 
 }
index 1cb79bc..199ac9f 100644 (file)
@@ -52,7 +52,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -66,6 +65,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
 import org.slf4j.MDC;
 
@@ -110,7 +112,7 @@ public class ScheduledTasksTest {
             .publishUrl(publishUrl) //
             .logUrl("") //
             .userName("userName") //
-            .passWord("passWord") //
+            .password("passWord") //
             .trustStorePath("trustStorePath") //
             .trustStorePasswordPath("trustStorePasswordPath") //
             .keyStorePath("keyStorePath") //
@@ -118,13 +120,10 @@ public class ScheduledTasksTest {
             .enableDmaapCertAuth(true) //
             .changeIdentifier(CHANGE_IDENTIFIER) //
             .build(); //
-        final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
-            .topicUrl("topicUrl").trustStorePath("trustStorePath") //
-            .trustStorePasswordPath("trustStorePasswordPath") //
-            .keyStorePath("keyStorePath") //
-            .keyStorePasswordPath("keyStorePasswordPath") //
-            .enableDmaapCertAuth(true) //
-            .build();
+        final ConsumerConfiguration dmaapConsumerConfiguration =
+            new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class),
+                mock(MessageRouterSubscribeRequest.class));
+
 
         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
@@ -266,7 +265,7 @@ public class ScheduledTasksTest {
             .publishUrl(publishUrl) //
             .logUrl("") //
             .userName("userName") //
-            .passWord("passWord") //
+            .password("passWord") //
             .trustStorePath("trustStorePath") //
             .trustStorePasswordPath("trustStorePasswordPath") //
             .keyStorePath("keyStorePath") //
@@ -274,13 +273,9 @@ public class ScheduledTasksTest {
             .enableDmaapCertAuth(true) //
             .changeIdentifier("Different changeIdentifier") //
             .build(); //
-        final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
-            .topicUrl("topicUrl").trustStorePath("trustStorePath") //
-            .trustStorePasswordPath("trustStorePasswordPath") //
-            .keyStorePath("keyStorePath") //
-            .keyStorePasswordPath("keyStorePasswordPath") //
-            .enableDmaapCertAuth(true) //
-            .build();
+        final ConsumerConfiguration dmaapConsumerConfiguration =
+            new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class),
+                mock(MessageRouterSubscribeRequest.class));
 
         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
diff --git a/datafile-app-server/src/test/resources/cert.jks b/datafile-app-server/src/test/resources/cert.jks
new file mode 100755 (executable)
index 0000000..ff0e95c
Binary files /dev/null and b/datafile-app-server/src/test/resources/cert.jks differ
index 62119e6..8e51b80 100644 (file)
@@ -5,11 +5,14 @@
     "dmaap.ftpesConfig.keyPasswordPath": "/src/test/resources/dfc.jks.pass",
     "dmaap.ftpesConfig.trustedCa": "/src/test/resources/ftp.jks",
     "dmaap.ftpesConfig.trustedCaPasswordPath": "/src/test/resources/ftp.jks.pass",
-    "dmaap.security.trustStorePath": "trustStorePath",
-    "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath",
-    "dmaap.security.keyStorePath": "keyStorePath",
-    "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath",
+    "dmaap.security.trustStorePath": "src/test/resources/trust.jks",
+    "dmaap.security.trustStorePasswordPath": "src/test/resources/trust.pass",
+    "dmaap.security.keyStorePath": "src/test/resources/cert.jks",
+    "dmaap.security.keyStorePasswordPath": "src/test/resources/jks.pass",
     "dmaap.security.enableDmaapCertAuth": "true",
+    "dmaap.dmaapConsumerConfiguration.consumerGroup": "OpenDcae-c12",
+    "dmaap.dmaapConsumerConfiguration.consumerId": "C12",
+    "dmaap.dmaapConsumerConfiguration.timeoutMs": 1000,
     "sftp.security.strictHostKeyChecking": "false",
     "streams_publishes": {
       "PM_MEAS_FILES": {
@@ -27,7 +30,7 @@
     "streams_subscribes": {
       "dmaap_subscriber": {
         "dmaap_info": {
-          "topic_url": "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+          "topic_url": "http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT"
         },
         "type": "message_router"
       }
index 480a6f7..e132746 100644 (file)
@@ -5,12 +5,15 @@
     "dmaap.ftpesConfig.keyPasswordPath": "/src/test/resources/dfc.jks.pass",
     "dmaap.ftpesConfig.trustedCa": "/src/test/resources/ftp.jks",
     "dmaap.ftpesConfig.trustedCaPasswordPath": "/src/test/resources/ftp.jks.pass",
-    "dmaap.security.trustStorePath": "trustStorePath",
-    "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath",
-    "dmaap.security.keyStorePath": "keyStorePath",
-    "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath",
+    "dmaap.security.trustStorePath": "src/test/resources/trust.jks",
+    "dmaap.security.trustStorePasswordPath": "src/test/resources/trust.pass",
+    "dmaap.security.keyStorePath": "src/test/resources/cert.jks",
+    "dmaap.security.keyStorePasswordPath": "src/test/resources/jks.pass",
     "dmaap.security.enableDmaapCertAuth": "true",
     "sftp.security.strictHostKeyChecking": "false",
+    "dmaap.dmaapConsumerConfiguration.consumerGroup": "OpenDcae-c12",
+    "dmaap.dmaapConsumerConfiguration.consumerId": "C12",
+    "dmaap.dmaapConsumerConfiguration.timeoutMs": 1000,
     "streams_publishes": {
       "PM_MEAS_FILES": {
         "type": "data_router",
@@ -49,7 +52,7 @@
     "streams_subscribes": {
       "dmaap_subscriber": {
         "dmaap_info": {
-          "topic_url": "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+          "topic_url": "http://message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
         },
         "type": "message_router"
       }
diff --git a/datafile-app-server/src/test/resources/jks.pass b/datafile-app-server/src/test/resources/jks.pass
new file mode 100755 (executable)
index 0000000..b2c3df4
--- /dev/null
@@ -0,0 +1 @@
+hD:!w:CxF]lGvM6Mz9l^j[7U
\ No newline at end of file
diff --git a/datafile-app-server/src/test/resources/trust.jks b/datafile-app-server/src/test/resources/trust.jks
new file mode 100755 (executable)
index 0000000..fc62ad2
Binary files /dev/null and b/datafile-app-server/src/test/resources/trust.jks differ
diff --git a/datafile-app-server/src/test/resources/trust.pass b/datafile-app-server/src/test/resources/trust.pass
new file mode 100755 (executable)
index 0000000..047a411
--- /dev/null
@@ -0,0 +1 @@
+jeQ2l]iyB62D{WbSHL]dN*8R
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a807a42..8febb28 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
 
   <groupId>org.onap.dcaegen2.collectors</groupId>
   <artifactId>datafile</artifactId>
-  <version>1.4.2-SNAPSHOT</version>
+  <version>1.4.3-SNAPSHOT</version>
 
   <name>dcaegen2-collectors.datafile</name>
   <description>datafile collector</description>
@@ -45,7 +45,7 @@
 
   <properties>
     <java.version>11</java.version>
-    <sdk.version>1.1.6</sdk.version>
+    <sdk.version>1.4.2</sdk.version>
     <apache.httpcomponents.version>4.1.4</apache.httpcomponents.version>
     <apache.commons.version>3.6</apache.commons.version>
     <immutable.version>2.7.1</immutable.version>
         <artifactId>dmaap-client</artifactId>
         <version>${sdk.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.onap.dcaegen2.services.sdk.security</groupId>
+        <artifactId>ssl</artifactId>
+        <version>${sdk.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.httpcomponents</groupId>
         <artifactId>httpasyncclient</artifactId>
index b0c0f62..69a4fe1 100644 (file)
@@ -1,6 +1,6 @@
 major=1\r
 minor=4\r
-patch=2\r
+patch=3\r
 base_version=${major}.${minor}.${patch}\r
 release_version=${base_version}\r
 snapshot_version=${base_version}-SNAPSHOT\r