Add new topic for publishing events in Acm Element Impl 56/130456/2
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>
Fri, 26 Aug 2022 09:56:31 +0000 (10:56 +0100)
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>
Fri, 26 Aug 2022 14:19:59 +0000 (15:19 +0100)
Listening and publishing on different topics for better readability of
logs and segregation of events between PDP and AC element.

Issue-ID: POLICY-4332
Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
Change-Id: I7c44ba1498c73a8bd395ad54eeb09950c584156e

models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/DmaapConfig.java
models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/ElementConfig.java
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java
participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/BridgeServiceTest.java
participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/ConfigServiceTest.java
participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/StarterServiceTest.java

index 0d13bcd..dad92af 100644 (file)
@@ -26,7 +26,9 @@ import lombok.Data;
 public class DmaapConfig {
     private String server;
 
-    private String topic;
+    private String listenerTopic;
+
+    private String publisherTopic;
 
     private Integer fetchTimeout;
 
index d1f0648..c86f7f2 100644 (file)
@@ -26,11 +26,11 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 @Data
 public class ElementConfig {
 
-    private ToscaConceptIdentifier elementId;
+    private ToscaConceptIdentifier receiverId;
 
     private ElementType elementType;
 
-    private Integer timerSec;
+    private Integer timerMs;
 
     private DmaapConfig topicParameterGroup;
 }
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java
new file mode 100644 (file)
index 0000000..2139440
--- /dev/null
@@ -0,0 +1,44 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.main.parameters;
+
+import java.util.List;
+import lombok.Data;
+import org.onap.policy.clamp.models.acm.messages.rest.element.DmaapConfig;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+
+@Data
+public class ElementTopicParameters extends TopicParameters {
+
+    /**
+     * Constructor.
+     * @param parameters DmaapConfig
+     */
+    public ElementTopicParameters(DmaapConfig parameters) {
+        super();
+        this.setTopic(parameters.getListenerTopic());
+        this.setServers(List.of(parameters.getServer()));
+        this.setFetchTimeout(parameters.getFetchTimeout());
+        this.setTopicCommInfrastructure(parameters.getTopicCommInfrastructure());
+        this.setUseHttps(parameters.isUseHttps());
+    }
+
+}
index f8f9024..255974d 100644 (file)
@@ -26,11 +26,11 @@ import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.element.handler.MessageActivator;
 import org.onap.policy.clamp.acm.element.handler.MessageHandler;
+import org.onap.policy.clamp.acm.element.main.parameters.ElementTopicParameters;
 import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
 import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
 import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
@@ -52,16 +52,14 @@ public class ConfigService {
      * @param elementConfig the configuration
      */
     public void activateElement(@NonNull ElementConfig elementConfig) {
-        var topicParameters = new TopicParameters();
-        topicParameters.setTopic(elementConfig.getTopicParameterGroup().getTopic());
-        topicParameters.setServers(List.of(elementConfig.getTopicParameterGroup().getServer()));
-        topicParameters.setFetchTimeout(elementConfig.getTopicParameterGroup().getFetchTimeout());
-        topicParameters.setTopicCommInfrastructure(elementConfig.getTopicParameterGroup().getTopicCommInfrastructure());
-        topicParameters.setUseHttps(elementConfig.getTopicParameterGroup().isUseHttps());
+        var listenerTopicParameters = new ElementTopicParameters(elementConfig.getTopicParameterGroup());
+
+        var publisherTopicParameters = new ElementTopicParameters(elementConfig.getTopicParameterGroup());
+        publisherTopicParameters.setTopic(elementConfig.getTopicParameterGroup().getPublisherTopic());
 
         var parameters = new TopicParameterGroup();
-        parameters.setTopicSinks(List.of(topicParameters));
-        parameters.setTopicSources(List.of(topicParameters));
+        parameters.setTopicSinks(List.of(publisherTopicParameters));
+        parameters.setTopicSources(List.of(listenerTopicParameters));
 
         if (!parameters.isValid()) {
             throw new AutomationCompositionRuntimeException(Response.Status.BAD_REQUEST,
index 589397d..479fa44 100644 (file)
@@ -75,12 +75,12 @@ public class StarterService extends AbstractElementService implements AutoClosea
         if (timerPool != null) {
             throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService alredy actived!");
         }
-        receiver = elementConfig.getElementId();
+        receiver = elementConfig.getReceiverId();
 
         timerPool = new ScheduledThreadPoolExecutor(1);
         timerPool.setRemoveOnCancelPolicy(true);
-        future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
-                elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+        future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerMs(),
+                elementConfig.getTimerMs(), TimeUnit.MILLISECONDS);
     }
 
     private void sendMessage() {
@@ -99,8 +99,8 @@ public class StarterService extends AbstractElementService implements AutoClosea
         if (future != null) {
             future.cancel(true);
         }
-        future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
-                elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+        future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerMs(),
+                elementConfig.getTimerMs(), TimeUnit.MILLISECONDS);
     }
 
     @Override
index cc62d8d..aae8d58 100644 (file)
@@ -47,7 +47,7 @@ class BridgeServiceTest {
         assertThat(bridgeService.getType()).isEqualTo(ElementType.BRIDGE);
 
         var elementConfig = new ElementConfig();
-        elementConfig.setElementId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
+        elementConfig.setReceiverId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
         bridgeService.active(elementConfig);
 
         bridgeService.handleMessage(new ElementStatus());
index 785673b..156f7d3 100644 (file)
@@ -39,7 +39,8 @@ class ConfigServiceTest {
         var elementConfig = new ElementConfig();
         elementConfig.setTopicParameterGroup(new DmaapConfig());
         elementConfig.getTopicParameterGroup().setTopicCommInfrastructure("dmaap");
-        elementConfig.getTopicParameterGroup().setTopic("topic");
+        elementConfig.getTopicParameterGroup().setListenerTopic("topic");
+        elementConfig.getTopicParameterGroup().setPublisherTopic("topic");
         elementConfig.getTopicParameterGroup().setServer("localhost");
         elementConfig.getTopicParameterGroup().setFetchTimeout(1000);
 
index ee58a35..28af70d 100644 (file)
@@ -47,8 +47,8 @@ class StarterServiceTest {
             assertThat(starterService.getType()).isEqualTo(ElementType.STARTER);
 
             var elementConfig = new ElementConfig();
-            elementConfig.setTimerSec(100);
-            elementConfig.setElementId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
+            elementConfig.setTimerMs(100);
+            elementConfig.setReceiverId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
             starterService.active(elementConfig);
             verify(messagePublisher, timeout(200).atLeastOnce()).publishMsg(any(ElementMessage.class));
             starterService.deactivate();