NCMP Kafka Producer Infrastructure 91/128391/4
authormpriyank <priyank.maheshwari@est.tech>
Fri, 8 Apr 2022 09:42:22 +0000 (15:12 +0530)
committermpriyank <priyank.maheshwari@est.tech>
Mon, 11 Apr 2022 14:01:37 +0000 (19:31 +0530)
- Producer configuration and Topic related information.
- Accepted topicParameter and request id from NCMP to process async
- Replacing Springfox and using Springdoc instead

Issue-ID: CPS-829
Change-Id: I369b5ec6c16318220bb218701006918a0bf21419
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
14 files changed:
docs/api/swagger/openapi.yaml
openapi/components.yml
openapi/openapi.yml
pom.xml
src/main/java/org/onap/cps/ncmp/dmi/config/DmiPluginConfig.java
src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java [new file with mode: 0644]
src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java [new file with mode: 0644]
src/main/resources/application.yml
src/test/groovy/org/onap/cps/ncmp/dmi/config/DmiPluginConfigSpec.groovy
src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy
src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy [new file with mode: 0644]
src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy [new file with mode: 0644]
src/test/resources/application.yml

index f41e513..9fd83e0 100644 (file)
@@ -405,6 +405,9 @@ components:
           example: my-data
         cmHandleProperties:
           $ref: '#/components/schemas/cmHandleProperties'
+        requestId:
+          type: string
+          example: uuid-random-id-eg
     ModuleSet_schemas:
       type: object
       properties:
index c317ba7..fd9e2ff 100644 (file)
@@ -116,6 +116,9 @@ components:
           example: my-data
         cmHandleProperties:
           $ref: '#/components/schemas/cmHandleProperties'
+        requestId:
+          type: string
+          example: uuid-random-id-eg
 
     cmHandleProperties:
       type: object
@@ -194,4 +197,15 @@ components:
             options: (key1=value1,key2=value1/value2)
         sample3:
           value:
-            options: (key1=10,key2=value2,key3=val31,val32)
\ No newline at end of file
+            options: (key1=10,key2=value2,key3=val31,val32)
+    topicParamInQuery:
+      name: topicParamInQuery
+      in: query
+      description: valid topic name passed from client(NCMP).
+      required: false
+      schema:
+        type: string
+      allowReserved: true
+      examples:
+        sample1:
+          value: ncmp-async-m2m
\ No newline at end of file
index bae7639..6e0e8ae 100644 (file)
@@ -126,6 +126,7 @@ paths:
         - $ref: 'components.yml#/components/parameters/cmHandleInPath'
         - $ref: 'components.yml#/components/parameters/resourceIdentifierInQuery'
         - $ref: 'components.yml#/components/parameters/optionsParamInQuery'
+        - $ref: 'components.yml#/components/parameters/topicParamInQuery'
       requestBody:
         description: Operational body
         content:
@@ -159,6 +160,7 @@ paths:
         - $ref: 'components.yml#/components/parameters/cmHandleInPath'
         - $ref: 'components.yml#/components/parameters/resourceIdentifierInQuery'
         - $ref: 'components.yml#/components/parameters/optionsParamInQuery'
+        - $ref: 'components.yml#/components/parameters/topicParamInQuery'
       requestBody:
         content:
           application/json:
diff --git a/pom.xml b/pom.xml
index 616a882..3335412 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>testcontainers-bom</artifactId>
+                <version>1.15.3</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <dependencies>
             <artifactId>swagger-annotations</artifactId>
             <version>1.6.2</version>
         </dependency>
