Fix issue in event handling in participants 11/123511/4
authorFrancescoFioraEst <francesco.fiora@est.tech>
Wed, 18 Aug 2021 14:25:59 +0000 (15:25 +0100)
committerFrancescoFioraEst <francesco.fiora@est.tech>
Thu, 26 Aug 2021 12:44:57 +0000 (13:44 +0100)
Fix issue in event handling in participants
and refactor Participant Publisher and Listener

Issue-ID: POLICY-3544
Change-Id: Ic92ffa79d303adfb1c3319fbfefb1faef911a9d4
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
24 files changed:
models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessage.java
models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessage.java
models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessageTest.java
models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessageTest.java
participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java [deleted file]
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java [new file with mode: 0644]
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java [new file with mode: 0644]
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java

index 8b59a18..c6f5c61 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.models.messages.dmaap.participant;
 
 import java.util.UUID;
 import lombok.Getter;
+import lombok.NonNull;
 import lombok.Setter;
 import lombok.ToString;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -77,4 +78,31 @@ public class ParticipantAckMessage {
         this.participantType = source.participantType;
         this.participantId = source.participantId;
     }
+
+    /**
+     * Determines if this message applies to this participant type.
+     *
+     * @param participantType type of the participant to match against
+     * @param participantId id of the participant to match against
+     * @return {@code true} if this message applies to this participant, {@code false} otherwise
+     */
+    public boolean appliesTo(@NonNull final ToscaConceptIdentifier participantType,
+            @NonNull final ToscaConceptIdentifier participantId) {
+        // Broadcast message to all participants
+        if (this.participantType == null) {
+            return true;
+        }
+
+        if (!participantType.equals(this.participantType)) {
+            return false;
+        }
+
+        // Broadcast message to all control loop elements on this participant
+        if (this.participantId == null) {
+            return true;
+        }
+
+        // Targeted message at this specific participant
+        return participantId.equals(this.participantId);
+    }
 }
index 3ca4d3d..f98a88c 100644 (file)
@@ -97,17 +97,16 @@ public class ParticipantMessage {
             return true;
         }
 
-        // Broadcast message to all control loop elements on this participant
-        if (participantType.equals(this.participantType) && this.participantId == null) {
-            return true;
+        if (!participantType.equals(this.participantType)) {
+            return false;
         }
 
-        // Targeted message at this specific participant
-        if (participantType.equals(this.participantType) && participantId.equals(this.participantId)) {
+        // Broadcast message to all control loop elements on this participant
+        if (this.participantId == null) {
             return true;
         }
 
-        // Message is not for this participant
-        return false;
+        // Targeted message at this specific participant
+        return participantId.equals(this.participantId);
     }
 }
