Async request response dmi -> NCMP 86/128686/31
authorJosephKeenan <joseph.keenan@est.tech>
Mon, 23 May 2022 14:43:05 +0000 (15:43 +0100)
committerJosephKeenan <joseph.keenan@est.tech>
Wed, 1 Jun 2022 15:56:50 +0000 (16:56 +0100)
-Added Async for passthrough running and operational
-Build will fail until cps is merged https://gerrit.onap.org/r/c/cps/+/128685

Issue-ID: CPS-830
Change-Id: Iedbfab109f5cd777a5be8eed7414758d0f5ec05c
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
18 files changed:
docs/api/swagger/openapi.yaml
openapi/components.yml
pom.xml
src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java [moved from src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java with 64% similarity]
src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java [new file with mode: 0644]
src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java [new file with mode: 0644]
src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java [moved from src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java with 53% similarity]
src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java
src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java [deleted file]
src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java
src/main/resources/application.yml
src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy [new file with mode: 0644]
src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy
src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy
src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy [deleted file]
src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy [deleted file]
src/test/resources/application.yml

index 9fd83e0..a3b07c2 100644 (file)
@@ -407,7 +407,7 @@ components:
           $ref: '#/components/schemas/cmHandleProperties'
         requestId:
           type: string
-          example: uuid-random-id-eg
+          example: 3a9ce55c-e365-4dc9-8da3-a06f07cbc6d7
     ModuleSet_schemas:
       type: object
       properties:
index fd9e2ff..212ddf0 100644 (file)
@@ -1,5 +1,5 @@
 #  ============LICENSE_START=======================================================
-#  Copyright (C) 2021 Nordix Foundation
+#  Copyright (C) 2021-2022 Nordix Foundation
 #  Modifications Copyright (C) 2022 Bell Canada
 #  ================================================================================
 #  Licensed under the Apache License, Version 2.0 (the "License");
@@ -118,7 +118,7 @@ components:
           $ref: '#/components/schemas/cmHandleProperties'
         requestId:
           type: string
-          example: uuid-random-id-eg
+          example: 3a9ce55c-e365-4dc9-8da3-a06f07cbc6d7
 
     cmHandleProperties:
       type: object
@@ -199,7 +199,7 @@ components:
           value:
             options: (key1=10,key2=value2,key3=val31,val32)
     topicParamInQuery:
-      name: topicParamInQuery
+      name: topic
       in: query
       description: valid topic name passed from client(NCMP).
       required: false
@@ -208,4 +208,4 @@ components:
       allowReserved: true
       examples:
         sample1:
-          value: ncmp-async-m2m
\ No newline at end of file
+          value: my-topic-name
diff --git a/pom.xml b/pom.xml
index d18acff..c5bd106 100644 (file)
--- a/pom.xml
+++ b/pom.xml
     </properties>
     <dependencyManagement>
         <dependencies>
+            <dependency>
+                <groupId>com.google.code.gson</groupId>
+                <artifactId>gson</artifactId>
+                <version>2.8.9</version>
+            </dependency>
+            <dependency>
+                <groupId>io.swagger</groupId>
+                <artifactId>swagger-annotations</artifactId>
+                <version>1.6.2</version>
+            </dependency>
+            <dependency>
+                <groupId>net.logstash.logback</groupId>
+                <artifactId>logstash-logback-encoder</artifactId>
+                <version>7.0.1</version>
+            </dependency>
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-dependencies</artifactId>
@@ -64,9 +79,9 @@
                 <scope>import</scope>
             </dependency>
             <dependency>
-                <groupId>com.google.code.gson</groupId>
-                <artifactId>gson</artifactId>
-                <version>2.8.9</version>
+                <groupId>org.springdoc</groupId>
+                <artifactId>springdoc-openapi-ui</artifactId>
+                <version>1.5.9</version>
             </dependency>
             <dependency>
                 <groupId>org.springframework.cloud</groupId>
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
-            <dependency>
-                <groupId>net.logstash.logback</groupId>
-                <artifactId>logstash-logback-encoder</artifactId>
-                <version>7.0.1</version>
-            </dependency>
             <dependency>
                 <groupId>org.codehaus.janino</groupId>
                 <artifactId>janino</artifactId>
                 <version>3.1.7</version>
             </dependency>
+            <dependency>
+                <groupId>org.onap.cps</groupId>
+                <artifactId>cps-ncmp-events</artifactId>
+                <version>3.1.0-SNAPSHOT</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <dependencies>
         <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-web</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-validation</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.codehaus.groovy</groupId>
-            <artifactId>groovy</artifactId>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
         </dependency>
         <dependency>
             <groupId>com.jayway.jsonpath</groupId>
             <artifactId>json-path</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-test</artifactId>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.junit.vintage</groupId>
-                    <artifactId>junit-vintage-engine</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.springdoc</groupId>
-            <artifactId>springdoc-openapi-ui</artifactId>
-            <version>1.5.9</version>
+            <groupId>io.swagger</groupId>
+            <artifactId>swagger-annotations</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.spockframework</groupId>
-            <artifactId>spock-core</artifactId>
-            <scope>test</scope>
+            <groupId>net.logstash.logback</groupId>
+            <artifactId>logstash-logback-encoder</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.spockframework</groupId>
-            <artifactId>spock-spring</artifactId>
-            <scope>test</scope>
+            <groupId>net.minidev</groupId>
+            <artifactId>json-smart</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.springframework.security</groupId>
-            <artifactId>spring-security-test</artifactId>
-            <scope>test</scope>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
         </dependency>
         <dependency>
