Add test for Kafka message consumption via distribution-client in model-loader 95/139795/1 oslo
authorFiete Ostkamp <Fiete.Ostkamp@telekom.de>
Sat, 28 Dec 2024 09:11:42 +0000 (10:11 +0100)
committerFiete Ostkamp <Fiete.Ostkamp@telekom.de>
Sat, 28 Dec 2024 09:11:42 +0000 (10:11 +0100)
- add test that ensures message consumption of Kafka messages

Issue-ID: AAI-4090
Change-Id: Ic38994f5b944a357d4bdd8d9061803098b7505ce
Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de>
src/main/java/org/onap/aai/modelloader/config/DistributionClientStartupConfig.java
src/test/java/org/onap/aai/modelloader/DistributionClientTestConfiguration.java
src/test/java/org/onap/aai/modelloader/notification/DistributionClientIntegrationTest.java [new file with mode: 0644]
src/test/resources/__files/kafkaBootstrap.json
src/test/resources/messages/distribution.json [new file with mode: 0644]
src/test/resources/model-loader.properties

index 937c62d..fa8d11b 100644 (file)
@@ -38,7 +38,10 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.event.EventListener;
 
+import lombok.RequiredArgsConstructor;
+
 @Configuration
+@RequiredArgsConstructor
 @ConditionalOnProperty(value = "ml.distribution.connection.enabled", havingValue = "true", matchIfMissing = true)
 public class DistributionClientStartupConfig {
 
@@ -48,13 +51,6 @@ public class DistributionClientStartupConfig {
     private final ModelLoaderConfig config;
     private final EventCallback eventCallback;
 
-    public DistributionClientStartupConfig(IDistributionClient distributionClient, ModelLoaderConfig config,
-            EventCallback eventCallback) {
-        this.distributionClient = distributionClient;
-        this.config = config;
-        this.eventCallback = eventCallback;
-    }
-
     @EventListener(ApplicationReadyEvent.class)
     protected void initSdcClient() {
         // Initialize distribution client
index 1eed62f..b716c1a 100644 (file)
@@ -30,14 +30,21 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.event.ApplicationStartedEvent;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Primary;
 import org.springframework.context.event.EventListener;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
 
 @TestConfiguration
 public class DistributionClientTestConfiguration {
@@ -48,9 +55,12 @@ public class DistributionClientTestConfiguration {
   @Value("${wiremock.server.port}")
   private int wiremockPort;
 
+  @Value("${spring.embedded.kafka.brokers}")
+  private String kafkaBootstrapAddress;
+
   @Primary
   @Bean(name = "testProperties")
-  public Properties configProperties() throws IOException {
+  Properties configProperties() throws IOException {
     // Load model loader system configuration
     InputStream configInputStream = Files.newInputStream(Paths.get(configDir, "model-loader.properties"));
     Properties configProperties = new Properties();
@@ -61,6 +71,21 @@ public class DistributionClientTestConfiguration {
     return configProperties;
   }
 
+  @Bean
+  ProducerFactory<String, String> distributionClientProducerFactory() {
+      Map<String, Object> configProps = new HashMap<>();
+      configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapAddress);
+      configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+      configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+      return new DefaultKafkaProducerFactory<>(configProps);
+  }
+
+  @Bean
+  KafkaTemplate<String, String> distributionClientKafkaTemplate() {
+      return new KafkaTemplate<>(distributionClientProducerFactory());
+  }
+
   private void setOverrides(Properties configProperties) {
     configProperties.setProperty("ml.distribution.ASDC_ADDRESS", "localhost:" + wiremockPort);
     configProperties.setProperty("ml.babel.BASE_URL", "http://localhost:" + wiremockPort);
@@ -76,6 +101,10 @@ public class DistributionClientTestConfiguration {
     stubFor(get(urlEqualTo("/sdc/v1/distributionKafkaData"))
         .withHeader("X-ECOMP-RequestID", matching(".+"))
         .withHeader("X-ECOMP-InstanceID", equalTo("aai-ml-id-test"))
-        .willReturn(aResponse().withHeader("Content-Type", "application/json").withBodyFile("kafkaBootstrap.json")));
+        .willReturn(aResponse()
+          .withHeader("Content-Type", "application/json")
+          .withTransformers("response-template")
+          .withTransformerParameter("kafkaBootstrapAddress", kafkaBootstrapAddress)
+          .withBodyFile("kafkaBootstrap.json")));
   }
-}
\ No newline at end of file
+}
diff --git a/src/test/java/org/onap/aai/modelloader/notification/DistributionClientIntegrationTest.java b/src/test/java/org/onap/aai/modelloader/notification/DistributionClientIntegrationTest.java
new file mode 100644 (file)
index 0000000..916f740
--- /dev/null
@@ -0,0 +1,88 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom AG Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.aai.modelloader.notification;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.aai.modelloader.DistributionClientTestConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.annotation.DirtiesContext;
+
+import lombok.SneakyThrows;
+
+@DirtiesContext
+@AutoConfigureWireMock(port = 0)
+@Import(DistributionClientTestConfiguration.class)
+@SpringBootTest(properties = { "ml.distribution.connection.enabled=true" })
+@EmbeddedKafka(partitions = 1, ports = 9092, topics = {"${topics.distribution.notification}"})
+public class DistributionClientIntegrationTest {
+
+  @Value("${topics.distribution.notification}")
+  String topic;
+
+  @Autowired
+  KafkaTemplate<String,String> kafkaTemplate;
+
+  @MockBean
+  EventCallback eventCallback;
+
+  private CountDownLatch latch;
+
+  @BeforeEach
+  public void setup() {
+    latch = new CountDownLatch(1);
+
+    doAnswer(invocation -> {
+      latch.countDown();
+      return null;
+    }).when(eventCallback).activateCallback(any());
+  }
+
+  @Test
+  @SneakyThrows
+  void thatCallbackIsCalled() {
+    Thread.sleep(10000);
+    String distributionJson = "src/test/resources/messages/distribution.json";
+    String message = Files.readString(Paths.get(distributionJson));
+
+    kafkaTemplate.send(topic, message);
+
+    latch.await(10, TimeUnit.SECONDS);
+    verify(eventCallback).activateCallback(any());
+  }
+
+}
index 1820ade..3b97fe1 100644 (file)
@@ -1,5 +1,5 @@
 {
-  "kafkaBootStrapServer": "localhost:9092",
+  "kafkaBootStrapServer": "{{parameters.kafkaBootstrapAddress}}",
   "distrNotificationTopicName": "SDC-DISTR-NOTIF-TOPIC-AUTO",
   "distrStatusTopicName": "SDC-DISTR-STATUS-TOPIC-AUTO"
-}
\ No newline at end of file
+}
diff --git a/src/test/resources/messages/distribution.json b/src/test/resources/messages/distribution.json
new file mode 100644 (file)
index 0000000..6fb88eb
--- /dev/null
@@ -0,0 +1,106 @@
+{
+       "distributionID": "ac6f29fc-d6b3-428b-8ee2-b2e3ea636fe9",
+       "serviceName": "pnf_macro",
+       "serviceVersion": "1.0",
+       "serviceUUID": "1b8b0466-f7f2-4616-8198-055228f8d1eb",
+       "serviceDescription": "ONAP SDK Service",
+       "serviceInvariantUUID": "8939eac6-07f9-4395-b7ab-fe5c1a3e7cfe",
+       "resources": [
+               {
+                       "resourceInstanceName": "pnf_macro 0",
+                       "resourceName": "pnf_macro",
+                       "resourceVersion": "1.0",
+                       "resoucreType": "PNF",
+                       "resourceUUID": "ef5c823c-c048-4d0c-94d3-4569ee66947c",
+                       "resourceInvariantUUID": "27dd7ddf-babc-402e-b04a-bb76a0f58a4a",
+                       "resourceCustomizationUUID": "33e9569f-0e58-4bb4-9825-43925ddeb213",
+                       "category": "Generic",
+                       "subcategory": "Network Service",
+                       "artifacts": [
+                               {
+                                       "artifactName": "vf-license-model.xml",
+                                       "artifactType": "VF_LICENSE",
+                                       "artifactURL": "/sdc/v1/catalog/services/PnfMacro/1.0/resourceInstances/pnf_macro0/artifacts/vf-license-model.xml",
+                                       "artifactChecksum": "ZmJlMzZkN2NkZWRjNDUyNDUwZTY2YjE0NmNkMGFiOWI=",
+                                       "artifactDescription": "VF license file",
+                                       "artifactTimeout": 0,
+                                       "artifactUUID": "f8c6e818-aa35-43d2-bc23-fb2c498fb675",
+                                       "artifactVersion": "1"
+                               },
+                               {
+                                       "artifactName": "pNF.csar",
+                                       "artifactType": "ETSI_PACKAGE",
+                                       "artifactURL": "/sdc/v1/catalog/services/PnfMacro/1.0/resourceInstances/pnf_macro0/artifacts/pNF.csar",
+                                       "artifactChecksum": "ZGVhZmM4YTM3NDgwYjQxNDFmZjYzYzQ5N2E4NDFkNDE=",
+                                       "artifactDescription": "Artifact created from csar",
+                                       "artifactTimeout": 0,
+                                       "artifactUUID": "27a07164-8d63-4d7e-a8b3-8422e938b37c",
+                                       "artifactVersion": "1"
+                               },
+                               {
+                                       "artifactName": "mynetconf.yang",
+                                       "artifactType": "YANG_MODULE",
+                                       "artifactURL": "/sdc/v1/catalog/services/PnfMacro/1.0/resourceInstances/pnf_macro0/artifacts/mynetconf.yang",
+                                       "artifactChecksum": "N2Q1MDkxNjUwYzU1MWI2OTUzNmY4YTc3YmQ2YTlkZTc=",
+                                       "artifactDescription": "Artifact created from csar",
+                                       "artifactTimeout": 0,
+                                       "artifactUUID": "b5ce5c4e-4501-45e6-826b-6cbfbe8dbfee",
+                                       "artifactVersion": "1"
+                               },
+                               {
+                                       "artifactName": "vendor-license-model.xml",
+                                       "artifactType": "VENDOR_LICENSE",
+                                       "artifactURL": "/sdc/v1/catalog/services/PnfMacro/1.0/resourceInstances/pnf_macro0/artifacts/vendor-license-model.xml",
+                                       "artifactChecksum": "MTZiYmJhZmJiOTBjZmVkYzNjM2UwNjFkZmZmYzNkNjc=",
+                                       "artifactDescription": " Vendor license file",
+                                       "artifactTimeout": 0,
+                                       "artifactUUID": "8c68e938-4988-4107-834a-eab4c4e65392",
+                                       "artifactVersion": "1"
+                               },
+                               {
+                                       "artifactName": "PM_Dictionary.yaml",
+                                       "artifactType": "PM_DICTIONARY",
+                                       "artifactURL": "/sdc/v1/catalog/services/PnfMacro/1.0/resourceInstances/pnf_macro0/artifacts/PM_Dictionary.yaml",
+                                       "artifactChecksum": "ZDJlYTVlNjA2ZGRhOTc3MDBkODVjNmEyMjA2OTA5YTE=",
+                                       "artifactDescription": "Artifact created from csar",
+                                       "artifactTimeout": 0,
+                                       "artifactUUID": "e014ef41-35a6-4222-9c52-907c3a42a0b3",
+                                       "artifactVersion": "1"
+                               },
+                               {
+                                       "artifactName": "MyPnf_Pnf_v1.yaml",
+                                       "artifactType": "VES_EVENTS",
+                                       "artifactURL": "/sdc/v1/catalog/services/PnfMacro/1.0/resourceInstances/pnf_macro0/artifacts/MyPnf_Pnf_v1.yaml",
+                                       "artifactChecksum": "ZTllOGUwNjdiYWEyMWJhZDc4NjNlOGNmNGJkZWVlMWY=",
+                                       "artifactDescription": "Artifact created from csar",
+                                       "artifactTimeout": 0,
+                                       "artifactUUID": "e01c8cab-cb60-493b-895f-b1998bbaed84",
+                                       "artifactVersion": "1"
+                               }
+                       ]
+               }
+       ],
+       "serviceArtifacts": [
+               {
+                       "artifactName": "service-PnfMacro-template.yml",
+                       "artifactType": "TOSCA_TEMPLATE",
+                       "artifactURL": "/sdc/v1/catalog/services/PnfMacro/1.0/artifacts/service-PnfMacro-template.yml",
+                       "artifactChecksum": "YmEzMTIzYTU5NDQ4MGIzNThmM2Q3N2Q5OTNiZjhlYjA=",
+                       "artifactDescription": "TOSCA representation of the asset",
+                       "artifactTimeout": 0,
+                       "artifactUUID": "9c5c224c-7c6e-41c6-abd9-0bc5169e566c",
+                       "artifactVersion": "1"
+               },
+               {
+                       "artifactName": "service-PnfMacro-csar.csar",
+                       "artifactType": "TOSCA_CSAR",
+                       "artifactURL": "/sdc/v1/catalog/services/PnfMacro/1.0/artifacts/service-PnfMacro-csar.csar",
+                       "artifactChecksum": "MTQ5YmNkMjVkOWI2NmJhYzg5NTNiNzQ3YTA2NGIzYTA=",
+                       "artifactDescription": "TOSCA definition package of the asset",
+                       "artifactTimeout": 0,
+                       "artifactUUID": "61d45180-7c5f-4ae0-9044-6b005de76b3f",
+                       "artifactVersion": "1"
+               }
+       ],
+       "workloadContext": "Production"
+}
index 2db7c09..7fa2cd8 100644 (file)
@@ -8,8 +8,8 @@ ml.distribution.ENVIRONMENT_NAME=AUTO
 ml.distribution.KEYSTORE_PASSWORD=
 ml.distribution.KEYSTORE_FILE=
 ml.distribution.PASSWORD=Aa123456
-ml.distribution.POLLING_INTERVAL=15
-ml.distribution.POLLING_TIMEOUT=3
+ml.distribution.POLLING_INTERVAL=2
+ml.distribution.POLLING_TIMEOUT=1
 ml.distribution.USER=ci
 ml.distribution.ARTIFACT_TYPES=MODEL_QUERY_SPEC,TOSCA_CSAR
 ml.distribution.SASL_JAAS_CONFIG=org.apache.kafka.common.security.scram.ScramLoginModule required username="aai-modelloader-ku" password="somePassword";