index b9c1053..df82ab0 100644 (file)
@@ -22,15 +22,23 @@ package org.onap.policy.clamp.controlloop.models.messages.dmaap.participant;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.assertSerializable;
 
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 
 class ParticipantAckMessageTest {
     private ParticipantAckMessage message;
 
+    private static final ToscaConceptIdentifier PTYPE_456 = new ToscaConceptIdentifier("PType", "4.5.6");
+    private static final ToscaConceptIdentifier PTYPE_457 = new ToscaConceptIdentifier("PType", "4.5.7");
+    private static final ToscaConceptIdentifier ID_123 = new ToscaConceptIdentifier("id", "1.2.3");
+    private static final ToscaConceptIdentifier ID_124 = new ToscaConceptIdentifier("id", "1.2.4");
+
     @Test
     void testCopyConstructor() throws CoderException {
         assertThatThrownBy(() -> new ParticipantAckMessage((ParticipantAckMessage) null))
@@ -51,9 +59,43 @@ class ParticipantAckMessageTest {
         assertSerializable(message, ParticipantAckMessage.class);
     }
 
+    @Test
+    void testAppliesTo_NullParticipantId() {
+        message = makeMessage();
+
+        assertThatThrownBy(() -> message.appliesTo(null, null)).isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> message.appliesTo(PTYPE_456, null)).isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> message.appliesTo(null, ID_123)).isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    void testAppliesTo_ParticipantIdMatches() {
+        message = makeMessage();
+
+        // ParticipantId matches
+        assertTrue(message.appliesTo(PTYPE_456, ID_123));
+        assertFalse(message.appliesTo(PTYPE_456, ID_124));
+        assertFalse(message.appliesTo(PTYPE_457, ID_123));
+    }
+
+    @Test
+    void testAppliesTo_ParticipantIdNoMatch() {
+        message = makeMessage();
+
+        // ParticipantId does not match
+        ToscaConceptIdentifier id = new ToscaConceptIdentifier();
+        id.setName("id1111");
+        id.setVersion("3.2.1");
+        assertFalse(message.appliesTo(id, id));
+        message.setParticipantType(null);
+        assertTrue(message.appliesTo(id, id));
+    }
+
     private ParticipantAckMessage makeMessage() {
         ParticipantAckMessage msg = new ParticipantAckMessage(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK);
 
+        msg.setParticipantType(PTYPE_456);
+        msg.setParticipantId(ID_123);
         msg.setMessage("Successfull Ack");
         msg.setResult(true);
         msg.setResponseTo(UUID.randomUUID());
index 924ad8f..58d3afe 100644 (file)
@@ -35,6 +35,11 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 class ParticipantMessageTest {
     private ParticipantMessage message;
 
+    private static final ToscaConceptIdentifier PTYPE_456 = new ToscaConceptIdentifier("PType", "4.5.6");
+    private static final ToscaConceptIdentifier PTYPE_457 = new ToscaConceptIdentifier("PType", "4.5.7");
+    private static final ToscaConceptIdentifier ID_123 = new ToscaConceptIdentifier("id", "1.2.3");
+    private static final ToscaConceptIdentifier ID_124 = new ToscaConceptIdentifier("id", "1.2.4");
+
     @Test
     void testCopyConstructor() throws CoderException {
         assertThatThrownBy(() -> new ParticipantMessage((ParticipantMessage) null))
@@ -62,10 +67,8 @@ class ParticipantMessageTest {
         message = makeMessage();
 
         assertThatThrownBy(() -> message.appliesTo(null, null)).isInstanceOf(NullPointerException.class);
-        assertThatThrownBy(() -> message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"), null))
-                .isInstanceOf(NullPointerException.class);
-        assertThatThrownBy(() -> message.appliesTo(null, new ToscaConceptIdentifier("id", "1.2.3")))
-                .isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> message.appliesTo(PTYPE_456, null)).isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> message.appliesTo(null, ID_123)).isInstanceOf(NullPointerException.class);
     }
 
     @Test
@@ -73,12 +76,9 @@ class ParticipantMessageTest {
         message = makeMessage();
 
         // ParticipantId matches
-        assertTrue(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"),
-                new ToscaConceptIdentifier("id", "1.2.3")));
-        assertFalse(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"),
-                new ToscaConceptIdentifier("id", "1.2.4")));
-        assertFalse(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.7"),
-                new ToscaConceptIdentifier("id", "1.2.3")));
+        assertTrue(message.appliesTo(PTYPE_456, ID_123));
+        assertFalse(message.appliesTo(PTYPE_456, ID_124));
+        assertFalse(message.appliesTo(PTYPE_457, ID_123));
     }
 
     @Test