-            <groupId>io.swagger</groupId>
-            <artifactId>swagger-annotations</artifactId>
-            <version>1.6.2</version>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-security</artifactId>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-actuator</artifactId>
+            <groupId>org.onap.cps</groupId>
+            <artifactId>cps-ncmp-events</artifactId>
         </dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
         <dependency>
-            <groupId>io.micrometer</groupId>
-            <artifactId>micrometer-registry-prometheus</artifactId>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
         <dependency>
-            <groupId>net.minidev</groupId>
-            <artifactId>json-smart</artifactId>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-validation</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
+            <groupId>org.springdoc</groupId>
+            <artifactId>springdoc-openapi-ui</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-            <version>${httpclient.version}</version>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-security</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
         </dependency>
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
         </dependency>
+        <!--  T E S T - D E P E N D E N C I E S -->
         <dependency>
-            <groupId>org.springframework.kafka</groupId>
-            <artifactId>spring-kafka-test</artifactId>
+            <groupId>org.spockframework</groupId>
+            <artifactId>spock-core</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>spock</artifactId>
+            <groupId>org.spockframework</groupId>
+            <artifactId>spock-spring</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>kafka</artifactId>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.junit.vintage</groupId>
+                    <artifactId>junit-vintage-engine</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.springframework.cloud</groupId>
-            <artifactId>spring-cloud-starter-sleuth</artifactId>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>net.logstash.logback</groupId>
-            <artifactId>logstash-logback-encoder</artifactId>
+            <groupId>org.springframework.security</groupId>
+            <artifactId>spring-security-test</artifactId>
+            <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.codehaus.janino</groupId>
-            <artifactId>janino</artifactId>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>spock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <scope>test</scope>
         </dependency>
     </dependencies>
     <build>
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2021 Nordix Foundation
+ *  Copyright (C) 2021-2022 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 
 package org.onap.cps.ncmp.dmi.exception;
 
