Fix ControlLoopAck and remove circular dependency 48/123648/3
authorFrancescoFioraEst <francesco.fiora@est.tech>
Fri, 27 Aug 2021 09:33:52 +0000 (10:33 +0100)
committerAjith Sreekumar <ajith.sreekumar@bell.ca>
Tue, 31 Aug 2021 10:13:54 +0000 (10:13 +0000)
Issue-ID: CLAMP-1028
Change-Id: I23b6806bf9a22449c7709b619a20f3412e3e3742
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
models/src/main/java/org/onap/policy/clamp/controlloop/models/controlloop/concepts/ControlLoopElementAck.java [new file with mode: 0644]
models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAck.java
models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAckTest.java
models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopUpdateTest.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java

diff --git a/models/src/main/java/org/onap/policy/clamp/controlloop/models/controlloop/concepts/ControlLoopElementAck.java b/models/src/main/java/org/onap/policy/clamp/controlloop/models/controlloop/concepts/ControlLoopElementAck.java
new file mode 100644 (file)
index 0000000..a5918fe
--- /dev/null
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.models.controlloop.concepts;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@AllArgsConstructor
+@ToString
+public class ControlLoopElementAck {
+
+    // Result: Success/Fail.
+    private Boolean result;
+
+    // Message indicating reason for failure
+    private String message;
+
+}
index 55ba7fa..8e36049 100644 (file)
@@ -27,7 +27,7 @@ import java.util.function.UnaryOperator;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.commons.lang3.tuple.Pair;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementAck;
 import org.onap.policy.models.base.PfUtils;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 