@@ -97,8 +97,8 @@ class ParticipantMessageTest {
     private ParticipantMessage makeMessage() {
         ParticipantMessage msg = new ParticipantMessage(ParticipantMessageType.PARTICIPANT_STATE_CHANGE);
 
-        msg.setParticipantType(new ToscaConceptIdentifier("PType", "4.5.6"));
-        msg.setParticipantId(new ToscaConceptIdentifier("id", "1.2.3"));
+        msg.setParticipantType(PTYPE_456);
+        msg.setParticipantId(ID_123);
         msg.setMessageId(UUID.randomUUID());
         msg.setTimestamp(Instant.ofEpochMilli(3000));
 
index 699df25..8187378 100644 (file)
@@ -85,7 +85,8 @@ class ParticipantMessagesTest {
 
         synchronized (lockit) {
             ParticipantMessagePublisher participantMessagePublisher =
-                    new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+                    new ParticipantMessagePublisher();
+            participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
             participantMessagePublisher.sendParticipantRegister(participantRegisterMsg);
         }
     }
@@ -113,7 +114,8 @@ class ParticipantMessagesTest {
 
         synchronized (lockit) {
             ParticipantMessagePublisher participantMessagePublisher =
-                    new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+                    new ParticipantMessagePublisher();
+            participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
             participantMessagePublisher.sendParticipantDeregister(participantDeregisterMsg);
         }
     }
@@ -153,8 +155,8 @@ class ParticipantMessagesTest {
         participantUpdateAckMsg.setResult(true);
 
         synchronized (lockit) {
-            ParticipantMessagePublisher participantMessagePublisher =
-                    new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+            ParticipantMessagePublisher participantMessagePublisher = new ParticipantMessagePublisher();
+            participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
             participantMessagePublisher.sendParticipantUpdateAck(participantUpdateAckMsg);
         }
     }
@@ -163,8 +165,8 @@ class ParticipantMessagesTest {
     void testParticipantStatusHeartbeat() throws Exception {
         final ParticipantStatus heartbeat = participantHandler.makeHeartbeat(true);
         synchronized (lockit) {
-            ParticipantMessagePublisher publisher =
-                    new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+            ParticipantMessagePublisher publisher = new ParticipantMessagePublisher();
+            publisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
             assertThatCode(() -> publisher.sendHeartbeat(heartbeat)).doesNotThrowAnyException();
         }
     }
index 0b9110b..d24f32f 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
 
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
 import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
 import org.springframework.stereotype.Component;
 
@@ -39,4 +40,9 @@ public class ControlLoopStateChangeListener extends ParticipantListener<ControlL
         super(ControlLoopStateChange.class, participantHandler,
                 participantHandler::handleControlLoopStateChange);
     }
+
+    @Override
+    public String getType() {
+        return ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name();
+    }
 }
index 56bc1fd..f9dec18 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
 
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
 import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
 import org.springframework.stereotype.Component;
 
@@ -38,4 +39,9 @@ public class ControlLoopUpdateListener extends ParticipantListener<ControlLoopUp
     public ControlLoopUpdateListener(final ParticipantHandler participantHandler) {
         super(ControlLoopUpdate.class, participantHandler, participantHandler::handleControlLoopUpdate);
     }
+
+    @Override
+    public String getType() {
+        return ParticipantMessageType.CONTROL_LOOP_UPDATE.name();
+    }
 }
index 4b72249..113f75d 100644 (file)
@@ -24,6 +24,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
 
 import java.util.function.Consumer;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener;
 import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.listeners.ScoListener;
@@ -32,7 +33,8 @@ import org.onap.policy.common.utils.coder.StandardCoderObject;
 /**
  * Abstract Listener for Participant Ack messages sent by runtime.
  */
-public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T> {
+public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T>
+        implements Listener {
 
     private final ParticipantHandler participantHandler;
     private final Consumer<T> consumer;
@@ -52,6 +54,13 @@ public abstract class ParticipantAckListener<T extends ParticipantAckMessage> ex
 
     @Override
     public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) {
-        consumer.accept(message);
+        if (participantHandler.appliesTo(message)) {
+            consumer.accept(message);
+        }
+    }
+
+    @Override
+    public ScoListener<T> getScoListener() {
+        return this;
     }
 }
index e20f481..5440e00 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
 
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
 import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
 import org.springframework.stereotype.Component;
 
@@ -39,4 +40,9 @@ public class ParticipantDeregisterAckListener extends ParticipantAckListener<Par
     public ParticipantDeregisterAckListener(final ParticipantHandler participantHandler) {
         super(ParticipantDeregisterAck.class, participantHandler, participantHandler::handleParticipantDeregisterAck);
     }
+
+    @Override
+    public String getType() {
+        return ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name();
+    }
 }
index c6ad900..67af5c8 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
 
 import java.util.function.Consumer;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener;
 import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.listeners.ScoListener;
@@ -30,7 +31,7 @@ import org.onap.policy.common.utils.coder.StandardCoderObject;
 /**
  * Abstract Listener for Participant messages sent by CLAMP.
  */
-public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> {
+public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> implements Listener {
 
     private final ParticipantHandler participantHandler;
     private final Consumer<T> consumer;
@@ -54,4 +55,9 @@ public abstract class ParticipantListener<T extends ParticipantMessage> extends
             consumer.accept(message);
         }
     }
+
+    @Override
+    public ScoListener<T> getScoListener() {
+        return this;
+    }
 }
index d8cc9eb..2941e9f 100644 (file)
 package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
 
 import java.util.List;