-public class ResourceDataNotFound extends DmiException {
+import lombok.Getter;
+import org.springframework.http.HttpStatus;
+
+@Getter
+public class HttpClientRequestException extends DmiException {
 
     private static final long serialVersionUID = 881438585188332404L;
 
-    private static final String ERROR_MESSAGE = "Resource data not found for the given cmHandles: ";
+    private final HttpStatus httpStatus;
 
     /**
      * Constructor.
      *
      * @param cmHandle cmHandle identifier
-     * @param details the error details
+     * @param details    response body from the client available as details
+     * @param httpStatus http status from the client
      */
-    public ResourceDataNotFound(final String cmHandle, final String details) {
-        super(ERROR_MESSAGE + cmHandle, details);
+    public HttpClientRequestException(final String cmHandle, final String details, final HttpStatus httpStatus) {
+        super("Resource data request failed for CM Handle: " + cmHandle, details);
+        this.httpStatus = httpStatus;
     }
 }
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java
new file mode 100644 (file)
index 0000000..7189f6c
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async;
+
+import com.google.gson.JsonObject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException;
+import org.onap.cps.ncmp.dmi.model.DataAccessRequest;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class AsyncTaskExecutor {
+
+    private final DmiAsyncRequestResponseEventProducer dmiAsyncRequestResponseEventProducer;
+
+    private static final DmiAsyncRequestResponseEventCreator dmiAsyncRequestResponseEventCreator =
+        new DmiAsyncRequestResponseEventCreator();
+
+    private static final Map<DataAccessRequest.OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6);
+
+    static {
+        operationToHttpStatusMap.put(null, HttpStatus.OK);
+        operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.READ, HttpStatus.OK);
+        operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.CREATE, HttpStatus.CREATED);
+        operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.PATCH, HttpStatus.OK);
+        operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.UPDATE, HttpStatus.OK);
+        operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.DELETE, HttpStatus.NO_CONTENT);
+    }
+
+    /**
+     * Execute task asynchronously and publish response to supplied topic.
+     *
+     * @param taskSupplier          functional method is get() task need to executed asynchronously
+     * @param topicName             topic name where message need to be published
+     * @param requestId             unique requestId for async request
+     * @param operation             the operation performed
+     * @param timeOutInMilliSeconds task timeout in milliseconds
+     */
+    public void executeAsyncTask(final Supplier<String> taskSupplier,
+                                 final String topicName,
+                                 final String requestId,
+                                 final DataAccessRequest.OperationEnum operation,
+                                 final int timeOutInMilliSeconds) {
+        CompletableFuture.supplyAsync(taskSupplier::get)
+            .orTimeout(timeOutInMilliSeconds, TimeUnit.MILLISECONDS)
+            .whenCompleteAsync((resourceDataAsJson, throwable) -> {
+                if (throwable == null) {
+                    final String status = operationToHttpStatusMap.get(operation).getReasonPhrase();
+                    final String code = String.valueOf(operationToHttpStatusMap.get(operation).value());
+                    publishAsyncEvent(topicName, requestId, resourceDataAsJson, status, code);
+                } else {
+                    log.error("Error occurred with async request {}", throwable.getMessage());
+                    publishAsyncFailureEvent(topicName, requestId, operation, throwable);
+                }
+            });
+        log.info("Async task completed.");
+    }
+
+    private void publishAsyncEvent(final String topicName,
+                                   final String requestId,
+                                   final String resourceDataAsJson,
+                                   final String status,
+                                   final String code) {
+        final DmiAsyncRequestResponseEvent cpsAsyncRequestResponseEvent =
+            dmiAsyncRequestResponseEventCreator.createEvent(resourceDataAsJson, topicName, requestId, status, code);
+
+        dmiAsyncRequestResponseEventProducer.sendMessage(requestId, cpsAsyncRequestResponseEvent);
+    }
+
+    protected void publishAsyncFailureEvent(final String topicName,
+                                            final String requestId,
+                                            final DataAccessRequest.OperationEnum operation,
+                                            final Throwable throwable) {
+        HttpStatus httpStatus = HttpStatus.INTERNAL_SERVER_ERROR;
+
+        if (throwable instanceof HttpClientRequestException) {
+            final HttpClientRequestException httpClientRequestException = (HttpClientRequestException) throwable;
+            httpStatus = httpClientRequestException.getHttpStatus();
+        }
+
+        final JsonObject errorDetails = new JsonObject();
+        errorDetails.addProperty("errorDetails", throwable.getMessage());
+        publishAsyncEvent(
+            topicName,
+            requestId,
+            errorDetails.toString(),
+            httpStatus.getReasonPhrase(),
+            String.valueOf(httpStatus.value())
+        );
+    }
+}
+
+
+
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java
new file mode 100644 (file)
index 0000000..de1fc95
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.UUID;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.Application;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.onap.cps.ncmp.event.model.EventContent;
+
+/**
+ * Helper to create DmiAsyncRequestResponseEvent.
+ */
+@Slf4j
+public class DmiAsyncRequestResponseEventCreator {
+
+    private static final DateTimeFormatter dateTimeFormatter
+        = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Create an event.
+     *
+     * @param resourceDataAsJson the resource data as json
+     * @param topicParamInQuery the topic to send response to
+     * @param requestId the request id
+     * @param status the status of the request
+     * @param code the code of the response
+     *
+     * @return DmiAsyncRequestResponseEvent
+     */
+    public DmiAsyncRequestResponseEvent createEvent(final String resourceDataAsJson,
+                                                    final String topicParamInQuery,
+                                                    final String requestId,
+                                                    final String status,
+                                                    final String code) {
+        final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent = new DmiAsyncRequestResponseEvent();
+
+        dmiAsyncRequestResponseEvent.setEventId(UUID.randomUUID().toString());
+        dmiAsyncRequestResponseEvent.setEventCorrelationId(requestId);
+        dmiAsyncRequestResponseEvent.setEventType(DmiAsyncRequestResponseEvent.class.getName());
+        dmiAsyncRequestResponseEvent.setEventSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1");
+        dmiAsyncRequestResponseEvent.setEventSource(Application.class.getPackageName());
+        dmiAsyncRequestResponseEvent.setEventTarget(topicParamInQuery);
+        dmiAsyncRequestResponseEvent.setEventTime(ZonedDateTime.now().format(dateTimeFormatter));
+        dmiAsyncRequestResponseEvent.setEventContent(getEventContent(resourceDataAsJson, status, code));
+
+        return dmiAsyncRequestResponseEvent;
+    }
+
+    @SneakyThrows
+    private EventContent getEventContent(final String resourceDataAsJson, final String status, final String code) {
+        final EventContent eventContent = new EventContent();
+
+        eventContent.setResponseDataSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1");
+        eventContent.setResponseStatus(status);
+        eventContent.setResponseCode(code);
+
+        eventContent.setAdditionalProperty("response-data",
+            objectMapper.readValue(resourceDataAsJson, HashMap.class));
+
+        return eventContent;
+    }
+
+}
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.cps.ncmp.dmi.service;
+package org.onap.cps.ncmp.dmi.notifications.async;
 
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 
-@Slf4j
 @Service
