PRH Code Additions for Early PNF registrations 07/133607/7 1.9.0
authorsangeeta.bellara <sangeeta.bellara@t-systems.com>
Thu, 9 Mar 2023 16:43:03 +0000 (22:13 +0530)
committerSangeeta Bellara <sangeeta.bellara@t-systems.com>
Tue, 28 Mar 2023 06:26:20 +0000 (06:26 +0000)
Issue-ID: DCAEGEN2-3312
Change-Id: Id9b1ca83390af3675e26fc61ccc8d12611ab8ddf
Signed-off-by: sangeeta.bellara <sangeeta.bellara@t-systems.com>
Signed-off-by: sangeeta.bellara <sangeeta.bellara@t-systems.com>
Change-Id: I9bc25bc1343c40aca5644de3fd30f7c2142c1a47
Signed-off-by: sangeeta.bellara <sangeeta.bellara@t-systems.com>
30 files changed:
Changelog.md
pom.xml
prh-app-server/pom.xml
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java [new file with mode: 0644]
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java [new file with mode: 0644]
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java [new file with mode: 0644]
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java [new file with mode: 0644]
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java [new file with mode: 0644]
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java [new file with mode: 0644]
prh-app-server/src/main/resources/application.yaml
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java [new file with mode: 0644]
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImplTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java [new file with mode: 0644]
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java [new file with mode: 0644]
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java [new file with mode: 0644]
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java [new file with mode: 0644]
prh-app-server/src/test/resources/application.yaml
prh-commons/pom.xml
prh-commons/src/main/java/org/onap/dcaegen2/services/prh/adapter/aai/api/AaiPnfResultModel.java
version.properties

index 3289729..10c10f5 100644 (file)
@@ -4,6 +4,33 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](http://keepachangelog.com/)
 and this project adheres to [Semantic Versioning](http://semver.org/).
 
+## [1.9.0] - 2023/01/13
+### Changed
+- [DCAEGEN2-3312] Code additions to handle (optionally) early PNF registrations. This is enabled if the deployment Helm contains an environment variable: name: SPRING_PROFILES_ACTIVE
+          value: autoCommitDisabled. 
+  If this is set with the value of "autoCommitDisabled", then other required environment variables are:
+       - name: kafkaBoostrapServerConfig
+          value: onap-strimzi-kafka-bootstrap:9092
+        - name: groupIdConfig
+          value: OpenDCAE-c12
+        - name: kafkaUsername
+          value: strimzi-kafka-admin
+        - name: kafkaPassword
+          valueFrom:
+            secretKeyRef:
+              key: password
+              name: strimzi-kafka-admin
+        - name: kafkaTopic
+          value: unauthenticated.VES_PNFREG_OUTPUT
+        - name: SPRING_PROFILES_ACTIVE
+          value: autoCommitDisabled
+        - name: JAAS_CONFIG
+          valueFrom:
+            secretKeyRef:
+              key: sasl.jaas.config
+              name: strimzi-kafka-admin
+  [DCAEGEN2-3357] Updated dependencies for vulnerability check
+
 ## [1.8.1] - 2022/08/11
 ### Changed
 - [DCAEGEN2-3219] dcaegen2-services-prh vulnerability update
diff --git a/pom.xml b/pom.xml
index cfb60f4..a8928ae 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -5,6 +5,7 @@
   ~ ================================================================================
   ~ Copyright (C) 2018-2022 NOKIA Intellectual Property. All rights reserved.
   ~ Copyright (C) 2021 Samsung Electronics. All rights reserved.
+  ~ Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
   ~ ================================================================================
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -33,7 +34,7 @@
 
   <groupId>org.onap.dcaegen2.services</groupId>
   <artifactId>prh</artifactId>
-  <version>1.8.1-SNAPSHOT</version>
+  <version>1.9.0-SNAPSHOT</version>
 
   <name>dcaegen2-services-prh</name>
   <description>PNF Registration Handler</description>
     </license>
   </licenses>
 
+  <profiles>
+    <profile>
+      <id>docker</id>
+    </profile>
+  </profiles>
+
   <properties>
     <java.version>11</java.version>
     <spring-boot.version>2.7.2</spring-boot.version>
       </plugins>
     </pluginManagement>
   </build>
-
   <dependencyManagement>
     <dependencies>
       <dependency>
         <artifactId>guava</artifactId>
         <version>${guava.version}</version>
       </dependency>
+
+      <dependency>
+        <groupId>org.springframework.kafka</groupId>
+        <artifactId>spring-kafka</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>org.springframework.kafka</groupId>
+        <artifactId>spring-kafka-test</artifactId>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.springframework</groupId>
+        <artifactId>spring-web</artifactId>
+        <version>5.3.25</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-text</artifactId>
+        <version>1.10.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tomcat.embed</groupId>
+        <artifactId>tomcat-embed-core</artifactId>
+        <version>9.0.72</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 </project>
index 62e19dc..4de28bc 100644 (file)
@@ -5,6 +5,7 @@
   ~ ================================================================================
   ~ Copyright (C) 2018-2022 NOKIA Intellectual Property. All rights reserved.
   ~ Copyright (C) 2021 Samsung Electronics. All rights reserved.
+  ~ Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
   ~ ================================================================================
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -27,7 +28,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services</groupId>
     <artifactId>prh</artifactId>
-    <version>1.8.1-SNAPSHOT</version>
+    <version>1.9.0-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.services.prh</groupId>
       <artifactId>jersey-apache-connector</artifactId>
       <scope>test</scope>
     </dependency>
+      <dependency>
+          <groupId>org.springframework.kafka</groupId>
+          <artifactId>spring-kafka</artifactId>
+          <version>2.8.11</version>
+      </dependency>
   </dependencies>
 </project>
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java
new file mode 100644 (file)
index 0000000..8affe28
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.configuration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:pravin.kokane@t-systems.com">Pravin Kokane</a> on 3/13/23
+ */
+
+@Profile("autoCommitDisabled")
+@EnableKafka
+@Configuration
+public class KafkaConfig
+{
+    String kafkaBoostrapServerConfig = System.getenv("kafkaBoostrapServerConfig");
+
+    String groupIdConfig = System.getenv("groupIdConfig");
+
+
+    String kafkaSecurityProtocol = System.getenv("kafkaSecurityProtocol");
+
+    String kafkaSaslMechanism = System.getenv("kafkaSaslMechanism");
+
+    String kafkaUsername = System.getenv("kafkaUsername");
+
+    String kafkaPassword = System.getenv("kafkaPassword");
+
+    String kafkaJaasConfig = System.getenv("JAAS_CONFIG");
+
+    String kafkaLoginModuleClassConfig = System.getenv("Login_Module_Class");
+
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory()
+    {
+        Map<String,Object> config = new HashMap<>();
+        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaBoostrapServerConfig);
+        config.put(ConsumerConfig.GROUP_ID_CONFIG,groupIdConfig);
+        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
+        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        if(kafkaJaasConfig == null) {
+            kafkaJaasConfig = kafkaLoginModuleClassConfig + " required username=\""
+                + kafkaUsername + "\" password=\"" + kafkaPassword + "\";";
+        }
+        if(kafkaSecurityProtocol==null ) kafkaSecurityProtocol="SASL_PLAINTEXT";
+        config.put("security.protocol", kafkaSecurityProtocol);
+        if(kafkaSaslMechanism==null ) kafkaSaslMechanism="SCRAM-SHA-512";
+        config.put("sasl.mechanism", kafkaSaslMechanism);
+
+        config.put("sasl.jaas.config", kafkaJaasConfig);
+
+        return new DefaultKafkaConsumerFactory<>(config);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory()
+    {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+        factory.setBatchListener(true);
+        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
+        return factory;
+    }
+}
index a0aa17e..0b1f0e1 100644 (file)
@@ -3,6 +3,7 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,6 +24,7 @@ package org.onap.dcaegen2.services.prh.controllers;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import org.onap.dcaegen2.services.prh.tasks.ScheduledTasksRunner;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -40,30 +42,34 @@ import reactor.core.publisher.Mono;
 @Api(value = "ScheduleController", description = "Schedule Controller")
 public class ScheduleController {
 
+    
     private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleController.class);
 
-    private final ScheduledTasksRunner scheduledTasksRunner;
+    private ScheduledTasksRunner scheduledTasksRunner;
 
-    @Autowired
+
+    @Autowired(required = false)
     public ScheduleController(ScheduledTasksRunner scheduledTasksRunner) {
         this.scheduledTasksRunner = scheduledTasksRunner;
     }
 
+
+
     @RequestMapping(value = "start", method = RequestMethod.GET)
     @ApiOperation(value = "Start scheduling worker request")
     public Mono<ResponseEntity<String>> startTasks() {
-        LOGGER.trace("Receiving start scheduling worker request");
-        return Mono.fromSupplier(scheduledTasksRunner::tryToStartTask).map(this::createStartTaskResponse);
+            return Mono.fromSupplier(scheduledTasksRunner::tryToStartTask).map(this::createStartTaskResponse);
     }
 
+
     @RequestMapping(value = "stopPrh", method = RequestMethod.GET)
     @ApiOperation(value = "Receiving stop scheduling worker request")
     public Mono<ResponseEntity<String>> stopTask() {
         LOGGER.trace("Receiving stop scheduling worker request");
         return Mono.defer(() -> {
-                    scheduledTasksRunner.cancelTasks();
-                    return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK));
-                }
+                scheduledTasksRunner.cancelTasks();
+                return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK));
+            }
         );
     }
 