+import javax.ws.rs.core.Response.Status;
+import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Publisher;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
 /**
  * This class is used to send Participant Status messages to clamp using TopicSinkClient.
  *
  */
-public class ParticipantMessagePublisher {
+@Component
+public class ParticipantMessagePublisher implements Publisher {
     private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantMessagePublisher.class);
 
-    private final TopicSinkClient topicSinkClient;
+    private boolean active = false;
+    private TopicSinkClient topicSinkClient;
 
     /**
      * Constructor for instantiating ParticipantMessagePublisher.
      *
      * @param topicSinks the topic sinks
      */
-    public ParticipantMessagePublisher(List<TopicSink> topicSinks) {
+    @Override
+    public void active(List<TopicSink> topicSinks) {
         if (topicSinks.size() != 1) {
             throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1");
         }
         this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+        active = true;
     }
 
     /**
@@ -58,6 +66,9 @@ public class ParticipantMessagePublisher {
      * @param participantStatus the Participant Status
      */
     public void sendParticipantStatus(final ParticipantStatus participantStatus) {
+        if (!active) {
+            throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+        }
         topicSinkClient.send(participantStatus);
         LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus);
     }
@@ -68,6 +79,9 @@ public class ParticipantMessagePublisher {
      * @param participantRegister the Participant Status
      */
     public void sendParticipantRegister(final ParticipantRegister participantRegister) {
+        if (!active) {
+            throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+        }
         topicSinkClient.send(participantRegister);
         LOGGER.debug("Sent Participant Register message to CLAMP - {}", participantRegister);
     }
@@ -78,6 +92,9 @@ public class ParticipantMessagePublisher {
      * @param participantDeregister the Participant Status
      */
     public void sendParticipantDeregister(final ParticipantDeregister participantDeregister) {
+        if (!active) {
+            throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+        }
         topicSinkClient.send(participantDeregister);
         LOGGER.debug("Sent Participant Deregister message to CLAMP - {}", participantDeregister);
     }
@@ -88,6 +105,9 @@ public class ParticipantMessagePublisher {
      * @param participantUpdateAck the Participant Update Ack
      */
     public void sendParticipantUpdateAck(final ParticipantUpdateAck participantUpdateAck) {
+        if (!active) {
+            throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+        }
         topicSinkClient.send(participantUpdateAck);
         LOGGER.debug("Sent Participant Update Ack message to CLAMP - {}", participantUpdateAck);
     }
@@ -98,6 +118,9 @@ public class ParticipantMessagePublisher {
      * @param controlLoopAck ControlLoop Update/StateChange Ack
      */
     public void sendControlLoopAck(final ControlLoopAck controlLoopAck) {
+        if (!active) {
+            throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+        }
         topicSinkClient.send(controlLoopAck);
         LOGGER.debug("Sent ControlLoop Update/StateChange Ack to runtime - {}", controlLoopAck);
     }
@@ -108,7 +131,15 @@ public class ParticipantMessagePublisher {
      * @param participantStatus the Participant Status
      */
     public void sendHeartbeat(final ParticipantStatus participantStatus) {
+        if (!active) {
+            throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+        }
         topicSinkClient.send(participantStatus);
         LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus);
     }
+
+    @Override
+    public void stop() {
+        active = false;
+    }
 }
index a15a2a8..7be4608 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
 
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
 import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
 import org.springframework.stereotype.Component;
@@ -39,4 +40,9 @@ public class ParticipantRegisterAckListener extends ParticipantAckListener<Parti
     public ParticipantRegisterAckListener(final ParticipantHandler participantHandler) {
         super(ParticipantRegisterAck.class, participantHandler, participantHandler::handleParticipantRegisterAck);
     }
+
+    @Override
+    public String getType() {
+        return ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name();
+    }
 }
index 0881edb..9e978fe 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
 
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatusReq;
 import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
 import org.springframework.stereotype.Component;
@@ -38,4 +39,9 @@ public class ParticipantStatusReqListener extends ParticipantListener<Participan
     public ParticipantStatusReqListener(final ParticipantHandler participantHandler) {
         super(ParticipantStatusReq.class, participantHandler, participantHandler::handleParticipantStatusReq);
     }
+
+    @Override
+    public String getType() {
+        return ParticipantMessageType.PARTICIPANT_STATUS_REQ.name();
+    }
 }
index 42bd52d..da45501 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
 
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
 import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
 import org.springframework.stereotype.Component;
@@ -38,4 +39,9 @@ public class ParticipantUpdateListener extends ParticipantListener<ParticipantUp
     public ParticipantUpdateListener(final ParticipantHandler participantHandler) {
         super(ParticipantUpdate.class, participantHandler, participantHandler::handleParticipantUpdate);
     }
+
+    @Override
+    public String getType() {
+        return ParticipantMessageType.PARTICIPANT_UPDATE.name();
+    }
 }
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java
deleted file mode 100644 (file)
index e363504..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2021 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.clamp.controlloop.participant.intermediary.config;
-
-import java.util.List;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher;
-import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class BeanFactory {
-
-    // Name of the message type for messages on topics
-    private static final String[] MSG_TYPE_NAMES = {"messageType"};
-
-    /**
-     * create ParticipantMessagePublisher.
-     *
-     * @param parameters the ParticipantParameters
-     * @return ParticipantMessagePublisher
-     */
-    @Bean
-    public ParticipantMessagePublisher publisher(final ParticipantParameters parameters) {
-        List<TopicSink> topicSinks = TopicEndpointManager.getManager()
-                .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks());
-        return new ParticipantMessagePublisher(topicSinks);
-    }
-
-    @Bean
-    public MessageTypeDispatcher msgDispatcher() {
-        return new MessageTypeDispatcher(MSG_TYPE_NAMES);
-    }
-}
index 4fc0ae1..754bf28 100644 (file)
@@ -23,20 +23,13 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
 import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantDeregisterAckListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantRegisterAckListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusReqListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener;
 import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
 import org.onap.policy.common.utils.services.ServiceManagerContainer;