-@AllArgsConstructor
-public class NcmpKafkaPublisherService {
+@RequiredArgsConstructor
+public class DmiAsyncRequestResponseEventProducer {
 
-    private final NcmpKafkaPublisher ncmpKafkaPublisher;
+    private final KafkaTemplate<String, DmiAsyncRequestResponseEvent> kafkaTemplate;
+
+    @Value("${app.ncmp.async.topic}")
+    private String dmiNcmpTopic;
 
     /**
-     * publish the message to NCMP.
+     * Sends message to the configured topic with a message key.
      *
-     * @param messageKey message key
-     * @param message    message payload
+     * @param requestId the request id
+     * @param dmiAsyncRequestResponseEvent the event to publish
      */
-    public void publishToNcmp(final String messageKey, final Object message) {
-        log.debug("Publishing message : {} to NCMP with message-key : {}", message, messageKey);
-        ncmpKafkaPublisher.sendMessage(messageKey, message);
+    public void sendMessage(final String requestId, final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent) {
+        kafkaTemplate.send(dmiNcmpTopic, requestId, dmiAsyncRequestResponseEvent);
     }
 }
index 4dbe852..bdd1fff 100644 (file)
@@ -37,11 +37,12 @@ import org.onap.cps.ncmp.dmi.model.ModuleReferencesRequest;
 import org.onap.cps.ncmp.dmi.model.ModuleResourcesReadRequest;
 import org.onap.cps.ncmp.dmi.model.ModuleSet;
 import org.onap.cps.ncmp.dmi.model.YangResources;
+import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor;
 import org.onap.cps.ncmp.dmi.rest.api.DmiPluginApi;
 import org.onap.cps.ncmp.dmi.rest.api.DmiPluginInternalApi;
 import org.onap.cps.ncmp.dmi.service.DmiService;
-import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService;
 import org.onap.cps.ncmp.dmi.service.model.ModuleReference;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -54,13 +55,13 @@ import org.springframework.web.bind.annotation.RestController;
 public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
 
     private final DmiService dmiService;
-
     private final ObjectMapper objectMapper;
-
-    private final NcmpKafkaPublisherService ncmpKafkaPublisherService;
-
+    private final AsyncTaskExecutor asyncTaskExecutor;
     private static final Map<OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6);
 
+    @Value("${notification.async.executor.time-out-value-in-ms:2000}")
+    private int timeOutInMillis;
+
     static {
         operationToHttpStatusMap.put(null, HttpStatus.OK);
         operationToHttpStatusMap.put(OperationEnum.READ, HttpStatus.OK);
@@ -70,10 +71,9 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
         operationToHttpStatusMap.put(OperationEnum.DELETE, HttpStatus.NO_CONTENT);
     }
 
-
     @Override
     public ResponseEntity<ModuleSet> getModuleReferences(final String cmHandle,
-                                                           final @Valid ModuleReferencesRequest body) {
+                                                         final @Valid ModuleReferencesRequest body) {
         // For onap-dmi-plugin we don't need cmHandleProperties, so DataAccessReadRequest is not used.
         final ModuleSet moduleSet = dmiService.getModulesForCmHandle(cmHandle);
         return ResponseEntity.ok(moduleSet);
@@ -104,66 +104,119 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
     }
 
     /**
-     * This method fetches the resource for given cm handle using pass through operational. It filters the response on
-     * the basis of options query parameters and returns response. Does not support write operations.
+     * This method fetches the resource for given cm handle using pass through operational datastore. It filters the
+     * response on the basis of options query parameters and returns response. Does not support write operations.
      *
      * @param resourceIdentifier    resource identifier to fetch data
      * @param cmHandle              cm handle identifier
      * @param dataAccessRequest     data Access Request
      * @param optionsParamInQuery   options query parameter
-     * @param topicParamInQuery     optional topic parameter
+     * @param topicParamInQuery     topic name for (triggering) async responses
      * @return {@code ResponseEntity} response entity
      */
     @Override
     public ResponseEntity<Object> dataAccessPassthroughOperational(final String resourceIdentifier,
                                                                    final String cmHandle,
-                                                                   final @Valid DataAccessRequest
-                                                                                dataAccessRequest,
+                                                                   final @Valid DataAccessRequest dataAccessRequest,
                                                                    final @Valid String optionsParamInQuery,
                                                                    final String topicParamInQuery) {
         if (isReadOperation(dataAccessRequest)) {
-            final String resourceDataAsJson = dmiService.getResourceData(cmHandle,
-                resourceIdentifier,
-                optionsParamInQuery,
-                DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM);
+            if (hasTopic(topicParamInQuery)) {
+                return handleAsyncRequest(resourceIdentifier, cmHandle, dataAccessRequest, optionsParamInQuery,
+                    topicParamInQuery);
+            }
+
+            final String resourceDataAsJson = dmiService.getResourceData(cmHandle, resourceIdentifier,
+                optionsParamInQuery, DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM);
             return ResponseEntity.ok(resourceDataAsJson);
         }
         return new ResponseEntity<>(HttpStatus.BAD_REQUEST);
     }
 
+    /**
+     * This method fetches the resource for given cm handle using pass through running datastore. It filters the
+     * response on the basis of options query parameters and returns response. It supports both read and write
+     * operation.
+     *
+     * @param resourceIdentifier    resource identifier to fetch data
+     * @param cmHandle              cm handle identifier
+     * @param dataAccessRequest     data Access Request
+     * @param optionsParamInQuery   options query parameter
+     * @param topicParamInQuery     topic name for (triggering) async responses
+     * @return {@code ResponseEntity} response entity
+     */
     @Override
     public ResponseEntity<Object> dataAccessPassthroughRunning(final String resourceIdentifier,
                                                                final String cmHandle,
-                                                               final @Valid DataAccessRequest
-                                                                       dataAccessRequest,
+                                                               final @Valid DataAccessRequest dataAccessRequest,
                                                                final @Valid String optionsParamInQuery,
                                                                final String topicParamInQuery) {
-        final String sdncResponse;
-        if (isReadOperation(dataAccessRequest)) {
-            sdncResponse = dmiService.getResourceData(cmHandle,
-                resourceIdentifier,
-                optionsParamInQuery,
-                DmiService.RESTCONF_CONTENT_PASSTHROUGH_RUNNING_QUERY_PARAM);
-        } else {
-            sdncResponse = dmiService.writeData(
+        if (hasTopic(topicParamInQuery)) {
+            asyncTaskExecutor.executeAsyncTask(() ->
+                    getSdncResponseForPassThroughRunning(
+                        resourceIdentifier,
+                        cmHandle,
+                        dataAccessRequest,
+                        optionsParamInQuery),
+                topicParamInQuery,
+                dataAccessRequest.getRequestId(),
                 dataAccessRequest.getOperation(),
-                cmHandle,
-                resourceIdentifier,
-                dataAccessRequest.getDataType(),
-                dataAccessRequest.getData());
+                timeOutInMillis
+            );
+            return new ResponseEntity<>(HttpStatus.NO_CONTENT);
         }
+
+        final String sdncResponse =
+            getSdncResponseForPassThroughRunning(resourceIdentifier, cmHandle, dataAccessRequest, optionsParamInQuery);
         return new ResponseEntity<>(sdncResponse, operationToHttpStatusMap.get(dataAccessRequest.getOperation()));
     }
 
+    private String getSdncResponseForPassThroughRunning(final String resourceIdentifier,
+                                                        final String cmHandle,
+                                                        final DataAccessRequest dataAccessRequest,
+                                                        final String optionsParamInQuery) {
+        if (isReadOperation(dataAccessRequest)) {
+            return dmiService.getResourceData(cmHandle, resourceIdentifier, optionsParamInQuery,
+                DmiService.RESTCONF_CONTENT_PASSTHROUGH_RUNNING_QUERY_PARAM);
+        }
+
+        return dmiService.writeData(dataAccessRequest.getOperation(), cmHandle, resourceIdentifier,
+            dataAccessRequest.getDataType(), dataAccessRequest.getData());
+    }
+
     private boolean isReadOperation(final @Valid DataAccessRequest dataAccessRequest) {
         return dataAccessRequest.getOperation() == null
             || dataAccessRequest.getOperation().equals(DataAccessRequest.OperationEnum.READ);
     }
 
     private List<ModuleReference> convertRestObjectToJavaApiObject(
-            final ModuleResourcesReadRequest moduleResourcesReadRequest) {
+        final ModuleResourcesReadRequest moduleResourcesReadRequest) {
         return objectMapper
             .convertValue(moduleResourcesReadRequest.getData().getModules(),
-                          new TypeReference<List<ModuleReference>>() {});
+                new TypeReference<List<ModuleReference>>() {});
+    }
+
+    private boolean hasTopic(final String topicParamInQuery) {
+        return !(topicParamInQuery == null || topicParamInQuery.isBlank());
     }
+
+    private ResponseEntity<Object> handleAsyncRequest(final String resourceIdentifier,
+                                                      final String cmHandle,
+                                                      final DataAccessRequest dataAccessRequest,
+                                                      final String optionsParamInQuery,
+                                                      final String topicParamInQuery) {
+        asyncTaskExecutor.executeAsyncTask(() ->
+                dmiService.getResourceData(
+                    cmHandle,
+                    resourceIdentifier,
+                    optionsParamInQuery,
+                    DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM),
+            topicParamInQuery,
+            dataAccessRequest.getRequestId(),
+            dataAccessRequest.getOperation(),
+            timeOutInMillis
+        );
+        return new ResponseEntity<>(HttpStatus.NO_CONTENT);
+    }
+
 }