@@ -45,7 +45,7 @@ public class ControlLoopAck extends ParticipantAckMessage {
 
     // A map with ControlLoopElementID as its key, and a pair of result and message as value per
     // ControlLoopElement.
-    private Map<UUID, Pair<Boolean, String>> controlLoopResultMap = new LinkedHashMap<>();
+    private Map<UUID, ControlLoopElementAck> controlLoopResultMap = new LinkedHashMap<>();
 
     /**
      * Constructor for instantiating ParticipantRegisterAck class with message name.
index d7d7e43..5fded73 100644 (file)
@@ -22,35 +22,36 @@ package org.onap.policy.clamp.controlloop.models.messages.dmaap.participant;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
+import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.assertSerializable;
 import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.removeVariableFields;
 
 import java.util.Map;
 import java.util.UUID;
-import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementAck;
+import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 
 class ControlLoopAckTest {
 
     @Test
-    void testCopyConstructor() {
+    void testCopyConstructor() throws CoderException {
         assertThatThrownBy(() -> new ControlLoopAck((ControlLoopAck) null))
             .isInstanceOf(NullPointerException.class);
 
-        final ControlLoopAck orig = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_UPDATE);
+        final var orig = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_UPDATE);
 
         // verify with null values
         assertEquals(removeVariableFields(orig.toString()),
                 removeVariableFields(new ControlLoopAck(orig).toString()));
 
         // verify with all values
-        ToscaConceptIdentifier id = new ToscaConceptIdentifier("id", "1.2.3");
+        var id = new ToscaConceptIdentifier("id", "1.2.3");
         orig.setControlLoopId(id);
         orig.setParticipantId(id);
         orig.setParticipantType(id);
-
-        Pair<Boolean, String> clElementResult = Pair.of(true, "ControlLoopElement result");
-        final Map<UUID, Pair<Boolean, String>> controlLoopResultMap = Map.of(UUID.randomUUID(), clElementResult);
+        var clElementResult = new ControlLoopElementAck(true, "ControlLoopElement result");
+        final var controlLoopResultMap = Map.of(UUID.randomUUID(), clElementResult);
         orig.setControlLoopResultMap(controlLoopResultMap);
 
         orig.setResponseTo(UUID.randomUUID());
@@ -59,5 +60,7 @@ class ControlLoopAckTest {
 
         assertEquals(removeVariableFields(orig.toString()),
                 removeVariableFields(new ControlLoopAck(orig).toString()));
+
+        assertSerializable(orig, ControlLoopAck.class);
     }
 }
index 3aafe56..1b155a1 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.models.messages.dmaap.participant;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
+import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.assertSerializable;
 import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.removeVariableFields;
 
 import java.time.Instant;
@@ -33,6 +34,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantUpdates;
+import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 
 /**
@@ -40,7 +42,7 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
  */
 class ControlLoopUpdateTest {
     @Test
-    void testCopyConstructor() {
+    void testCopyConstructor() throws CoderException {
         assertThatThrownBy(() -> new ControlLoopUpdate(null)).isInstanceOf(NullPointerException.class);
 
         ControlLoopUpdate orig = new ControlLoopUpdate();
@@ -71,5 +73,6 @@ class ControlLoopUpdateTest {
         ControlLoopUpdate other = new ControlLoopUpdate(orig);
 
         assertEquals(removeVariableFields(orig.toString()), removeVariableFields(other.toString()));
+        assertSerializable(orig, ControlLoopUpdate.class);
     }
 }
index 680acd2..e11c883 100644 (file)
@@ -25,52 +25,41 @@ import java.util.TimerTask;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
-import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
-import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
-import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener;
 import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.models.base.PfModelException;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
 /**
  * This class sends messages from participants to CLAMP.
  */
+@Component
 public class MessageSender extends TimerTask implements Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class);
 
     private final ParticipantHandler participantHandler;
-    private final ParticipantMessagePublisher publisher;
     private ScheduledExecutorService timerPool;
 
     /**
      * Constructor, set the publisher.
      *
      * @param participantHandler the participant handler to use for gathering information
-     * @param publisher the publisher to use for sending messages
-     * @param interval time interval to send Participant Status periodic messages
+     * @param parameters the parameters of the participant
      */
-    public MessageSender(ParticipantHandler participantHandler, ParticipantMessagePublisher publisher,
-            long interval) {
+    public MessageSender(ParticipantHandler participantHandler, ParticipantParameters parameters) {
         this.participantHandler = participantHandler;
-        this.publisher = publisher;
 
         // Kick off the timer
         timerPool = makeTimerPool();
+        var interval = parameters.getIntermediaryParameters().getReportingTimeIntervalMs();
         timerPool.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void run() {
         LOGGER.debug("Sent heartbeat to CLAMP");
-        this.sendHeartbeat();
+        participantHandler.sendHeartbeat();
     }
 
     @Override
@@ -78,97 +67,6 @@ public class MessageSender extends TimerTask implements Closeable {
         timerPool.shutdown();
     }
 
-    /**
-     * Send a response message for this participant.
-     *
-     * @param ackMessage the details to include in the response message
-     */
-    public void sendAckResponse(ControlLoopAck ackMessage) {
-        sendAckResponse(null, ackMessage);
-    }
-
-    /**
-     * Dispatch a response message for this participant.
-     *
-     * @param controlLoopId the control loop to which this message is a response
-     * @param ackMessage the details to include in the response message
-     */
-    public void sendAckResponse(ToscaConceptIdentifier controlLoopId, ControlLoopAck ackMessage) {
-        // Participant related fields
-        ackMessage.setParticipantType(participantHandler.getParticipantType());
-        ackMessage.setParticipantId(participantHandler.getParticipantId());
-        publisher.sendControlLoopAck(ackMessage);
-    }
-
-    /**
-     * Send a ParticipantRegister message for this participant.
-     *
-     * @param message the participantRegister message
-     */
-    public void sendParticipantRegister(ParticipantRegister message) {
-        publisher.sendParticipantRegister(message);
-    }
-
-    /**
-     * Send a ParticipantDeregister message for this participant.
-     *
-     * @param message the participantDeRegister message
-     */
-    public void sendParticipantDeregister(ParticipantDeregister message) {
-        publisher.sendParticipantDeregister(message);
-    }
-
-    /**
-     * Send a ParticipantUpdateAck message for this participant update.
-     *
-     * @param message the participantUpdateAck message
-     */
-    public void sendParticipantUpdateAck(ParticipantUpdateAck message) {
-        publisher.sendParticipantUpdateAck(message);
-    }
-
-    /**
-     * Send a ParticipantStatus message for this participant.
-     *
-     * @param participantStatus the ParticipantStatus message
-     */
-    public void sendParticipantStatus(ParticipantStatus participantStatus) {
-        var controlLoops = participantHandler.getControlLoopHandler().getControlLoops();
-        for (ControlLoopElementListener clElementListener :
-            participantHandler.getControlLoopHandler().getListeners()) {
-            updateClElementStatistics(controlLoops, clElementListener);
-        }
-
-        publisher.sendParticipantStatus(participantStatus);
-    }
-
-    /**
-     * Dispatch a heartbeat for this participant.
-     */
-    public void sendHeartbeat() {
-        publisher.sendHeartbeat(participantHandler.makeHeartbeat(false));
-    }
-
-    /**
-     * Update ControlLoopElement statistics. The control loop elements listening will be
-     * notified to retrieve statistics from respective controlloop elements, and controlloopelements
-     * data on the handler will be updated.
-     *
-     * @param controlLoops the control loops
-     * @param clElementListener control loop element listener
-     */
-    public void updateClElementStatistics(ControlLoops controlLoops, ControlLoopElementListener clElementListener) {
-        for (ControlLoop controlLoop : controlLoops.getControlLoopList()) {
-            for (ControlLoopElement element : controlLoop.getElements().values()) {
-                try {
-                    clElementListener.handleStatistics(element.getId());
-                } catch (PfModelException e) {
-                    LOGGER.debug("Getting statistics for Control loop element failed");
-                }
-            }
-        }
-    }
-
     /**
      * Makes a new timer pool.
      *
index 0e276f3..8b4c61d 100644 (file)
@@ -29,12 +29,11 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatistics;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementAck;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
@@ -45,24 +44,25 @@ import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.Contr
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
 import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender;
-import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
+import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
 import org.onap.policy.models.base.PfModelException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
 /*
  * This class is responsible for managing the state of all control loops in the participant.
  */
-@NoArgsConstructor
+@Component
 public class ControlLoopHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopHandler.class);
 
-    private ToscaConceptIdentifier participantType = null;
-    private ToscaConceptIdentifier participantId = null;
-    private MessageSender messageSender = null;
+    private final ToscaConceptIdentifier participantType;
+    private final ToscaConceptIdentifier participantId;
+    private final ParticipantMessagePublisher publisher;
 
     @Getter
     private final Map<ToscaConceptIdentifier, ControlLoop> controlLoopMap = new LinkedHashMap<>();
@@ -77,12 +77,12 @@ public class ControlLoopHandler {
      * Constructor, set the participant ID and messageSender.
      *
      * @param parameters the parameters of the participant
-     * @param messageSender the messageSender for sending responses to messages
+     * @param publisher the ParticipantMessage Publisher
      */
-    public ControlLoopHandler(ParticipantIntermediaryParameters parameters, MessageSender messageSender) {
-        this.participantType = parameters.getParticipantType();
-        this.participantId = parameters.getParticipantId();
-        this.messageSender = messageSender;
+    public ControlLoopHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher) {
+        this.participantType = parameters.getIntermediaryParameters().getParticipantType();
+        this.participantId = parameters.getIntermediaryParameters().getParticipantId();
+        this.publisher = publisher;
     }
 
     public void registerControlLoopElementListener(ControlLoopElementListener listener) {
@@ -104,18 +104,20 @@ public class ControlLoopHandler {
             LOGGER.warn("Cannot update Control loop element state, id is null");
         }
 
-        var controlLoopStateChangeAck =
-                new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_STATECHANGE_ACK);
         ControlLoopElement clElement = elementsOnThisParticipant.get(id);
         if (clElement != null) {
+            var controlLoopStateChangeAck =
+                    new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_STATECHANGE_ACK);
+            controlLoopStateChangeAck.setParticipantId(participantId);
+            controlLoopStateChangeAck.setParticipantType(participantType);
             clElement.setOrderedState(orderedState);
             clElement.setState(newState);
             controlLoopStateChangeAck.getControlLoopResultMap().put(clElement.getId(),
-                Pair.of(true, "Control loop element {} state changed to {}\", id, newState)"));
+                new  ControlLoopElementAck(true, "Control loop element {} state changed to {}\", id, newState)"));
             LOGGER.debug("Control loop element {} state changed to {}", id, newState);
             controlLoopStateChangeAck.setMessage("ControlLoopElement state changed to {} " + newState);
             controlLoopStateChangeAck.setResult(true);
-            messageSender.sendAckResponse(controlLoopStateChangeAck);
+            publisher.sendControlLoopAck(controlLoopStateChangeAck);
             return clElement;
         }
         return null;
@@ -147,15 +149,17 @@ public class ControlLoopHandler {
         }
 
         var controlLoop = controlLoopMap.get(stateChangeMsg.getControlLoopId());
-        var controlLoopAck = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE);
 
         if (controlLoop == null) {
+            var controlLoopAck = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE);
+            controlLoopAck.setParticipantId(participantId);
+            controlLoopAck.setParticipantType(participantType);
             controlLoopAck.setMessage("Control loop " + stateChangeMsg.getControlLoopId()
                 + " does not use this participant " + participantId);
             controlLoopAck.setResult(false);
             controlLoopAck.setResponseTo(stateChangeMsg.getMessageId());
             controlLoopAck.setControlLoopId(stateChangeMsg.getControlLoopId());
-            messageSender.sendAckResponse(controlLoopAck);
+            publisher.sendControlLoopAck(controlLoopAck);
             LOGGER.debug("Control loop {} does not use this participant", stateChangeMsg.getControlLoopId());
             return;
         }