-import org.springframework.context.ApplicationContext;
 import org.springframework.context.event.ContextClosedEvent;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.context.event.EventListener;
@@ -48,32 +41,50 @@ import org.springframework.stereotype.Component;
 @Component
 public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
 
-    private final ApplicationContext applicationContext;
+    private static final String[] MSG_TYPE_NAMES = {"messageType"};
 
     // Topics from which the participant receives and to which the participant sends messages
+    private List<TopicSink> topicSinks;
     private List<TopicSource> topicSources;
 
     ParticipantIntermediaryApi participantIntermediaryApi;
 
+    private final MessageTypeDispatcher msgDispatcher;
+
     /**
      * Instantiate the activator for participant.
      *
-     * @param applicationContext ApplicationContext
      * @param parameters the ParticipantParameters
+     * @param publishers list of Publishers
+     * @param listeners list of Listeners
      */
-    public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters,
-            ParticipantIntermediaryApi participantIntermediaryApi) {
-        this.applicationContext = applicationContext;
+    public IntermediaryActivator(final ParticipantParameters parameters,
+            ParticipantIntermediaryApi participantIntermediaryApi, List<Publisher> publishers,
+            List<Listener> listeners) {
         this.participantIntermediaryApi = participantIntermediaryApi;
 
+        topicSinks = TopicEndpointManager.getManager()
+                .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks());
+
         topicSources = TopicEndpointManager.getManager()
                 .addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources());
 
-        // @formatter:off
+        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
 
+        // @formatter:off
         addAction("Topic endpoint management",
-            () -> TopicEndpointManager.getManager().start(),
-            () -> TopicEndpointManager.getManager().shutdown());
+                () -> TopicEndpointManager.getManager().start(),
+                () -> TopicEndpointManager.getManager().shutdown());
+
+        publishers.forEach(publisher ->
+            addAction("Publisher " + publisher.getClass().getSimpleName(),
+                () -> publisher.active(topicSinks),
+                publisher::stop));
+
+        listeners.forEach(listener ->
+            addAction("Listener " + listener.getClass().getSimpleName(),
+                    () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
+                    () -> msgDispatcher.unregister(listener.getType())));
 
         addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
         // @formatter:on