index 22d4744..753d16f 100644 (file)
@@ -33,9 +33,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.dmi.config.DmiPluginConfig.DmiPluginProperties;
 import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException;
 import org.onap.cps.ncmp.dmi.exception.DmiException;
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException;
 import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException;
 import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException;
-import org.onap.cps.ncmp.dmi.exception.ResourceDataNotFound;
 import org.onap.cps.ncmp.dmi.model.DataAccessRequest;
 import org.onap.cps.ncmp.dmi.model.ModuleSet;
 import org.onap.cps.ncmp.dmi.model.ModuleSetSchemas;
@@ -59,8 +59,6 @@ public class DmiServiceImpl implements DmiService {
     private NcmpRestClient ncmpRestClient;
     private ObjectMapper objectMapper;
     private DmiPluginProperties dmiPluginProperties;
-    private static final String RESPONSE_CODE = "response code : ";
-    private static final String MESSAGE = " message : ";
 
     /**
      * Constructor.
@@ -107,8 +105,8 @@ public class DmiServiceImpl implements DmiService {
                     "SDNC did not return a module resource for the given cmHandle.");
             } else {
                 log.error("Error occurred when getting module resources from SDNC for the given cmHandle {}", cmHandle);
-                throw new DmiException(cmHandle,
-                    RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody());
+                throw new HttpClientRequestException(
+                    cmHandle, responseEntity.getBody(), responseEntity.getStatusCode());
             }
         }
         return yangResources;
@@ -166,20 +164,14 @@ public class DmiServiceImpl implements DmiService {
                             final String dataType, final String data) {
         final ResponseEntity<String> responseEntity =
             sdncOperations.writeData(operation, cmHandle, resourceIdentifier, dataType, data);
-        if (responseEntity.getStatusCode().is2xxSuccessful()) {
-            return responseEntity.getBody();
-        } else {
-            throw new DmiException(cmHandle,
-                RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody());
-        }
+        return prepareAndSendResponse(responseEntity, cmHandle);
     }
 
     private String prepareAndSendResponse(final ResponseEntity<String> responseEntity, final String cmHandle) {
-        if (responseEntity.getStatusCode() == HttpStatus.OK) {
+        if (responseEntity.getStatusCode().is2xxSuccessful()) {
             return responseEntity.getBody();
         } else {
-            throw new ResourceDataNotFound(cmHandle,
-                RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody());
+            throw new HttpClientRequestException(cmHandle, responseEntity.getBody(), responseEntity.getStatusCode());
         }
     }
 
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java
deleted file mode 100644 (file)
index 373a09d..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.service;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.stereotype.Component;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
-
-@Component
-@Slf4j
-public class NcmpKafkaPublisher {
-
-    private final KafkaTemplate<String, Object> kafkaTemplate;
-    private final String topicName;
-
-    /**
-     * KafkaTemplate and Topic name.
-     *
-     * @param kafkaTemplate kafka template
-     * @param topicName     topic name
-     */
-    @Autowired
-    public NcmpKafkaPublisher(final KafkaTemplate<String, Object> kafkaTemplate,
-            @Value("${app.ncmp.async-m2m.topic}") final String topicName) {
-        this.kafkaTemplate = kafkaTemplate;
-        this.topicName = topicName;
-    }
-
-    /**
-     * Sends message to the configured topic with a message key.
-     *
-     * @param messageKey message key
-     * @param payload    message payload
-     */
-    public void sendMessage(final String messageKey, final Object payload) {
-        final ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topicName, messageKey, payload);
-        send.addCallback(new ListenableFutureCallback<>() {
-            @Override
-            public void onFailure(final Throwable ex) {
-                log.warn("Failed to send the messages {}", ex.getMessage());
-            }
-
-            @Override
-            public void onSuccess(final SendResult<String, Object> result) {
-                log.debug("Sent message {}", result.getProducerRecord());
-            }
-        });
-    }
-}
index cf7b459..179707a 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2021 Nordix Foundation
+ *  Copyright (C) 2021-2022 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -69,9 +69,10 @@ public class SdncRestconfClient {
      * @param httpHeaders HTTP Headers
      * @return response entity
      */
-    public ResponseEntity<String> httpOperationWithJsonData(final HttpMethod httpMethod, final String resourceUrl,
-                                                             final String jsonData,
-                                                             final HttpHeaders httpHeaders) {
+    public ResponseEntity<String> httpOperationWithJsonData(final HttpMethod httpMethod,
+                                                            final String resourceUrl,
+                                                            final String jsonData,
+                                                            final HttpHeaders httpHeaders) {
         final String sdncBaseUrl = sdncProperties.getBaseUrl();
         final String sdncRestconfUrl = sdncBaseUrl.concat(resourceUrl);
         httpHeaders.setBasicAuth(sdncProperties.getAuthUsername(), sdncProperties.getAuthPassword());
index 8be97d2..6ad9d58 100644 (file)
@@ -45,19 +45,24 @@ spring:
     pathmatch:
       matching-strategy: ANT_PATH_MATCHER
   kafka:
-    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
+    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER:localhost:9092}
     security:
       protocol: PLAINTEXT
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
-      client-id: dmi-plugin
+      client-id: ncmp-dmi-plugin
 
 app:
   ncmp:
-    async-m2m:
+    async:
       topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
 
+notification:
+  async:
+    executor:
+      time-out-value-in-ms: 2000
+
 # Actuator
 management:
   server:
@@ -106,4 +111,4 @@ springdoc:
     urlsPrimaryName: query
     urls:
       - name: query
-        url: /api-docs/openapi.yaml
\ No newline at end of file
+        url: /api-docs/openapi.yaml
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
new file mode 100644 (file)
index 0000000..54c0fe0
--- /dev/null
@@ -0,0 +1,136 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the 'License');
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException
+import org.onap.cps.ncmp.dmi.model.DataAccessRequest
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
+import org.spockframework.spring.SpringBean
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.http.HttpStatus
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonSerializer
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.DynamicPropertyRegistry
+import org.springframework.test.context.DynamicPropertySource
+import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.spock.Testcontainers
+import org.testcontainers.utility.DockerImageName
+import spock.lang.Specification
+
+import java.time.Duration
+
+@SpringBootTest(classes = [AsyncTaskExecutor, DmiAsyncRequestResponseEventProducer])
+@Testcontainers
+@DirtiesContext
+class AsyncTaskExecutorIntegrationSpec extends Specification {
+
+    static kafkaTestContainer = new KafkaContainer(
+        DockerImageName.parse('confluentinc/cp-kafka:6.2.1')
+    )
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
+    }
+
+    def setupSpec() {
+        kafkaTestContainer.start()
+    }
+
+    def producerConfigProperties = [
+        'bootstrap.servers' : kafkaTestContainer.getBootstrapServers().split(',')[0],
+        'retries'           : 0,
+        'batch.size'        : 16384,
+        'linger.ms'         : 1,
+        'buffer.memory'     : 33554432,
+        'key.serializer'    : StringSerializer,
+        'value.serializer'  : JsonSerializer
+    ]
+
+    def consumerConfigProperties = [
+        'bootstrap.servers' : kafkaTestContainer.getBootstrapServers().split(',')[0],
+        'key.deserializer'  : StringDeserializer,
+        'value.deserializer': StringDeserializer,
+        'auto.offset.reset' : 'earliest',
+        'group.id'          : 'test'
+    ]
+
+    def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties))
+
+    @SpringBean
+    DmiAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducer =
+        new DmiAsyncRequestResponseEventProducer(kafkaTemplate)
+
+    KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(consumerConfigProperties)
+
+    def spiedObjectMapper = Spy(ObjectMapper)
+
+    def objectUnderTest = new AsyncTaskExecutor(cpsAsyncRequestResponseEventProducer)
+
+    private static final String TEST_TOPIC = 'test-topic'
+
+    def setup() {
+        cpsAsyncRequestResponseEventProducer.dmiNcmpTopic = TEST_TOPIC
+        consumer.subscribe([TEST_TOPIC] as List<String>)
+    }
+
+    def cleanup() {
+        consumer.close()
+    }
+
+    def 'Publish and Subscribe message - success'() {
+        when: 'a successful event is published'
+            objectUnderTest.publishAsyncEvent(TEST_TOPIC, '12345','{}', 'OK', '200')
+        and: 'the topic is polled'
+            def records = consumer.poll(Duration.ofMillis(1500))
+        then: 'the record received is the event sent'
+            def record = records.iterator().next()
+            DmiAsyncRequestResponseEvent event  = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
+        and: 'the status & code matches expected'
+            assert event.getEventContent().getResponseStatus() == 'OK'
+            assert event.getEventContent().getResponseCode() == '200'
+    }
+
+    def 'Publish and Subscribe message - failure'() {
+        when: 'a failure event is published'
+            def exception = new HttpClientRequestException('some cm handle', 'Node not found', HttpStatus.INTERNAL_SERVER_ERROR)
+            objectUnderTest.publishAsyncFailureEvent(TEST_TOPIC, '67890', DataAccessRequest.OperationEnum.READ, exception)
+        and: 'the topic is polled'
+            def records = consumer.poll(Duration.ofMillis(1500))
+        then: 'the record received is the event sent'
+            def record = records.iterator().next()
+            DmiAsyncRequestResponseEvent event  = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
+        and: 'the status & code matches expected'
+            assert event.getEventContent().getResponseStatus() == 'Internal Server Error'
+            assert event.getEventContent().getResponseCode() == '500'
+    }
+
+    @DynamicPropertySource
+    static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
+        dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+    }
+
+}
\ No newline at end of file
index 1541f8c..5bfbc40 100644 (file)
 
 package org.onap.cps.ncmp.dmi.rest.controller
 