@@ -200,17 +204,19 @@ public class ControlLoopHandler {
 
         var controlLoop = controlLoopMap.get(updateMsg.getControlLoopId());
 
-        var controlLoopUpdateAck = new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_UPDATE_ACK);
-
         // TODO: Updates to existing ControlLoops are not supported yet (Addition/Removal of ControlLoop
         // elements to existing ControlLoop has to be supported).
         if (controlLoop != null) {
+            var controlLoopUpdateAck = new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_UPDATE_ACK);
+            controlLoopUpdateAck.setParticipantId(participantId);
+            controlLoopUpdateAck.setParticipantType(participantType);
+
             controlLoopUpdateAck.setMessage("Control loop " + updateMsg.getControlLoopId()
                 + " already defined on participant " + participantId);
             controlLoopUpdateAck.setResult(false);
             controlLoopUpdateAck.setResponseTo(updateMsg.getMessageId());
             controlLoopUpdateAck.setControlLoopId(updateMsg.getControlLoopId());
-            messageSender.sendAckResponse(controlLoopUpdateAck);
+            publisher.sendControlLoopAck(controlLoopUpdateAck);
             return;
         }
 
@@ -319,10 +325,12 @@ public class ControlLoopHandler {
 
         if (orderedState.equals(controlLoop.getOrderedState())) {
             var controlLoopAck = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE);
+            controlLoopAck.setParticipantId(participantId);
+            controlLoopAck.setParticipantType(participantType);
             controlLoopAck.setMessage("Control loop is already in state" + orderedState);
             controlLoopAck.setResult(false);
             controlLoopAck.setControlLoopId(controlLoop.getDefinition());
-            messageSender.sendAckResponse(controlLoopAck);
+            publisher.sendControlLoopAck(controlLoopAck);
             return;
         }
 