index f98e952..25c380f 100644 (file)
@@ -3,6 +3,7 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-
 package org.onap.dcaegen2.services.prh.service;
 
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME;
-import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION;
-
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import io.vavr.collection.List;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.boot.configurationprocessor.json.JSONArray;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.boot.configurationprocessor.json.JSONObject;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.ArrayList;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT;
+
+
+
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 5/8/18
@@ -74,6 +80,8 @@ public class DmaapConsumerJsonParser {
     private String pnfSwVersionOptionalField;
     private JsonObject pnfAdditionalFields;
 
+    private String sourceName;
+
     /**
      * Extract info from string and create @see {@link ConsumerDmaapModel}.
      *
@@ -84,6 +92,11 @@ public class DmaapConsumerJsonParser {
         return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items()));
     }
 
+    public JSONObject getJsonObjectKafka(String jsonStr) throws JSONException {
+        return new JSONObject(jsonStr);
+    }
+
+
     private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(List<JsonElement> items) {
         LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items);
 
@@ -97,25 +110,59 @@ public class DmaapConsumerJsonParser {
                                 .orElseGet(JsonObject::new)))));
     }
 
+    /**
+     * Extract info from string and create @see {@link ConsumerDmaapModel}.
+     *
+     * @param monoMessage - results from Kafka
+     * @return reactive DMaaPModel
+     *
+     */
+    /**
+     * @author <a href="mailto:shilpa.urade@t-systems.com">Shilpa Urade</a> on 13/3/23
+     */
+
+    public Flux<ConsumerDmaapModel> getConsumerDmaapModelFromKafkaConsumerRecord(java.util.List<String> items)
+    {
+        LOGGER.info("DmaapConsumerJsonParser input for parsing: {} with commit", items);
+        if (items.size() == 0) {
+            LOGGER.info("Nothing to consume from Kafka");
+            return Flux.empty();
+        }
+       return create(
+                Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false)
+                        .map(jsonObjectFromString -> getJsonObjectFromString(jsonObjectFromString)
+                                .orElseGet(JsonObject::new)))));
+    }
+
+    Optional<JsonObject> getJsonObjectFromString(String element) {
+        return Optional.ofNullable(JsonParser.parseString(element).getAsJsonObject());
+    }
+
+    public String getSourceName() {
+        return sourceName;
+    }
+
     Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
         JsonParser jsonParser = new JsonParser();
         return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
-            : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+                : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+    }
+
+    Optional<JsonObject> getJsonObjectFromKafkaRecords(String element) {
+        return Optional.ofNullable(new JsonObject().getAsJsonObject(element));
     }
 
+
     private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) {
-        return jsonObject.flatMap(monoJsonP ->
-            !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header")
-                : transform(monoJsonP))
-            .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty());
+        return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header")
+                            : transform(monoJsonP));
     }
 
     private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) {
         JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT)
-            .getAsJsonObject(COMMON_EVENT_HEADER);
+                .getAsJsonObject(COMMON_EVENT_HEADER);
         JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT)
-            .getAsJsonObject(PNF_REGISTRATION_FIELDS);
-
+                .getAsJsonObject(PNF_REGISTRATION_FIELDS);
         this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
         this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE);
         this.pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS);
@@ -126,21 +173,20 @@ public class DmaapConsumerJsonParser {
         this.pnfEquipTypeOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_TYPE);
         this.pnfSwVersionOptionalField = getValueFromJson(pnfRegistrationFields, SW_VERSION);
         this.pnfAdditionalFields = pnfRegistrationFields.getAsJsonObject(ADDITIONAL_FIELDS);
-
         return (StringUtils.isEmpty(pnfSourceName))