+
 import org.onap.cps.ncmp.dmi.TestUtils
 import org.onap.cps.ncmp.dmi.exception.DmiException
 import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException
 import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException
-import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService
+import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor
+import org.onap.cps.ncmp.dmi.notifications.async.DmiAsyncRequestResponseEventProducer
+
 import org.onap.cps.ncmp.dmi.service.model.ModuleReference
 import org.onap.cps.ncmp.dmi.model.ModuleSet
 import org.onap.cps.ncmp.dmi.model.ModuleSetSchemas
@@ -38,6 +41,7 @@ import org.springframework.beans.factory.annotation.Value
 import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest
 import org.springframework.http.HttpStatus
 import org.springframework.http.MediaType
+import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.security.test.context.support.WithMockUser
 import org.springframework.test.web.servlet.MockMvc
 import spock.lang.Specification
@@ -53,7 +57,7 @@ import static org.onap.cps.ncmp.dmi.model.DataAccessRequest.OperationEnum.UPDATE
 import static org.springframework.http.HttpStatus.CREATED
 import static org.springframework.http.HttpStatus.OK
 
-@WebMvcTest(DmiRestController)
+@WebMvcTest(DmiRestController.class)
 @WithMockUser
 class DmiRestControllerSpec extends Specification {
 
@@ -64,7 +68,10 @@ class DmiRestControllerSpec extends Specification {
     DmiService mockDmiService = Mock()
 
     @SpringBean
-    NcmpKafkaPublisherService mockNcmpKafkaPublisherService = Mock()
+    DmiAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducer = new DmiAsyncRequestResponseEventProducer(Mock(KafkaTemplate))
+
+    @SpringBean
+    AsyncTaskExecutor asyncTaskExecutor = new AsyncTaskExecutor(cpsAsyncRequestResponseEventProducer)
 
     @Value('${rest.api.dmi-base-path}/v1')
     def basePathV1
@@ -256,6 +263,21 @@ class DmiRestControllerSpec extends Specification {
             response.getContentAsString() == '{some-json}'
     }
 
+    def 'PassThrough Returns OK when topic is used for async'(){
+        given: 'an endpoint'
+            def readPassThroughUrl ="${basePathV1}/ch/some-cmHandle/data/ds/ncmp-datastore:" +
+                resourceIdentifier +
+                '?resourceIdentifier=some-resourceIdentifier&topic=test-topic'
+        when: 'endpoint is invoked'
+            def jsonData = TestUtils.getResourceFileContent('readData.json')
+            def response = mvc.perform(
+                post(readPassThroughUrl).contentType(MediaType.APPLICATION_JSON).content(jsonData)
+            ).andReturn().response
+        then: 'response status is OK'
+            assert response.status == HttpStatus.NO_CONTENT.value()
+        where: 'the following values are used'
+             resourceIdentifier << ['passthrough-operational', 'passthrough-running']
+    }
 
     def 'Get resource data for pass-through running with #scenario value in resource identifier param.'() {
         given: 'Get resource data url'
index e38d5c3..1d87b77 100644 (file)
@@ -29,7 +29,7 @@ import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException
 import org.onap.cps.ncmp.dmi.exception.DmiException
 import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException
 import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException
-import org.onap.cps.ncmp.dmi.exception.ResourceDataNotFound
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException
 import org.onap.cps.ncmp.dmi.service.model.ModuleReference
 import org.onap.cps.ncmp.dmi.model.YangResource
 import org.onap.cps.ncmp.dmi.model.YangResources
@@ -221,7 +221,7 @@ class DmiServiceImplSpec extends Specification {
             objectUnderTest.getResourceData(cmHandle,
                 resourceId, optionsParam, restConfQueryParam)
         then: 'resource data not found'
-            thrown(ResourceDataNotFound.class)
+            thrown(HttpClientRequestException.class)
     }
 
     def 'Get resource data for passthrough running.'() {
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy
deleted file mode 100644 (file)
index f5bc4ac..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.service
-
-import spock.lang.Specification
-
-class NcmpKafkaPublisherServiceSpec extends Specification {
-
-    def mockNcmpKafkaPublisher = Mock(NcmpKafkaPublisher)
-    def objectUnderTest = new NcmpKafkaPublisherService(mockNcmpKafkaPublisher)
-
-    def 'Message publishing'() {
-        given: 'a sample message with key'
-            def message = 'sample message'
-            def messageKey = 'sample-key'
-        when: 'published'
-            objectUnderTest.publishToNcmp(messageKey, message)
-        then: 'no exception is thrown'
-            noExceptionThrown()
-        and: 'message is published once'
-            1 * mockNcmpKafkaPublisher.sendMessage(messageKey, message)
-    }
-}
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy
deleted file mode 100644 (file)
index 00c8e6e..0000000
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.service
-
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.beans.factory.annotation.Value
-import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.context.annotation.Bean
-import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.support.serializer.JsonDeserializer
-import org.springframework.test.annotation.DirtiesContext
-import org.springframework.test.context.DynamicPropertyRegistry
-import org.springframework.test.context.DynamicPropertySource
-import org.testcontainers.containers.KafkaContainer
-import org.testcontainers.spock.Testcontainers
-import spock.lang.Specification
-
-import java.time.Duration
-
-import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
-import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
-
-@SpringBootTest
-@Testcontainers
-@DirtiesContext
-class NcmpKafkaPublisherSpec extends Specification {
-
-    static kafkaTestContainer = new KafkaContainer()
-    static {
-        Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
-    }
-
-    def setupSpec() {
-        kafkaTestContainer.start()
-    }
-
-    @Autowired
-    KafkaTemplate<String, Object> kafkaTemplate
-
-    @Value('${app.ncmp.async-m2m.topic}')
-    String topic
-
-    KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(kafkaConsumerConfig())
-
-    def 'Publish and Subscribe message'() {
-        given: 'a sample messsage and key'
-            def message = 'sample message'
-            def messageKey = 'message-key'
-            def objectUnderTest = new NcmpKafkaPublisher(kafkaTemplate, topic)
-        when: 'a message is published'
-            objectUnderTest.sendMessage(messageKey, message)
-        then: 'a message is consumed'
-            consumer.subscribe([topic] as List<String>)
-            def records = consumer.poll(Duration.ofMillis(1000))
-            assert records.size() == 1
-            assert messageKey == records[0].key
-            assert message == records[0].value
-    }
-
-    def kafkaConsumerConfig() {
-        def configs = [:]
-        configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.name)
-        configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.name)
-        configs.put(AUTO_OFFSET_RESET_CONFIG, 'earliest')
-        configs.put(BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers().split(",")[0])
-        configs.put(GROUP_ID_CONFIG, 'test')
-        return configs
-    }
-
-    @DynamicPropertySource
-    static void registerKafkaProperties(DynamicPropertyRegistry registry) {
-        registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
-    }
-}
-
-@Configuration
-class TopicConfig {
-    @Bean
-    NewTopic newTopic() {
-        return new NewTopic("my-topic-name", 1, (short) 1);
-    }
-}
index 0d3784f..9ed37a7 100644 (file)
@@ -47,19 +47,22 @@ dmi:
 spring:
   application:
     name: ncmp-dmi-plugin
+  mvc:
+    pathmatch:
+      matching-strategy: ANT_PATH_MATCHER
   kafka:
-    bootstrap-servers: localhost:9092
+    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
     security:
       protocol: PLAINTEXT
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
-      client-id: dmi-plugin
+      client-id: ncmp-dmi-plugin
 
 app:
   ncmp:
-    async-m2m:
-      topic: my-topic-name
+    async:
+      topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
 
 logging:
   format: json