index 66e09e7..d3f6c4d 100644 (file)
@@ -22,7 +22,6 @@
 
 package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
 
-import java.io.Closeable;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
@@ -33,9 +32,11 @@ import lombok.Getter;
 import lombok.Setter;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatisticsList;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopInfo;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopStatistics;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantDefinition;
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus;
@@ -53,9 +54,10 @@ import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.Parti
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatusReq;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender;
+import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener;
 import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher;
 import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
+import org.onap.policy.models.base.PfModelException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
 import org.slf4j.Logger;
@@ -67,14 +69,14 @@ import org.springframework.stereotype.Component;
  */
 @Getter
 @Component
-public class ParticipantHandler implements Closeable {
+public class ParticipantHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHandler.class);
 
     private final ToscaConceptIdentifier participantType;
     private final ToscaConceptIdentifier participantId;
-    private final MessageSender sender;
     private final ControlLoopHandler controlLoopHandler;
     private final ParticipantStatistics participantStatistics;
+    private final ParticipantMessagePublisher publisher;
 
     @Setter
     private ParticipantState state = ParticipantState.UNKNOWN;
@@ -82,10 +84,9 @@ public class ParticipantHandler implements Closeable {
     @Setter
     private ParticipantHealthStatus healthStatus = ParticipantHealthStatus.UNKNOWN;
 
-    private List<ControlLoopElementDefinition> clElementDefsOnThisParticipant =
-            new ArrayList<>();
+    private final List<ControlLoopElementDefinition> clElementDefsOnThisParticipant = new ArrayList<>();
 
-    public ToscaServiceTemplate toscaServiceTemplate = new ToscaServiceTemplate();
+    private ToscaServiceTemplate toscaServiceTemplate = new ToscaServiceTemplate();
 
     /**
      * Constructor, set the participant ID and sender.
@@ -93,13 +94,12 @@ public class ParticipantHandler implements Closeable {
      * @param parameters the parameters of the participant
      * @param publisher the publisher for sending responses to messages
      */
-    public ParticipantHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher) {
+    public ParticipantHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher,
+            ControlLoopHandler controlLoopHandler) {
         this.participantType = parameters.getIntermediaryParameters().getParticipantType();
         this.participantId = parameters.getIntermediaryParameters().getParticipantId();
-        this.sender =
-                new MessageSender(this, publisher,
-                        parameters.getIntermediaryParameters().getReportingTimeIntervalMs());
-        this.controlLoopHandler = new ControlLoopHandler(parameters.getIntermediaryParameters(), sender);
+        this.publisher = publisher;
+        this.controlLoopHandler = controlLoopHandler;
         this.participantStatistics = new ParticipantStatistics();
         this.participantStatistics.setParticipantId(participantId);
         this.participantStatistics.setState(state);
@@ -107,18 +107,40 @@ public class ParticipantHandler implements Closeable {
         this.participantStatistics.setTimeStamp(Instant.now());
     }
 
-    @Override
-    public void close() {
-        sender.close();
-    }
-
     /**
      * Method which handles a participant health check event from clamp.
      *
      * @param participantStatusReqMsg participant participantStatusReq message
      */
     public void handleParticipantStatusReq(final ParticipantStatusReq participantStatusReqMsg) {
-        sender.sendParticipantStatus(makeHeartbeat(true));
+        var controlLoops = controlLoopHandler.getControlLoops();
+        for (ControlLoopElementListener clElementListener : controlLoopHandler.getListeners()) {
+            updateClElementStatistics(controlLoops, clElementListener);
+        }
+
+        var participantStatus = makeHeartbeat(true);
+        publisher.sendParticipantStatus(participantStatus);
+    }
+
+    /**
+     * Update ControlLoopElement statistics. The control loop elements listening will be
+     * notified to retrieve statistics from respective controlloop elements, and controlloopelements
+     * data on the handler will be updated.
+     *
+     * @param controlLoops the control loops
+     * @param clElementListener control loop element listener
+     */
+    private void updateClElementStatistics(ControlLoops controlLoops, ControlLoopElementListener clElementListener) {
+        for (ControlLoop controlLoop : controlLoops.getControlLoopList()) {
+            for (ControlLoopElement element : controlLoop.getElements().values()) {
+                try {
+                    clElementListener.handleStatistics(element.getId());
+                } catch (PfModelException e) {
+                    LOGGER.debug("Getting statistics for Control loop element failed for element ID {}",
+                            element.getId(), e);
+                }
+            }
+        }
     }
 
     /**
@@ -165,7 +187,7 @@ public class ParticipantHandler implements Closeable {
 
         var participantUpdateAck = new ParticipantUpdateAck();
         handleStateChange(participantState, participantUpdateAck);
-        sender.sendParticipantUpdateAck(participantUpdateAck);
+        publisher.sendParticipantUpdateAck(participantUpdateAck);
         return getParticipant(definition.getName(), definition.getVersion());
     }
 
@@ -215,7 +237,7 @@ public class ParticipantHandler implements Closeable {
         participantRegister.setParticipantId(participantId);
         participantRegister.setParticipantType(participantType);
 
-        sender.sendParticipantRegister(participantRegister);
+        publisher.sendParticipantRegister(participantRegister);
     }
 
     /**
@@ -236,7 +258,7 @@ public class ParticipantHandler implements Closeable {
         participantDeregister.setParticipantId(participantId);
         participantDeregister.setParticipantType(participantType);
 
-        sender.sendParticipantDeregister(participantDeregister);
+        publisher.sendParticipantDeregister(participantDeregister);
     }
 
     /**
@@ -267,7 +289,8 @@ public class ParticipantHandler implements Closeable {
             // This message is to commission the controlloop
             for (ParticipantDefinition participantDefinition : participantUpdateMsg.getParticipantDefinitionUpdates()) {
                 if (participantDefinition.getParticipantId().equals(participantType)) {
-                    clElementDefsOnThisParticipant = participantDefinition.getControlLoopElementDefinitionList();
+                    clElementDefsOnThisParticipant.clear();
+                    clElementDefsOnThisParticipant.addAll(participantDefinition.getControlLoopElementDefinitionList());
                     break;
                 }
             }
@@ -289,7 +312,14 @@ public class ParticipantHandler implements Closeable {
         participantUpdateAck.setParticipantId(participantId);
         participantUpdateAck.setParticipantType(participantType);
 
-        sender.sendParticipantUpdateAck(participantUpdateAck);
+        publisher.sendParticipantUpdateAck(participantUpdateAck);
+    }
+
+    /**
+     * Dispatch a heartbeat for this participant.
+     */
+    public void sendHeartbeat() {
+        publisher.sendHeartbeat(makeHeartbeat(false));
     }
 
     /**
@@ -322,15 +352,14 @@ public class ParticipantHandler implements Closeable {
 
     private List<ControlLoopInfo> getControlLoopInfoList() {
         List<ControlLoopInfo> controlLoopInfoList = new ArrayList<>();
-        for (Map.Entry<ToscaConceptIdentifier, ControlLoop> entry :
-                controlLoopHandler.getControlLoopMap().entrySet()) {
+        for (Map.Entry<ToscaConceptIdentifier, ControlLoop> entry : controlLoopHandler.getControlLoopMap().entrySet()) {
             ControlLoopInfo clInfo = new ControlLoopInfo();
             clInfo.setControlLoopId(entry.getKey());
             ControlLoopStatistics clStatitistics = new ControlLoopStatistics();
             clStatitistics.setControlLoopId(entry.getKey());
             ClElementStatisticsList clElementStatisticsList = new ClElementStatisticsList();
-            clElementStatisticsList.setClElementStatistics(
-                entry.getValue().getControlLoopElementStatisticsList(entry.getValue()));
+            clElementStatisticsList
+                    .setClElementStatistics(entry.getValue().getControlLoopElementStatisticsList(entry.getValue()));
             clStatitistics.setClElementStatisticsList(clElementStatisticsList);
             clInfo.setControlLoopStatistics(clStatitistics);
             clInfo.setState(entry.getValue().getState());