-            ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: "
-            + printMessage()) :
-            Mono.just(ImmutableConsumerDmaapModel.builder()
-                .correlationId(pnfSourceName)
-                .ipv4(pnfOamIpv4Address)
-                .ipv6(pnfOamIpv6Address)
-                .serialNumber(pnfSerialNumberOptionalField)
-                .equipVendor(pnfEquipVendorOptionalField)
-                .equipModel(pnfEquipModelOptionalField)
-                .equipType(pnfEquipTypeOptionalField)
-                .nfRole(pnfNfRoleOptionalField)
-                .swVersion(pnfSwVersionOptionalField)
-                .additionalFields(pnfAdditionalFields).build());
+                ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: "
+                + printMessage()) :
+                Mono.just(ImmutableConsumerDmaapModel.builder()
+                        .correlationId(pnfSourceName)
+                        .ipv4(pnfOamIpv4Address)
+                        .ipv6(pnfOamIpv6Address)
+                        .serialNumber(pnfSerialNumberOptionalField)
+                        .equipVendor(pnfEquipVendorOptionalField)
+                        .equipModel(pnfEquipModelOptionalField)
+                        .equipType(pnfEquipTypeOptionalField)
+                        .nfRole(pnfNfRoleOptionalField)
+                        .swVersion(pnfSwVersionOptionalField)
+                        .additionalFields(pnfAdditionalFields).build());
     }
 
     private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
@@ -148,30 +194,39 @@ public class DmaapConsumerJsonParser {
     }
 
     private boolean containsHeader(JsonObject jsonObject) {
-        return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS);
+        try {
+            return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS);
+        }catch(Exception e){
+            LOGGER.info("Fetching an error in containsHeader method {}",e.getMessage());
+        }
+        return false;
     }
 
     private String printMessage() {
         return String.format("%n{"
-                + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + ","
-                + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + ","
-                + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + ","
-                + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + ","
-                + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + ","
-                + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + ","
-                + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + ","
-                + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + ","
-                + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + ","
-                + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT
-                + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address,
-            this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField,
-            this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField,
-            this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields
+                        + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + ","
+                        + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + ","
+                        + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + ","
+                        + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + ","
+                        + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + ","
+                        + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + ","
+                        + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + ","
+                        + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + ","
+                        + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + ","
+                        + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT
+                        + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address,
+                this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField,
+                this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField,
+                this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields
         );
     }
 
     private <T> Mono<T> logErrorAndReturnMonoEmpty(String messageForLogger) {
-        LOGGER.warn(messageForLogger);
+        LOGGER.info(messageForLogger);
         return Mono.empty();
     }
+
+    public JSONArray getJsonArray(String value) throws JSONException {
+        return new JSONArray(value);
+    }
 }
index 35eb948..ce8059b 100644 (file)
@@ -3,6 +3,7 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,6 +26,6 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import reactor.core.publisher.Mono;
 
 @FunctionalInterface