-        <dependency>
-            <groupId>io.springfox</groupId>
-            <artifactId>springfox-boot-starter</artifactId>
-            <version>3.0.0</version>
-        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-security</artifactId>
             <artifactId>httpclient</artifactId>
             <version>${httpclient.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>spock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <resources>
                     <excludes>
                         <exclude>**/IT*.java</exclude>
                     </excludes>
+                    <environmentVariables>
+                        <!--
+                            Disable privileged container usage to cleanup the test containers;
+                            these are removed automatically on jvm termination;
+                            see https://www.testcontainers.org/features/configuration/#disabling-ryuk
+                        -->
+                        <TESTCONTAINERS_RYUK_DISABLED>true</TESTCONTAINERS_RYUK_DISABLED>
+                    </environmentVariables>
                 </configuration>
             </plugin>
             <plugin>
index 31a7811..6106c6a 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2021 Nordix Foundation
+ *  Copyright (C) 2021-2022 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 package org.onap.cps.ncmp.dmi.config;
 
 import lombok.Getter;
+import org.springdoc.core.GroupedOpenApi;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.stereotype.Component;
-import springfox.documentation.builders.PathSelectors;
-import springfox.documentation.builders.RequestHandlerSelectors;
-import springfox.documentation.spi.DocumentationType;
-import springfox.documentation.spring.web.plugins.Docket;
 
 
 @Configuration
 public class DmiPluginConfig {
+
     /**
-     * Swagger-ui configuration.
+     * Swagger-ui configuration using springdoc.
      */
-    @Bean("dmi-plugin-docket")
-    public Docket api() {
-        return new Docket(DocumentationType.OAS_30)
-                .groupName("dmi-plugin-docket")
-                .select()
-                .apis(RequestHandlerSelectors.any())
-                .paths(PathSelectors.any())
-                .build();
+    @Bean("dmi-plugin-api")
+    public GroupedOpenApi api() {
+        return GroupedOpenApi.builder().group("dmi-plugin-api")
+                .pathsToMatch("/swagger-ui/**,/swagger-resources/**,/v3/api-docs").build();
     }
 
     @Getter
index 653ebf7..d12b9ee 100644 (file)
@@ -40,6 +40,7 @@ import org.onap.cps.ncmp.dmi.model.YangResources;
 import org.onap.cps.ncmp.dmi.rest.api.DmiPluginApi;
 import org.onap.cps.ncmp.dmi.rest.api.DmiPluginInternalApi;
 import org.onap.cps.ncmp.dmi.service.DmiService;
+import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService;
 import org.onap.cps.ncmp.dmi.service.model.ModuleReference;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
@@ -56,6 +57,8 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
 
     private final ObjectMapper objectMapper;
 
+    private final NcmpKafkaPublisherService ncmpKafkaPublisherService;
+
     private static final Map<OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6);
 
     static {
@@ -67,6 +70,7 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
         operationToHttpStatusMap.put(OperationEnum.DELETE, HttpStatus.NO_CONTENT);
     }
 
+
     @Override
     public ResponseEntity<ModuleSet> getModuleReferences(final String cmHandle,
                                                            final @Valid ModuleReferencesRequest body) {
@@ -107,6 +111,7 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
      * @param cmHandle              cm handle identifier
      * @param dataAccessRequest     data Access Request
      * @param optionsParamInQuery   options query parameter
+     * @param topicParamInQuery     optional topic parameter
      * @return {@code ResponseEntity} response entity
      */
     @Override
@@ -114,7 +119,8 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
                                                                    final String cmHandle,
                                                                    final @Valid DataAccessRequest
                                                                                 dataAccessRequest,
-                                                                   final @Valid String optionsParamInQuery) {
+                                                                   final @Valid String optionsParamInQuery,
+                                                                   final String topicParamInQuery) {
         if (isReadOperation(dataAccessRequest)) {
             final String resourceDataAsJson = dmiService.getResourceData(cmHandle,
                 resourceIdentifier,
@@ -130,7 +136,8 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
                                                                final String cmHandle,
                                                                final @Valid DataAccessRequest
                                                                        dataAccessRequest,
-                                                               final @Valid String optionsParamInQuery) {
+                                                               final @Valid String optionsParamInQuery,
+                                                               final String topicParamInQuery) {
         final String sdncResponse;
         if (isReadOperation(dataAccessRequest)) {
             sdncResponse = dmiService.getResourceData(cmHandle,
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java
new file mode 100644 (file)
index 0000000..373a09d
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+@Component
+@Slf4j
+public class NcmpKafkaPublisher {
+
+    private final KafkaTemplate<String, Object> kafkaTemplate;
+    private final String topicName;
+
+    /**
+     * KafkaTemplate and Topic name.
+     *
+     * @param kafkaTemplate kafka template
+     * @param topicName     topic name
+     */
+    @Autowired
+    public NcmpKafkaPublisher(final KafkaTemplate<String, Object> kafkaTemplate,
+            @Value("${app.ncmp.async-m2m.topic}") final String topicName) {
+        this.kafkaTemplate = kafkaTemplate;
+        this.topicName = topicName;
+    }
+
+    /**
+     * Sends message to the configured topic with a message key.
+     *
+     * @param messageKey message key
+     * @param payload    message payload
+     */
+    public void sendMessage(final String messageKey, final Object payload) {
+        final ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topicName, messageKey, payload);
+        send.addCallback(new ListenableFutureCallback<>() {
+            @Override
+            public void onFailure(final Throwable ex) {
+                log.warn("Failed to send the messages {}", ex.getMessage());
+            }
+
+            @Override
+            public void onSuccess(final SendResult<String, Object> result) {
+                log.debug("Sent message {}", result.getProducerRecord());
+            }
+        });
+    }
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java
new file mode 100644 (file)
index 0000000..f5e1839
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.service;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@AllArgsConstructor
+public class NcmpKafkaPublisherService {
+
+    private final NcmpKafkaPublisher ncmpKafkaPublisher;
+
+    /**
+     * publish the message to NCMP.
+     *
+     * @param messageKey message key
+     * @param message    message payload
+     */
+    public void publishToNcmp(final String messageKey, final Object message) {
+        log.debug("Publishing message : {} to NCMP with message-key : {}", message, messageKey);
+        ncmpKafkaPublisher.sendMessage(messageKey, message);
+    }
+}
index 2d324c5..71a689c 100644 (file)
@@ -1,5 +1,5 @@
 #  ============LICENSE_START=======================================================
-#  Copyright (C) 2021 Nordix Foundation
+#  Copyright (C) 2021-2022 Nordix Foundation
 #  Modifications Copyright (C) 2021 Bell Canada.
 #  ================================================================================
 #  Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,6 +42,19 @@ spring:
   mvc:
     pathmatch:
       matching-strategy: ANT_PATH_MATCHER
+  kafka:
+    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
+    security:
+      protocol: PLAINTEXT
+    producer:
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+      client-id: dmi-plugin
+
+app:
+  ncmp:
+    async-m2m:
+      topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
 
 # Actuator
 management:
index 64b8232..b391f8c 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2021 Nordix Foundation
+ *  Copyright (C) 2021-2022 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 
 package org.onap.cps.ncmp.dmi.config
 
+import org.springdoc.core.GroupedOpenApi
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.context.ContextConfiguration
 import spock.lang.Specification
-import springfox.documentation.spring.web.plugins.Docket
 
 @SpringBootTest
 @ContextConfiguration(classes = [DmiPluginConfig.DmiPluginProperties])
@@ -38,15 +38,15 @@ class DmiPluginConfigSpec extends Specification {
             dmiPluginProperties.dmiServiceUrl == 'some url for the dmi service'
     }
 
-    def 'DMI plugin docket creation.'() {
+    def 'DMI plugin api creation.'() {
         given: 'a DMI plugin configuration'
             DmiPluginConfig objectUnderTest = new DmiPluginConfig()
         when: 'the api method is invoked'
             def result = objectUnderTest.api()
         then: 'a spring web plugin docket is returned'
-            result instanceof Docket
-        and: 'it is named "dmi-plugin-docket"'
-            result.groupName == 'dmi-plugin-docket'
+            result instanceof GroupedOpenApi
+        and: 'it is named "dmi-plugin-api"'
+            result.group == 'dmi-plugin-api'
     }
 
 }
index 2f200cf..1541f8c 100644 (file)
@@ -25,6 +25,7 @@ import org.onap.cps.ncmp.dmi.TestUtils
 import org.onap.cps.ncmp.dmi.exception.DmiException
 import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException
 import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException
+import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService
 import org.onap.cps.ncmp.dmi.service.model.ModuleReference
 import org.onap.cps.ncmp.dmi.model.ModuleSet
 import org.onap.cps.ncmp.dmi.model.ModuleSetSchemas
@@ -62,6 +63,9 @@ class DmiRestControllerSpec extends Specification {
     @SpringBean
     DmiService mockDmiService = Mock()
 
+    @SpringBean
+    NcmpKafkaPublisherService mockNcmpKafkaPublisherService = Mock()
+
     @Value('${rest.api.dmi-base-path}/v1')
     def basePathV1
 
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy
new file mode 100644 (file)
index 0000000..4fc697e
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.service
+
+import spock.lang.Specification
+
+class NcmpKafkaPublisherServiceSpec extends Specification {
+
+    def mockNcmpKafkaPublisher = Mock(NcmpKafkaPublisher)
+    def objectUnderTest = new NcmpKafkaPublisherService(mockNcmpKafkaPublisher)
+
+    def 'Message publishing'() {
+        given: 'a sample message with key'
+            def message = 'sample message'
+            def messageKey = 'sample-key'
+        when: 'published'
+            objectUnderTest.publishToNcmp(messageKey, message)
+        then: 'no exception is thrown'
+            noExceptionThrown()
+    }
+}
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy
new file mode 100644 (file)
index 0000000..54f3502
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.service
+
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.test.context.DynamicPropertyRegistry
+import org.springframework.test.context.DynamicPropertySource
+import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.spock.Testcontainers
+import spock.lang.Specification
+
+@SpringBootTest
+@Testcontainers
+class NcmpKafkaPublisherSpec extends Specification {
+
+    static kafkaTestContainer = new KafkaContainer()
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
+    }
+
+    def setupSpec() {
+        kafkaTestContainer.start()
+    }
+
+    @Autowired
+    KafkaTemplate<String, Object> kafkaTemplate
+
+    @Value('${app.ncmp.async-m2m.topic}')
+    String topic
+
+    def 'Publish message'() {
+        given: 'a sample messsage and key'
+            def message = 'sample message'
+            def messageKey = 'message-key'
+            def objectUnderTest = new NcmpKafkaPublisher(kafkaTemplate, topic)
+        when: 'a message is published'
+            objectUnderTest.sendMessage(messageKey, message)
+        then: 'no exception is thrown'
+            noExceptionThrown()
+
+    }
+
+    @DynamicPropertySource
+    static void registerKafkaProperties(DynamicPropertyRegistry registry) {
+        registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+    }
+}
index dc30c9d..344743b 100644 (file)
@@ -1,5 +1,5 @@
 # ============LICENSE_START=======================================================
-# Copyright (C) 2021 Nordix Foundation
+# Copyright (C) 2021-2022 Nordix Foundation
 # ================================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ rest:
     dmi-base-path: /dmi
 
 security:
+  permit-uri: /manage/**,/swagger-ui/**,/swagger-resources/**,/v3/api-docs
   auth:
     username: cpsuser
     password: cpsr0cks!
@@ -43,3 +44,18 @@ dmi:
   service:
     url: some url for the dmi service
 
+spring:
+  kafka:
+    bootstrap-servers: localhost:9092
+    security:
+      protocol: PLAINTEXT
+    producer:
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+      client-id: dmi-plugin
+
+app:
+  ncmp:
+    async-m2m:
+      topic: ncmp-async-m2m
+