@@ -117,26 +128,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
      * Registers the dispatcher with the topic source(s).
      */
     private void registerMsgDispatcher() {
-        MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class);
-
-        msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATUS_REQ.name(),
-                applicationContext.getBean(ParticipantStatusReqListener.class));
-
-        msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name(),
-                applicationContext.getBean(ControlLoopStateChangeListener.class));
-
-        msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_UPDATE.name(),
-                applicationContext.getBean(ControlLoopUpdateListener.class));
-
-        msgDispatcher.register(ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(),
-                applicationContext.getBean(ParticipantRegisterAckListener.class));
-
-        msgDispatcher.register(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(),
-                applicationContext.getBean(ParticipantDeregisterAckListener.class));
-
-        msgDispatcher.register(ParticipantMessageType.PARTICIPANT_UPDATE.name(),
-                applicationContext.getBean(ParticipantUpdateListener.class));
-
         for (final TopicSource source : topicSources) {
             source.register(msgDispatcher);
         }
@@ -146,8 +137,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
      * Unregisters the dispatcher from the topic source(s).
      */
     private void unregisterMsgDispatcher() {
-        MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class);
-
         for (final TopicSource source : topicSources) {
             source.unregister(msgDispatcher);
         }
@@ -155,6 +144,8 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
 
     @Override
     public void close() throws IOException {
-        super.shutdown();
+        if (isAlive()) {
+            super.shutdown();
+        }
     }
 }
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java
new file mode 100644 (file)
index 0000000..bca71af
--- /dev/null
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+
+public interface Listener {
+
+    /**
+     * Get the type of message of interest to the listener.
+     *
+     * @return type of message of interest to the listener
+     */
+    String getType();
+
+    /**
+     * Get listener to register.
+     *
+     * @return listener to register
+     */
+    <T> ScoListener<T> getScoListener();
+}
index 1947fda..66e09e7 100644 (file)
@@ -43,6 +43,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant
 import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
@@ -196,6 +197,16 @@ public class ParticipantHandler implements Closeable {
         return participantMsg.appliesTo(participantType, participantId);
     }
 
+    /**
+     * Check if a participant message applies to this participant handler.
+     *
+     * @param participantMsg the message to check
+     * @return true if it applies, false otherwise
+     */
+    public boolean appliesTo(ParticipantAckMessage participantMsg) {
+        return participantMsg.appliesTo(participantType, participantId);
+    }
+
     /**
      * Method to send ParticipantRegister message to controlloop runtime.
      */
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java
new file mode 100644 (file)
index 0000000..287d7c0
--- /dev/null
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import java.util.List;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+
+/**
+ * Publisher.
+ */
+public interface Publisher {
+
+    void active(List<TopicSink> topicSinks);
+
+    void stop();
+}
index 891dab9..d196dd1 100644 (file)
@@ -23,8 +23,6 @@ package org.onap.policy.clamp.controlloop.runtime.config.messaging;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.stream.Stream;
-import javax.ws.rs.core.Response.Status;
 import lombok.Getter;
 import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
 import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
@@ -33,6 +31,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
 import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.springframework.context.event.ContextClosedEvent;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
@@ -53,36 +52,31 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
      * Constructor.
      *
      * @param clRuntimeParameterGroup the parameters for the control loop runtime service
-     * @param publishers array of Publishers
-     * @param listeners array of Listeners
+     * @param publishers list of Publishers
+     * @param listeners list of Listeners
      * @throws ControlLoopRuntimeException if the activator does not start
      */
-    public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, Publisher[] publishers,
-            Listener[] listeners) {
+    public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, List<Publisher> publishers,
+            List<Listener> listeners) {
         topicSinks = TopicEndpointManager.getManager()
                 .addTopicSinks(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSinks());
 
         topicSources = TopicEndpointManager.getManager()
                 .addTopicSources(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSources());
 
-        try {
-            msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
-        } catch (final RuntimeException e) {
-            throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR,
-                    "topic message dispatcher failed to start", e);
-        }
+        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
 
         // @formatter:off
         addAction("Topic endpoint management",
                 () -> TopicEndpointManager.getManager().start(),
                 () -> TopicEndpointManager.getManager().shutdown());
 
-        Stream.of(publishers).forEach(publisher ->
+        publishers.forEach(publisher ->
             addAction("Publisher " + publisher.getClass().getSimpleName(),
                 () -> publisher.active(topicSinks),
-                () -> publisher.stop()));
+                publisher::stop));
 
-        Stream.of(listeners).forEach(listener ->
+        listeners.forEach(listener ->
             addAction("Listener " + listener.getClass().getSimpleName(),
                     () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
                     () -> msgDispatcher.unregister(listener.getType())));
@@ -121,10 +115,22 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
         }
     }
 