-interface AaiProducerTask {
+public interface AaiProducerTask {
     Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
 }
index 11ff369..5f86010 100644 (file)
@@ -3,6 +3,7 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,7 +24,8 @@ package org.onap.dcaegen2.services.prh.tasks;
 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
 import reactor.core.publisher.Mono;
 
-@FunctionalInterface
+
 public interface AaiQueryTask {
     Mono<Boolean> execute(final ConsumerDmaapModel aaiModel);
+    Mono<ConsumerDmaapModel> findPnfinAAI(final ConsumerDmaapModel model);
 }
index 3db4887..4a7eef5 100644 (file)
@@ -3,10 +3,10 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
  *
  *      http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -35,6 +35,8 @@ import org.onap.dcaegen2.services.prh.model.RelationshipDict;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Component
 public class AaiQueryTaskImpl implements AaiQueryTask {
@@ -44,6 +46,7 @@ public class AaiQueryTaskImpl implements AaiQueryTask {
     static final String SERVICE_TYPE = "service-subscription.service-type";
     static final String SERVICE_INSTANCE_ID = "service-instance.service-instance-id";
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(AaiQueryTaskImpl.class);
     private final AaiHttpClient<ConsumerDmaapModel, AaiPnfResultModel> getPnfModelClient;
     private final AaiHttpClient<AaiServiceInstanceQueryModel, AaiServiceInstanceResultModel> getServiceClient;
 
@@ -55,8 +58,11 @@ public class AaiQueryTaskImpl implements AaiQueryTask {
         this.getServiceClient = getServiceClient;
     }
 
+    
+
     @Override
     public Mono<Boolean> execute(ConsumerDmaapModel aaiModel) {
+
         return getPnfModelClient
                 .getAaiResponse(aaiModel)
                 .flatMap(this::checkIfPnfHasRelationToService)
@@ -65,7 +71,22 @@ public class AaiQueryTaskImpl implements AaiQueryTask {
                 .defaultIfEmpty(false);
     }
 
+
+   // Added by DTAG, March 2023
+    @Override
+    public Mono<ConsumerDmaapModel> findPnfinAAI(final ConsumerDmaapModel model) {
+
+        return getPnfModelClient
+                .getAaiResponse(model)
+                .flatMap(aaiModel -> Mono.just(model));
+
+                    
+    }
+     
+
+
     private Mono<AaiServiceInstanceQueryModel> checkIfPnfHasRelationToService(final AaiPnfResultModel model) {
+
         return Mono
                 .justOrEmpty(model.getRelationshipList())
                 .map(this::findRelatedTo)
@@ -88,10 +109,12 @@ public class AaiQueryTaskImpl implements AaiQueryTask {
     }
 
     private Boolean checkIfRelatedServiceInstanceIsActive(final AaiServiceInstanceResultModel model) {
+
         return ACTIVE_STATUS.equalsIgnoreCase(model.getOrchestrationStatus());
     }
 
     private Optional<RelationshipDict> findRelatedTo(final Relationship data) {
+
         return Optional.ofNullable(data.getRelationship())
                 .map(Stream::of)
                 .orElseGet(Stream::empty)
@@ -101,10 +124,12 @@ public class AaiQueryTaskImpl implements AaiQueryTask {
     }
 
     private Optional<String> findValue(final List<RelationshipData> data, final String key) {
+
         return data
                 .stream()
                 .filter(y -> key.equals(y.getRelationshipKey()))
                 .findFirst()
                 .map(RelationshipData::getRelationshipValue);
     }
+  
 }
index 68a44eb..f305a92 100644 (file)
@@ -3,6 +3,7 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.function.Predicate;
 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.context.annotation.Profile;
 import org.springframework.http.HttpStatus;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 3/23/18
  */
+/**
+ * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a> on 3/12/23
+ */
+
+@Profile("!autoCommitDisabled")
 @Component
 public class ScheduledTasks {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class);
     private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
-
-    private final DmaapConsumerTask dmaapConsumerTask;
-    private final DmaapPublisherTask dmaapReadyProducerTask;
-    private final DmaapPublisherTask dmaapUpdateProducerTask;
-    private final AaiQueryTask aaiQueryTask;
-    private final AaiProducerTask aaiProducerTask;
-    private final BbsActionsTask bbsActionsTask;
+    private static Boolean pnfFound = true;
+    private DmaapConsumerTask dmaapConsumerTask;
+
+    private DmaapPublisherTask dmaapReadyProducerTask;
+    private DmaapPublisherTask dmaapUpdateProducerTask;
+    private  AaiQueryTask aaiQueryTask;
+    private  AaiProducerTask aaiProducerTask;
+    private  BbsActionsTask bbsActionsTask;
     private Map<String, String> mdcContextMap;
 
     /**
@@ -69,6 +78,7 @@ public class ScheduledTasks {
      * @param dmaapUpdatePublisherTask - fourth task
      * @param aaiPublisherTask         - second task
      */
+
     @Autowired
     public ScheduledTasks(
             final DmaapConsumerTask dmaapConsumerTask,
@@ -90,8 +100,8 @@ public class ScheduledTasks {
     static class State {
         public final ConsumerDmaapModel dmaapModel;
         public final Boolean activationStatus;
-
-        public State(final ConsumerDmaapModel dmaapModel, final Boolean activationStatus) {
+      
+        public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) {
             this.dmaapModel = dmaapModel;
             this.activationStatus = activationStatus;
         }
@@ -139,7 +149,7 @@ public class ScheduledTasks {
 
     private void onError(Throwable throwable) {
         if (!(throwable instanceof DmaapEmptyResponseException)) {
-            LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
+            LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow {}", throwable);
         }
     }
 
@@ -153,7 +163,8 @@ public class ScheduledTasks {
     }
 
     private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) {
-        return aaiQueryTask
+        LOGGER.info("Find AAI Info  --> "+monoDMaaPModel.getCorrelationId());
+        return   aaiQueryTask
                 .execute(monoDMaaPModel)
                 .map(x -> new State(monoDMaaPModel, x));
     }
index 70c54a5..09e06da 100644 (file)
@@ -3,6 +3,7 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -32,6 +33,7 @@ import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 import org.springframework.boot.context.event.ApplicationStartedEvent;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
 import org.springframework.context.event.EventListener;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
@@ -40,6 +42,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 6/13/18
  */
+@Profile("!autoCommitDisabled")
 @Configuration
 @EnableScheduling
 public class ScheduledTasksRunner {
@@ -58,9 +61,11 @@ public class ScheduledTasksRunner {
         this.prhProperties = prhProperties;
     }
 
+     String profile = System.getenv("SPRING_PROFILES_ACTIVE");
+
     @EventListener
     public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) {
-        tryToStartTask();
+            tryToStartTask();
     }
 
     /**
@@ -88,5 +93,5 @@ public class ScheduledTasksRunner {
             return false;
         }
     }
-
 }
+
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java
new file mode 100644 (file)
index 0000000..4bf4920
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+
+/**
+ * This class will return start date time of the day and end date time of the day in epoch format.
+ * @author <a href="mailto:mohd.khan@t-systems.com">Mohd Usman Khan</a> on 3/13/23
+ */
+
+@Component
+public class EpochDateTimeConversion {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EpochDateTimeConversion.class);
+
+    private String daysForRecords = System.getenv("number_of_days");
+
+    public Long getStartDateOfTheDay(){
+        return getEpochDateTime(atStartOfDay(getCurrentDate()));
+    }
+
+    public Long getEndDateOfTheDay(){
+        return getEpochDateTime(atEndOfDay(getCurrentDate()));
+    }
+
+    private Long getEpochDateTime(Date date)
+    {
+        DateTimeFormatter dtf  = DateTimeFormatter.ofPattern("E MMM dd HH:mm:ss zzz yyyy");
+        ZonedDateTime zdt  = ZonedDateTime.parse( date.toString(),dtf);
+        return zdt.toInstant().toEpochMilli();
+    }
+
+    private Date getCurrentDate()
+    {
+        return new java.util.Date(System.currentTimeMillis());
+    }
+
+    public Date atStartOfDay(Date date) {
+        LocalDateTime localDateTime = dateToLocalDateTime(date);
+        if(daysForRecords==null)
+            daysForRecords="1";
+        LocalDateTime previousDay = localDateTime.minusDays(Integer.parseInt(daysForRecords) - 1l);
+        LocalDateTime previousStartTime = previousDay.with(LocalTime.MIN);
+        return localDateTimeToDate(previousStartTime);
+    }
+
+    private Date atEndOfDay(Date date) {
+        LocalDateTime localDateTime = dateToLocalDateTime(date);
+        LocalDateTime endOfDay = localDateTime.with(LocalTime.MAX);
+        return localDateTimeToDate(endOfDay);
+    }
+
+    private LocalDateTime dateToLocalDateTime(Date date) {
+        return LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
+    }
+
+    private Date localDateTimeToDate(LocalDateTime localDateTime) {
+        return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
+    }
+
+    public String getDaysForRecords() {
+        return daysForRecords;
+    }
+
+    public void setDaysForRecords(String daysForRecords) {
+        this.daysForRecords = daysForRecords;
+    }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java
new file mode 100644 (file)
index 0000000..4c70c71
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import reactor.core.publisher.Flux;
+
+/**
+ * @author <a href="mailto:ajinkya-patil@t-systems.com">Ajinkya Patil</a> on 3/13/23
+ */
+
+public interface KafkaConsumerTask {
+    Flux<ConsumerDmaapModel> execute() throws JSONException;
+
+    void commitOffset();
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java
new file mode 100644 (file)
index 0000000..30e6cff
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.context.annotation.Profile;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:ajinkya-patil@t-systems.com">Ajinkya Patil</a> on 3/13/23
+ */
+
+@Profile("autoCommitDisabled")
+@Component
+public class KafkaConsumerTaskImpl implements KafkaConsumerTask, BatchAcknowledgingMessageListener<String, String> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTaskImpl.class);
+
+    @Autowired
+    private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+
+    @Autowired
+    private EpochDateTimeConversion epochDateTimeConversion;
+
+    private List<String> jsonEvent = new ArrayList<>();
+
+    private Acknowledgment offset;
+
+    String kafkaTopic = System.getenv("kafkaTopic");
+
+    String groupIdConfig = System.getenv("groupIdConfig");
+
+    @Override
+    @KafkaListener(topics = "${kafkaTopic}", groupId = "${groupIdConfig}")
+    public void onMessage(List<ConsumerRecord<String, String>> list, Acknowledgment acknowledgment) {
+       
+       if (list != null && !list.isEmpty()) {
+           
+            
+            list.stream().filter(consumerRecord -> consumerRecord.timestamp() >= epochDateTimeConversion.getStartDateOfTheDay() && consumerRecord.timestamp() <= epochDateTimeConversion.getEndDateOfTheDay())
+                    .map(ConsumerRecord::value)
+                    .forEach(value -> {
+                          jsonEvent.add(value);
+                    });
+
+                     
+        }
+       
+
+        offset = acknowledgment;
+    }
+
+    @Override
+    public Flux<ConsumerDmaapModel> execute() throws JSONException {
+        return dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(jsonEvent);
+    }
+
+    @Override
+    public void commitOffset() {
+        if(!jsonEvent.isEmpty()){
+            jsonEvent.clear();
+        }
+        if(offset != null){
+            offset.acknowledge();
+        }
+    }
+
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java
new file mode 100644 (file)
index 0000000..64d7798
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import javax.annotation.PreDestroy;
+import org.onap.dcaegen2.services.prh.configuration.PrhProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+/**
+ * @author <a href="mailto:pravin.kokane@t-systems.com">Pravin Kokane</a> on 3/13/23
+ */
+
+@Profile("autoCommitDisabled")
+@Configuration
+@EnableScheduling
+public class ScheduledTasksRunnerWithCommit {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksRunnerWithCommit.class);
+    private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
+    private static List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>();
+
+    private final TaskScheduler taskScheduler;
+    private final PrhProperties prhProperties;
+
+    @Autowired
+    private ScheduledTasksWithCommit scheduledTasksWithCommit;
+
+    public ScheduledTasksRunnerWithCommit(TaskScheduler taskScheduler, ScheduledTasksWithCommit scheduledTasksWithCommit,
+                                          PrhProperties prhProperties) {
+        this.taskScheduler = taskScheduler;
+        this.scheduledTasksWithCommit = scheduledTasksWithCommit;
+        this.prhProperties = prhProperties;
+    }
+
+    @EventListener
+    public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) {
+        tryToStartTaskWithCommit();
+    }
+
+    /**
+     * Function which have to stop tasks execution.
+     */
+    @PreDestroy
+    public synchronized void cancelTasks() {
+        scheduledPrhTaskFutureList.forEach(x -> x.cancel(false));
+        scheduledPrhTaskFutureList.clear();
+    }
+
+    /**
+     * Function for starting scheduling PRH workflow.
+     *
+     * @return status of operation execution: true - started, false - not started
+     */
+
+    public synchronized boolean tryToStartTaskWithCommit() {
+        LOGGER.info(ENTRY, "Start scheduling PRH workflow with Commit  Tasks Runner");
+        if (scheduledPrhTaskFutureList.isEmpty()) {
+            Collections.synchronizedList(scheduledPrhTaskFutureList);
+            scheduledPrhTaskFutureList.add(taskScheduler
+                .scheduleWithFixedDelay(scheduledTasksWithCommit::scheduleKafkaPrhEventTask,
+                    prhProperties.getWorkflowSchedulingInterval()));
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+}
+
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java
new file mode 100644 (file)
index 0000000..b0eae94
--- /dev/null
@@ -0,0 +1,213 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
+import org.onap.dcaegen2.services.prh.tasks.AaiProducerTask;
+import org.onap.dcaegen2.services.prh.tasks.AaiQueryTask;
+import org.onap.dcaegen2.services.prh.tasks.BbsActionsTask;
+import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.context.annotation.Profile;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a> on 3/13/23
+ */
+@Profile("autoCommitDisabled")
+@Component
+public class ScheduledTasksWithCommit {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksWithCommit.class);
+    private static Boolean pnfFound = true;
+    private KafkaConsumerTask kafkaConsumerTask;
+    private DmaapPublisherTask dmaapReadyProducerTask;
+    private DmaapPublisherTask dmaapUpdateProducerTask;
+    private AaiQueryTask aaiQueryTask;
+    private AaiProducerTask aaiProducerTask;
+    private BbsActionsTask bbsActionsTask;
+    private Map<String, String> mdcContextMap;
+
+    /**
+     * Constructor for tasks registration in PRHWorkflow.
+     *
+     * @param kafkaConsumerTask        - fist task
+     * @param dmaapReadyPublisherTask  - third task
+     * @param dmaapUpdatePublisherTask - fourth task
+     * @param aaiPublisherTask         - second task
+     */
+    @Autowired
+    public ScheduledTasksWithCommit(
+        final KafkaConsumerTask kafkaConsumerTask,
+        @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask,
+        @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask,
+        final AaiQueryTask aaiQueryTask,
+        final AaiProducerTask aaiPublisherTask,
+        final BbsActionsTask bbsActionsTask,
+        final Map<String, String> mdcContextMap) {
+        this.dmaapReadyProducerTask = dmaapReadyPublisherTask;
+        this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask;
+        this.kafkaConsumerTask=kafkaConsumerTask;
+        this.aaiQueryTask = aaiQueryTask;
+        this.aaiProducerTask = aaiPublisherTask;
+        this.bbsActionsTask = bbsActionsTask;
+        this.mdcContextMap = mdcContextMap;
+    }
+
+    static class State {
+        public ConsumerDmaapModel dmaapModel;
+        public  Boolean activationStatus;
+
+        public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) {
+            this.dmaapModel = dmaapModel;
+            this.activationStatus = activationStatus;
+        }
+    }
+
+    public void scheduleKafkaPrhEventTask() {
+        MdcVariables.setMdcContextMap(mdcContextMap);
+        try {
+            LOGGER.info("Execution of tasks was registered with commit");
+            CountDownLatch mainCountDownLatch = new CountDownLatch(1);
+            consumeFromKafkaMessage()
+                    .flatMap(model->queryAaiForPnf(model)
+                .doOnError(e -> { LOGGER.info("PNF Not Found in AAI --> {}" + e);
+                   disableCommit();
+                })
+                .onErrorResume(e -> Mono.empty())
+                
+                )
+                    .flatMap(this::queryAaiForConfiguration)
+                .flatMap(this::publishToAaiConfiguration)
+                .flatMap(this::processAdditionalFields)
+                .flatMap(this::publishToDmaapConfiguration)
+                    .onErrorResume(e -> Mono.empty())
+                
+                .doOnTerminate(mainCountDownLatch::countDown)
+                .subscribe(this::onSuccess, this::onError, this::onCompleteKafka);
+            mainCountDownLatch.await();
+        } catch (InterruptedException | JSONException e ) {
+            LOGGER.warn("Interruption problem on countDownLatch {}", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private static void disableCommit()
+    {
+        pnfFound=false;
+    }
+
+    private void onCompleteKafka() {
+        LOGGER.info("PRH tasks have been completed");
+        if(pnfFound){
+            kafkaConsumerTask.commitOffset();
+            LOGGER.info("Committed the Offset");
+        }
+        else
+        {
+            LOGGER.info("Offset not Committed");
+            pnfFound=true;
+        }
+    }
+
+
+    private void onSuccess(MessageRouterPublishResponse response) {
+        if (response.successful()) {
+            String statusCodeOk = HttpStatus.OK.name();
+            MDC.put(RESPONSE_CODE, statusCodeOk);
+            LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCodeOk);
+            MDC.remove(RESPONSE_CODE);
+        }
+    }
+
+    private void onError(Throwable throwable) {
+        if (!(throwable instanceof DmaapEmptyResponseException)) {
+            LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow {}", throwable);
+        }
+    }
+
+    private Flux<ConsumerDmaapModel> consumeFromKafkaMessage() throws JSONException {
+        return kafkaConsumerTask.execute();
+    }
+
+    private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) {
+        return   aaiQueryTask
+            .execute(monoDMaaPModel)
+            .map(x -> new State(monoDMaaPModel, x));
+    }
+
+    private Mono<ConsumerDmaapModel> queryAaiForPnf(final ConsumerDmaapModel monoDMaaPModel) {
+
+        LOGGER.info("Find PNF --> "+monoDMaaPModel.getCorrelationId());
+        return aaiQueryTask.findPnfinAAI(monoDMaaPModel);
+    }
+
+
+    private Mono<State> publishToAaiConfiguration(final State state) {
+        try {
+            return aaiProducerTask
+                .execute(state.dmaapModel)
+                .map(x -> state);
+        } catch (PrhTaskException e) {
+            LOGGER.warn("AAIProducerTask exception has been registered: {}", e);
+            return Mono.error(e);
+        }
+    }
+
+    private Mono<State> processAdditionalFields(final State state) {
+        if (state.activationStatus) {
+            LOGGER.debug("Re-registration - Logical links won't be updated.");
+            return Mono.just(state);
+        }
+        return bbsActionsTask.execute(state.dmaapModel).map(x -> state);
+    }
+
+    private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) {
+        try {
+            if (state.activationStatus) {
+                LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
+                return dmaapUpdateProducerTask.execute(state.dmaapModel);
+            }
+            return dmaapReadyProducerTask.execute(state.dmaapModel);
+        } catch (PrhTaskException e) {
+            LOGGER.warn("DMaaPProducerTask exception has been registered: ", e);
+            return Flux.error(e);
+        }
+    }
+}
index 8f1950d..e62d4e9 100644 (file)
@@ -1,6 +1,6 @@
 spring:
-  profiles:
-    active: prod
+  profiles: prod
+
 server:
   port: 8433
   ssl:
@@ -25,4 +25,13 @@ logging:
 
 management.endpoints.web.exposure.include: "*"
 
+---
+spring:
+  profiles:
+    default: prod
+
+
+
+
+
 
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java
new file mode 100644 (file)
index 0000000..22b82e3
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.configuration;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaConfigTest {
+
+    @InjectMocks
+    KafkaConfig kafkaConfig;
+
+    @BeforeEach
+    void setUp() {
+        kafkaConfig.kafkaBoostrapServerConfig = "0.0.0.0";
+        kafkaConfig.groupIdConfig = "consumer-test";
+        kafkaConfig.kafkaSecurityProtocol = "test";
+        kafkaConfig.kafkaSaslMechanism = "test";
+        kafkaConfig.kafkaUsername = "test";
+        kafkaConfig.kafkaPassword = "test";
+        kafkaConfig.kafkaJaasConfig = null;
+        kafkaConfig.kafkaLoginModuleClassConfig = "test";
+        kafkaConfig.kafkaJaasConfig = "test";
+    }
+
+    @Test
+    public void consumerFactoryTest(){
+        kafkaConfig.consumerFactory();
+    }
+
+    @Test
+    public void kafkaListenerContainerFactoryTest(){
+        kafkaConfig.kafkaListenerContainerFactory();
+    }
+}
index ebdec09..bbc6b96 100644 (file)
@@ -3,6 +3,7 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -42,7 +43,7 @@ class ScheduleControllerTest {
     @Autowired
     private WebTestClient webTestClient;
 
-    @Test
+   @Test
     void startEndpointShouldAllowStartingPrhTasks() {
         when(scheduledTasksRunner.tryToStartTask()).thenReturn(true);
         webTestClient
@@ -72,4 +73,4 @@ class ScheduleControllerTest {
 
         verify(scheduledTasksRunner).cancelTasks();
     }
-}
\ No newline at end of file
+}
index 01beb88..1a6c76c 100644 (file)
@@ -3,6 +3,7 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2019-2021 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -38,11 +39,29 @@ import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
+import static com.github.tomakehurst.wiremock.client.WireMock.ok;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static com.github.tomakehurst.wiremock.client.WireMock.patch;
+
+
+
+
+
 
 import java.nio.file.Files;
 import java.nio.file.Paths;
 
-import static com.github.tomakehurst.wiremock.client.WireMock.*;
+
 import static java.lang.ClassLoader.getSystemResource;
 import static java.util.Collections.singletonList;
 
index 9dab7aa..ba75935 100644 (file)
@@ -3,6 +3,7 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.services.prh.service;
 
-import static org.mockito.Mockito.spy;
-
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import io.vavr.collection.List;
-import java.util.Optional;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
@@ -37,6 +34,11 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.util.Optional;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 5/8/18
  */
@@ -108,7 +110,7 @@ class DmaapConsumerJsonParserTest {
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = jsonParser.parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+        doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
 
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
@@ -171,7 +173,7 @@ class DmaapConsumerJsonParserTest {
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = new JsonParser().parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+        doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
             .getJsonObject(Mono.just((response))).blockFirst();
@@ -238,7 +240,7 @@ class DmaapConsumerJsonParserTest {
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = jsonParser.parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+        doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
 
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
@@ -302,7 +304,7 @@ class DmaapConsumerJsonParserTest {
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = jsonParser.parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+        doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
 
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
@@ -334,7 +336,7 @@ class DmaapConsumerJsonParserTest {
 
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = jsonParser.parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+        doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         String incorrectMessage = "{\"event\": {"
             + "\"commonEventHeader\": {},"
@@ -380,7 +382,7 @@ class DmaapConsumerJsonParserTest {
 
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = jsonParser.parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+        doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         String jsonWithoutSourceName =
             "{\"event\": {"
@@ -430,7 +432,7 @@ class DmaapConsumerJsonParserTest {
 
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = jsonParser.parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+        doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         String jsonWithoutIpInformation =
             "{\"event\": {"
@@ -497,7 +499,7 @@ class DmaapConsumerJsonParserTest {
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = jsonParser.parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+        doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
 
         dmaapConsumerJsonParser.getJsonObject(Mono.just((response)));
@@ -573,7 +575,7 @@ class DmaapConsumerJsonParserTest {
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = jsonParser.parse(parsed);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+        doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(response))
             .blockFirst();
index e81b374..517fe73 100644 (file)
@@ -3,6 +3,7 @@
  * PROJECT
  * ================================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,6 +27,8 @@ import static org.mockito.Mockito.mock;
 
 import java.util.Collections;
 import java.util.List;
+
+
 import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -38,6 +41,7 @@ import org.onap.dcaegen2.services.prh.adapter.aai.api.AaiPnfResultModel;
 import org.onap.dcaegen2.services.prh.adapter.aai.api.AaiServiceInstanceQueryModel;
 import org.onap.dcaegen2.services.prh.adapter.aai.api.AaiServiceInstanceResultModel;
 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableRelationshipData;
 import org.onap.dcaegen2.services.prh.model.Relationship;
 import org.onap.dcaegen2.services.prh.model.RelationshipData;
@@ -84,18 +88,6 @@ class AaiQueryTaskImplTest {
         sut = new AaiQueryTaskImpl(getPnfModelClient, getServiceClient);
     }
 
-    @Test
-    void whenPnfIsUnavailable_ShouldThrowException() {
-        //given
-        given(getPnfModelClient.getAaiResponse(aaiModel)).willReturn(Mono.error(new Exception("404")));
-
-        //when
-        final Mono<Boolean> task = sut.execute(aaiModel);
-
-        //then
-        Assertions.assertThrows(Exception.class, task::block);
-    }
-
     @Test
     void whenPnfIsAvailableButRelationshipIsNull_ShouldReturnFalse() {
         //given
@@ -203,4 +195,12 @@ class AaiQueryTaskImplTest {
     private void configurePnfClient(final ConsumerDmaapModel aaiModel, final AaiPnfResultModel pnfResultModel) {
         given(getPnfModelClient.getAaiResponse(aaiModel)).willReturn(Mono.just(pnfResultModel));
     }
+
+    @Test
+    void testFindPnfInAAIActive(){
+        ConsumerDmaapModel model = ImmutableConsumerDmaapModel.builder().correlationId("123").build();
+        configurePnfClient(model, pnfResultModel);
+        Mono<ConsumerDmaapModel> test = sut.findPnfinAAI(model);
+        Assertions.assertNotNull(test);
+    }
 }
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java
new file mode 100644 (file)
index 0000000..850587e
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class EpochDateTimeConversionTest {
+
+    private EpochDateTimeConversion epochDateTimeConversion;
+
+    @BeforeEach
+    void setUp() {
+        epochDateTimeConversion = new EpochDateTimeConversion();
+        epochDateTimeConversion.setDaysForRecords("3");
+    }
+
+    @Test
+    public void getStartDateOfTheDayTest(){
+        epochDateTimeConversion.getDaysForRecords();
+        Long day = epochDateTimeConversion.getStartDateOfTheDay();
+        Assertions.assertNotNull(day);
+    }
+
+    @Test
+    public void getEndDateOfTheDayTest(){
+        Long day = epochDateTimeConversion.getEndDateOfTheDay();
+        Assertions.assertNotNull(day);
+    }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
new file mode 100644 (file)
index 0000000..c23a188
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
+import org.onap.dcaegen2.services.prh.tasks.commit.EpochDateTimeConversion;
+import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.kafka.support.Acknowledgment;
+import reactor.core.publisher.Flux;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaConsumerTaskImplTest {
+
+    @Mock
+    private Acknowledgment acknowledgment;
+
+    @Mock
+    private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+
+    @Mock
+    private EpochDateTimeConversion epochDateTimeConversion;
+
+    @InjectMocks
+    private KafkaConsumerTaskImpl kafkaConsumerTask;
+
+    @Test
+    public void onMessageTest(){
+        List<ConsumerRecord<String, String>> list = new ArrayList<>();
+        TimestampType timestampType = null;
+        Headers headers = new RecordHeaders();
+        epochDateTimeConversion.setDaysForRecords("3");
+        ConsumerRecord<String, String> records = new ConsumerRecord<>
+                ("test-topic", 1, 1l, 0l, timestampType, 1, 1, "test-key", "test-value", headers
+        , null);
+        list.add(records);
+        kafkaConsumerTask.onMessage(list, acknowledgment);
+    }
+
+    @Test
+    public void commitOffsetTest(){
+        kafkaConsumerTask.commitOffset();
+    }
+
+    @Test
+    public void executeTest() throws JSONException {
+        List<String> jsonEvent = new ArrayList<>();
+        ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().correlationId("123").build();
+        when(dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(jsonEvent)).thenReturn(Flux.just(consumerDmaapModel));
+        kafkaConsumerTask.execute();
+    }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java
new file mode 100644 (file)
index 0000000..401e351
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.configuration.PrhProperties;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.scheduling.TaskScheduler;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@ExtendWith(MockitoExtension.class)
+public class ScheduledTasksRunnerWithCommitTest  {
+
+    @Mock
+    private ScheduledTasksWithCommit scheduledTasksWithCommit;
+
+    @Mock
+    private TaskScheduler taskScheduler;
+
+    @Mock
+    private PrhProperties prhProperties;
+
+    @Mock
+    private ApplicationStartedEvent applicationStartedEvent;
+
+    private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit;
+
+    @BeforeEach
+    void setUp() {
+        scheduledTasksRunnerWithCommit = new ScheduledTasksRunnerWithCommit(taskScheduler, scheduledTasksWithCommit, prhProperties);
+    }
+
+    @Test
+    void onApplicationStartedEvent() {
+        scheduledTasksRunnerWithCommit.onApplicationStartedEvent(applicationStartedEvent);
+        assertFalse(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit());
+    }
+
+    @Test
+    void cancelTasks() {
+        scheduledTasksRunnerWithCommit.cancelTasks();
+    }
+
+    @Test
+    void tryToStartTaskWithCommit() {
+        scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit();
+        assertFalse(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit());
+    }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java
new file mode 100644 (file)
index 0000000..6477902
--- /dev/null
@@ -0,0 +1,263 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
+import org.onap.dcaegen2.services.prh.tasks.AaiProducerTask;
+import org.onap.dcaegen2.services.prh.tasks.AaiQueryTask;
+import org.onap.dcaegen2.services.prh.tasks.BbsActionsTask;
+import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+
+@ExtendWith(MockitoExtension.class)
+class ScheduledTasksWithCommitTest {
+    private final static ConsumerDmaapModel DMAAP_MODEL =
+        ImmutableConsumerDmaapModel
+            .builder()
+            .correlationId("SomeId")
+            .ipv4("ipv4")
+            .ipv6("ipv6")
+            .build();
+
+    @Mock
+    private DmaapPublisherTask readyPublisher;
+
+    @Mock
+    private DmaapPublisherTask updatePublisher;
+
+
+    @Mock
+    private BbsActionsTask bbsActionsTask;
+
+    @Mock
+    private KafkaConsumerTask kafkaConsumerTask;
+
+    @Mock
+    private AaiQueryTask aaiQueryTask;
+
+    @Mock
+    private AaiProducerTask aaiProducerTask;
+
+    private final Map<String, String> context = Collections.emptyMap();
+
+    private ScheduledTasksWithCommit sut;
+
+    @BeforeEach
+    void setUp() {
+        sut = new ScheduledTasksWithCommit(
+            kafkaConsumerTask,
+            readyPublisher,
+            updatePublisher,
+            aaiQueryTask,
+            aaiProducerTask,
+            bbsActionsTask,
+            context);
+    }
+
+    @Test
+    void testQueryAAiForPNFOnSuccess() throws JSONException, PrhTaskException {
+        ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false );
+        MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+            @Override
+            public @Nullable String failReason() {
+                    return null;
+                }
+        };
+        when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+        when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+        when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+        when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+        when(updatePublisher.execute(state.dmaapModel)).thenReturn(Flux.just(messageRouterPublishResponse));
+
+        sut.scheduleKafkaPrhEventTask();
+
+        verifyIfLogicalLinkWasNotCreated();
+        verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+    }
+
+    @Test
+    void testQueryAAiForPNF() throws JSONException, PrhTaskException {
+        ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+        MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+            @Override
+            public @Nullable String failReason() {
+                    return null;
+                }
+        };
+        when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+        when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+        when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+        when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+        when(updatePublisher.execute(state.dmaapModel)).thenReturn(Flux.just(messageRouterPublishResponse));
+
+        sut.scheduleKafkaPrhEventTask();
+
+        verifyIfLogicalLinkWasNotCreated();
+        verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+    }
+
+    @Test
+    void testQueryAAiForPNFOnError() throws JSONException, PrhTaskException {
+            when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+
+            sut.scheduleKafkaPrhEventTask();
+
+            verifyThatPnfUpdateWasNotSentToAai();
+
+            verifyIfLogicalLinkWasNotCreated();
+            verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+            verifyThatPnfModelWasNotSentDmaapPnfUpdateTopic();
+    }
+
+    @Test
+    void testQueryAAiForPNFOnPRHException() throws JSONException, PrhTaskException {
+        ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false );
+        MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+            @Override
+            public @Nullable String failReason() {
+                return null;
+            }
+        };
+        when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+        when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+        when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+        when(aaiProducerTask.execute(state.dmaapModel)).thenThrow(new PrhTaskException());
+
+        sut.scheduleKafkaPrhEventTask();
+
+        verifyIfLogicalLinkWasNotCreated();
+        verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+    }
+
+    @Test
+    void queryAAiForPNFOnPRHExceptionTest() throws JSONException, PrhTaskException {
+        ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+        MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+            @Override
+            public @Nullable String failReason() {
+                return null;
+            }
+        };
+        when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+        when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+        when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+        when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+        when(updatePublisher.execute(state.dmaapModel)).thenThrow(new PrhTaskException());
+
+        sut.scheduleKafkaPrhEventTask();
+
+        verifyIfLogicalLinkWasNotCreated();
+        verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+    }
+
+    @Test
+    void queryAAiForPNFOnPRHExceptionOnDmaapEmptyResponseExceptionTest() throws JSONException, PrhTaskException {
+        ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+        MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+            @Override
+            public @Nullable String failReason() {
+                return null;
+            }
+        };
+        when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+        when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+        when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+        when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+        when(updatePublisher.execute(state.dmaapModel)).thenThrow(new DmaapEmptyResponseException());
+
+        sut.scheduleKafkaPrhEventTask();
+
+        verifyIfLogicalLinkWasNotCreated();
+        verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+    }
+
+    @Test
+    void queryAAiForPNFOnPRHExceptionOnFalseTest() throws JSONException, PrhTaskException {
+        ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false);
+        MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+            @Override
+            public @Nullable String failReason() {
+                return null;
+            }
+        };
+        when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+        when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+        when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(false));
+        when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+
+        sut.scheduleKafkaPrhEventTask();
+        verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+    }
+
+    @Test
+    void queryAAiForPNFOnPRHExceptionOnJSONExceptionTest() throws PrhTaskException, JSONException {
+        ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+        MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+            @Override
+            public @Nullable String failReason() {
+                return null;
+            }
+        };
+        when(kafkaConsumerTask.execute()).thenThrow(new JSONException("json format exception"));
+
+        sut.scheduleKafkaPrhEventTask();
+
+        verifyIfLogicalLinkWasNotCreated();
+        verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+    }
+
+    private void verifyThatPnfModelWasNotSentDmaapPnfReadyTopic() throws PrhTaskException {
+        verify(readyPublisher, never()).execute(DMAAP_MODEL);
+    }
+
+    private void verifyThatPnfModelWasNotSentDmaapPnfUpdateTopic() throws PrhTaskException {
+        verify(updatePublisher, never()).execute(DMAAP_MODEL);
+    }
+
+    private void verifyThatPnfUpdateWasNotSentToAai() throws PrhTaskException {
+        verify(aaiProducerTask, never()).execute(DMAAP_MODEL);
+    }
+
+    private void verifyIfLogicalLinkWasNotCreated(){
+        verify(bbsActionsTask, never()).execute(DMAAP_MODEL);
+    }
+}
+
index fa7f11c..85ab663 100644 (file)
@@ -2,7 +2,9 @@ spring:
   profiles:
     active: prod
 
+
+
 logging:
   level:
     org.onap.dcaegen2.services.prh: debug
-    org.onap.dcaegen2.services.sdk: debug
\ No newline at end of file
+    org.onap.dcaegen2.services.sdk: debug
index a3ffd40..20dfb28 100644 (file)
@@ -5,6 +5,7 @@
   ~ ================================================================================
   ~ Copyright (C) 2018-2022 NOKIA Intellectual Property. All rights reserved.
   ~ Copyright (C) 2021 Samsung electronics. All rights reserved.
+  ~ Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
   ~ ================================================================================
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -27,7 +28,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services</groupId>
     <artifactId>prh</artifactId>
-    <version>1.8.1-SNAPSHOT</version>
+    <version>1.9.0-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.services.prh</groupId>
index cae6385..258cb63 100644 (file)
@@ -3,6 +3,7 @@
  * PNF-REGISTRATION-HANDLER
  * ================================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -189,6 +190,13 @@ public interface AaiPnfResultModel {
     @SerializedName("prov-status")
     String getProvStatus();
 
+    /**
+     * OrchestrationStatus Status of pnf for commit task
+     **/
+    @Nullable
+    @SerializedName("orchestration-status")
+    String getOrchestrationStatus();
+
     /**
      * Nf Role is the role performed by this instance in the network.
      **/
index dfe1ef3..a1653f6 100644 (file)
@@ -1,6 +1,6 @@
 major=1
-minor=8
-patch=1
+minor=9
+patch=0
 base_version=${major}.${minor}.${patch}
 release_version=${base_version}
 snapshot_version=${base_version}-SNAPSHOT