+    /**
+     * Handle ContextClosedEvent.
+     *
+     * @param ctxClosedEvent ContextClosedEvent
+     */
+    @EventListener
+    public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
+        if (isAlive()) {
+            stop();
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (isAlive()) {
-            stop();
+            super.shutdown();
         }
     }
 }
index 2cc0f94..b395734 100644 (file)
@@ -139,7 +139,8 @@ public class SupervisionHandler {
     public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
         LOGGER.debug("Participant Register received {}", participantRegisterMessage);
 
-        participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId());
+        participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId(),
+                participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType());
 
         participantUpdatePublisher.send(participantRegisterMessage.getParticipantId(),
                 participantRegisterMessage.getParticipantType(), true);
@@ -358,15 +359,15 @@ public class SupervisionHandler {
             throws PfModelException, ControlLoopException {
         if (participantStatusMessage.getControlLoopInfoList() != null) {
             for (ControlLoopInfo clEntry : participantStatusMessage.getControlLoopInfoList()) {
-                var dbControlLoop = controlLoopProvider.getControlLoop(
-                        new ToscaConceptIdentifier(clEntry.getControlLoopId()));
+                var dbControlLoop =
+                        controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(clEntry.getControlLoopId()));
                 if (dbControlLoop == null) {
                     exceptionOccured(Response.Status.NOT_FOUND,
                             "PARTICIPANT_STATUS control loop not found in database: " + clEntry.getControlLoopId());
                 }
                 dbControlLoop.setState(clEntry.getState());
-                monitoringProvider.createClElementStatistics(clEntry.getControlLoopStatistics()
-                        .getClElementStatisticsList().getClElementStatistics());
+                monitoringProvider.createClElementStatistics(
+                        clEntry.getControlLoopStatistics().getClElementStatisticsList().getClElementStatistics());
             }
         }
     }
index 151b04c..a053379 100644 (file)
@@ -147,7 +147,7 @@ public class SupervisionScanner {
 
                 if (participantUpdateCounter.count(id)) {
                     LOGGER.debug("retry message ParticipantUpdate");
-                    participantUpdatePublisher.send(id.getLeft(), id.getRight());
+                    participantUpdatePublisher.send(id.getLeft(), id.getRight(), true);
                 } else {
                     LOGGER.debug("report Participant Update fault");
                     participantUpdateCounter.setFault(id);
index 73860b5..8cbaec8 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
 
 import java.util.UUID;
 import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 import org.springframework.stereotype.Component;
 
 /**
@@ -34,9 +35,13 @@ public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPubli
      * Send ParticipantRegisterAck to Participant.
      *
      * @param responseTo the original request id in the request.
+     * @param participantId the participant Id
+     * @param participantType the participant Type
      */
-    public void send(UUID responseTo) {
+    public void send(UUID responseTo, ToscaConceptIdentifier participantId, ToscaConceptIdentifier participantType) {
         var message = new ParticipantRegisterAck();
+        message.setParticipantId(participantId);
+        message.setParticipantType(participantType);
         message.setResponseTo(responseTo);
         message.setMessage("Participant Register Ack");
         message.setResult(true);
index 461c8b5..936bb14 100644 (file)
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.List;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
 import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener;
@@ -56,7 +57,7 @@ class MessageDispatcherActivatorTest {
 
         var publisherFirst = spy(mock(Publisher.class));
         var publisherSecond = spy(mock(Publisher.class));
-        var publishers = new Publisher[] {publisherFirst, publisherSecond};
+        var publishers = List.of(publisherFirst, publisherSecond);
 
         var listenerFirst = spy(mock(ParticipantStatusListener.class));
         when(listenerFirst.getType()).thenReturn(TOPIC_FIRST);
@@ -66,7 +67,7 @@ class MessageDispatcherActivatorTest {
         when(listenerSecond.getType()).thenReturn(TOPIC_SECOND);
         when(listenerSecond.getScoListener()).thenReturn(listenerSecond);
 
-        var listeners = new Listener[] {listenerFirst, listenerSecond};
+        List<Listener> listeners = List.of(listenerFirst, listenerSecond);
 
         try (var activator = new MessageDispatcherActivator(parameterGroup, publishers, listeners)) {