Add support for unique replica id generation in the participants 11/138111/2
authorFrancescoFioraEst <francesco.fiora@est.tech>
Tue, 4 Jun 2024 08:11:31 +0000 (09:11 +0100)
committerFrancesco Fiora <francesco.fiora@est.tech>
Thu, 6 Jun 2024 08:30:37 +0000 (08:30 +0000)
Issue-ID: POLICY-5032
Change-Id: I9c36b87d1f03f03089d2c62308e0975e24f6e99a
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
12 files changed:
models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantAckMessage.java
models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessage.java
models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantAckMessageTest.java
models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageTest.java
models/src/test/java/org/onap/policy/clamp/models/acm/utils/CommonTestData.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java

index fa3e3d0..7ea3310 100644 (file)
@@ -57,6 +57,8 @@ public class ParticipantAckMessage {
      */
     private UUID participantId;
 
+    private UUID replicaId;
+
     /**
      * Participant State, or {@code null} for messages from participants.
      */
@@ -82,7 +84,9 @@ public class ParticipantAckMessage {
         this.stateChangeResult = source.stateChangeResult;
         this.message = source.message;
         this.messageType = source.messageType;
+        this.compositionId = source.compositionId;
         this.participantId = source.participantId;
+        this.replicaId = source.replicaId;
         this.state = source.state;
     }
 
@@ -90,15 +94,17 @@ public class ParticipantAckMessage {
      * Determines if this message applies to this participant type.
      *
      * @param participantId id of the participant to match against
+     * @param replicaId id of the participant to match against
      * @return {@code true} if this message applies to this participant, {@code false} otherwise
      */
-    public boolean appliesTo(@NonNull final UUID participantId) {
+    public boolean appliesTo(@NonNull final UUID participantId, @NonNull final UUID replicaId) {
         // Broadcast message to all participants
-        if (this.participantId == null) {
+        if ((this.participantId == null)
+                || (participantId.equals(this.participantId) && this.replicaId == null)) {
             return true;
         }
 
         // Targeted message at this specific participant
-        return participantId.equals(this.participantId);
+        return replicaId.equals(this.replicaId);
     }
 }
index 304db8e..f8aea94 100644 (file)
@@ -51,6 +51,8 @@ public class ParticipantMessage {
      */
     private UUID participantId;
 
+    private UUID replicaId;
+
     /**
      * Automation Composition ID, or {@code null} for messages to participants.
      */
@@ -75,6 +77,7 @@ public class ParticipantMessage {
     public ParticipantMessage(final ParticipantMessage source) {
         this.messageType = source.messageType;
         this.participantId = source.participantId;
+        this.replicaId = source.replicaId;
         this.automationCompositionId = source.automationCompositionId;
         this.compositionId = source.compositionId;
     }
@@ -83,15 +86,17 @@ public class ParticipantMessage {
      * Determines if this message applies to this participant type.
      *
      * @param participantId id of the participant to match against
+     * @param replicaId id of the participant to match against
      * @return {@code true} if this message applies to this participant, {@code false} otherwise
      */
-    public boolean appliesTo(@NonNull final UUID participantId) {
+    public boolean appliesTo(@NonNull final UUID participantId, @NonNull final UUID replicaId) {
         // Broadcast message to all participants
-        if (this.participantId == null) {
+        if ((this.participantId == null)
+                || (participantId.equals(this.participantId) && this.replicaId == null)) {
             return true;
         }
 
         // Targeted message at this specific participant
-        return participantId.equals(this.participantId);
+        return replicaId.equals(this.replicaId);
     }
 }
index b6af01c..72e4efb 100644 (file)
@@ -57,7 +57,8 @@ class ParticipantAckMessageTest {
     @Test
     void testAppliesTo_NullParticipantId() {
         message = makeMessage();
-        assertThatThrownBy(() -> message.appliesTo(null)).isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> message.appliesTo(UUID.randomUUID(), null)).isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> message.appliesTo(null, UUID.randomUUID())).isInstanceOf(NullPointerException.class);
     }
 
     @Test
@@ -65,8 +66,8 @@ class ParticipantAckMessageTest {
         message = makeMessage();
 
         // ParticipantId matches
-        assertTrue(message.appliesTo(CommonTestData.getParticipantId()));
-        assertFalse(message.appliesTo(CommonTestData.getRndParticipantId()));
+        assertTrue(message.appliesTo(CommonTestData.getParticipantId(), CommonTestData.getReplicaId()));
+        assertFalse(message.appliesTo(CommonTestData.getRndParticipantId(), CommonTestData.getReplicaId()));
     }
 
     @Test
@@ -74,8 +75,8 @@ class ParticipantAckMessageTest {
         message = makeMessage();
 
         // ParticipantId does not match
-        assertFalse(message.appliesTo(CommonTestData.getRndParticipantId()));
-        assertTrue(message.appliesTo(CommonTestData.getParticipantId()));
+        assertFalse(message.appliesTo(CommonTestData.getRndParticipantId(), CommonTestData.getReplicaId()));
+        assertTrue(message.appliesTo(CommonTestData.getParticipantId(), CommonTestData.getReplicaId()));
     }
 
     private ParticipantAckMessage makeMessage() {
index 541d8ef..db31d0f 100644 (file)
@@ -61,7 +61,8 @@ class ParticipantMessageTest {
     void testAppliesTo_NullParticipantId() {
         message = makeMessage();
 
-        assertThatThrownBy(() -> message.appliesTo(null)).isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> message.appliesTo(UUID.randomUUID(), null)).isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> message.appliesTo(null, UUID.randomUUID())).isInstanceOf(NullPointerException.class);
     }
 
     @Test
@@ -69,15 +70,15 @@ class ParticipantMessageTest {
         message = makeMessage();
 
         // ParticipantId matches
-        assertTrue(message.appliesTo(CommonTestData.getParticipantId()));
-        assertFalse(message.appliesTo(CommonTestData.getRndParticipantId()));
+        assertTrue(message.appliesTo(CommonTestData.getParticipantId(), CommonTestData.getReplicaId()));
+        assertFalse(message.appliesTo(CommonTestData.getRndParticipantId(), CommonTestData.getReplicaId()));
     }
 
     @Test
     void testAppliesTo_ParticipantIdNoMatch() {
         message = makeMessage();
-        assertFalse(message.appliesTo(CommonTestData.getRndParticipantId()));
-        assertTrue(message.appliesTo(CommonTestData.getParticipantId()));
+        assertFalse(message.appliesTo(CommonTestData.getRndParticipantId(), CommonTestData.getReplicaId()));
+        assertTrue(message.appliesTo(CommonTestData.getParticipantId(), CommonTestData.getReplicaId()));
     }
 
     private ParticipantMessage makeMessage() {
index 131c8ee..b8075c3 100644 (file)
@@ -38,9 +38,9 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
 public class CommonTestData {
 
     public static final UUID PARTCICIPANT_ID = UUID.randomUUID();
+    public static final UUID REPLICA_ID = UUID.randomUUID();
     private static final StandardYamlCoder YAML_TRANSLATOR = new StandardYamlCoder();
 
-
     /**
      * Returns participantId for test cases.
      *
@@ -50,6 +50,15 @@ public class CommonTestData {
         return PARTCICIPANT_ID;
     }
 
+    /**
+     * Returns participantId for test cases.
+     *
+     * @return participant Id
+     */
+    public static UUID getReplicaId() {
+        return REPLICA_ID;
+    }
+
     /**
      * Returns participantId for test Jpa cases.
      *
index e1d4b09..d3ad4cf 100644 (file)
@@ -91,6 +91,7 @@ public class AcDefinitionHandler {
             participantPrimeAck.setCompositionState(AcTypeState.COMMISSIONED);
             participantPrimeAck.setStateChangeResult(StateChangeResult.NO_ERROR);
             participantPrimeAck.setParticipantId(cacheProvider.getParticipantId());
+            participantPrimeAck.setReplicaId(cacheProvider.getReplicaId());
             participantPrimeAck.setState(ParticipantState.ON_LINE);
             publisher.sendParticipantPrimeAck(participantPrimeAck);
             return;
index 5c54861..a3eafd8 100644 (file)
@@ -77,6 +77,7 @@ public class AutomationCompositionHandler {
                 var automationCompositionAck = new AutomationCompositionDeployAck(
                         ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
                 automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
+                automationCompositionAck.setReplicaId(cacheProvider.getReplicaId());
                 automationCompositionAck.setMessage("Already deleted or never used");
                 automationCompositionAck.setResult(true);
                 automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR);
index 0ed333e..1f4c036 100644 (file)
@@ -103,6 +103,7 @@ public class AutomationCompositionOutHandler {
         var automationCompositionStateChangeAck =
                 new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
         automationCompositionStateChangeAck.setParticipantId(cacheProvider.getParticipantId());
+        automationCompositionStateChangeAck.setReplicaId(cacheProvider.getReplicaId());
         automationCompositionStateChangeAck.setMessage(message);
         automationCompositionStateChangeAck.setResponseTo(cacheProvider.getMsgIdentification().get(element.getId()));
         automationCompositionStateChangeAck.setStateChangeResult(stateChangeResult);
@@ -228,6 +229,7 @@ public class AutomationCompositionOutHandler {
         participantPrimeAck.setCompositionState(state);
         participantPrimeAck.setStateChangeResult(stateChangeResult);
         participantPrimeAck.setParticipantId(cacheProvider.getParticipantId());
+        participantPrimeAck.setReplicaId(cacheProvider.getReplicaId());
         participantPrimeAck.setState(ParticipantState.ON_LINE);
         publisher.sendParticipantPrimeAck(participantPrimeAck);
         cacheProvider.getMsgIdentification().remove(compositionId);
@@ -286,6 +288,7 @@ public class AutomationCompositionOutHandler {
     private ParticipantStatus createParticipantStatus() {
         var statusMsg = new ParticipantStatus();
         statusMsg.setParticipantId(cacheProvider.getParticipantId());
+        statusMsg.setReplicaId(cacheProvider.getReplicaId());
         statusMsg.setState(ParticipantState.ON_LINE);
         statusMsg.setParticipantSupportedElementType(cacheProvider.getSupportedAcElementTypes());
         return statusMsg;
index 343f8a9..b85a3c3 100644 (file)
@@ -53,6 +53,9 @@ public class CacheProvider {
     @Setter
     private boolean registered = false;
 
+    @Getter
+    private final UUID replicaId;
+
     private final List<ParticipantSupportedElementType> supportedAcElementTypes;
 
     @Getter
@@ -73,6 +76,7 @@ public class CacheProvider {
     public CacheProvider(ParticipantParameters parameters) {
         this.participantId = parameters.getIntermediaryParameters().getParticipantId();
         this.supportedAcElementTypes = parameters.getIntermediaryParameters().getParticipantSupportedElementTypes();
+        this.replicaId = UUID.randomUUID();
     }
 
     public List<ParticipantSupportedElementType> getSupportedAcElementTypes() {
index 0865dca..54a0591 100644 (file)
@@ -126,7 +126,7 @@ public class ParticipantHandler {
      * @return true if it applies, false otherwise
      */
     public boolean appliesTo(ParticipantMessage participantMsg) {
-        return participantMsg.appliesTo(cacheProvider.getParticipantId());
+        return participantMsg.appliesTo(cacheProvider.getParticipantId(), cacheProvider.getReplicaId());
     }
 
     /**
@@ -136,7 +136,7 @@ public class ParticipantHandler {
      * @return true if it applies, false otherwise
      */
     public boolean appliesTo(ParticipantAckMessage participantMsg) {
-        return participantMsg.appliesTo(cacheProvider.getParticipantId());
+        return participantMsg.appliesTo(cacheProvider.getParticipantId(), cacheProvider.getReplicaId());
     }
 
     /**
@@ -145,6 +145,7 @@ public class ParticipantHandler {
     public void sendParticipantRegister() {
         var participantRegister = new ParticipantRegister();
         participantRegister.setParticipantId(cacheProvider.getParticipantId());
+        participantRegister.setReplicaId(cacheProvider.getReplicaId());
         participantRegister.setParticipantSupportedElementType(cacheProvider.getSupportedAcElementTypes());
 
         publisher.sendParticipantRegister(participantRegister);
@@ -169,6 +170,7 @@ public class ParticipantHandler {
     public void sendParticipantDeregister() {
         var participantDeregister = new ParticipantDeregister();
         participantDeregister.setParticipantId(cacheProvider.getParticipantId());
+        participantDeregister.setReplicaId(cacheProvider.getReplicaId());
         publisher.sendParticipantDeregister(participantDeregister);
     }
 
@@ -225,6 +227,7 @@ public class ParticipantHandler {
     private ParticipantStatus makeHeartbeat() {
         var heartbeat = new ParticipantStatus();
         heartbeat.setParticipantId(cacheProvider.getParticipantId());
+        heartbeat.setReplicaId(cacheProvider.getReplicaId());
         heartbeat.setState(ParticipantState.ON_LINE);
         heartbeat.setParticipantSupportedElementType(cacheProvider.getSupportedAcElementTypes());
 
index cd28d41..eb1db47 100644 (file)
@@ -125,6 +125,7 @@ class ParticipantHandlerTest {
     void appliesToTest() {
         var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+        when(cacheProvider.getReplicaId()).thenReturn(CommonTestData.getReplicaId());
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
                 mock(AcLockHandler.class), mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class),
                 cacheProvider);
index 3011c91..1536a0b 100644 (file)
@@ -62,6 +62,7 @@ public class CommonTestData {
     public static final UUID AC_ID_0 = UUID.randomUUID();
     public static final UUID AC_ID_1 = UUID.randomUUID();
     public static final UUID PARTCICIPANT_ID = UUID.randomUUID();
+    public static final UUID REPLICA_ID = UUID.randomUUID();
 
     /**
      * Get ParticipantIntermediaryParameters.
@@ -160,6 +161,10 @@ public class CommonTestData {
         return PARTCICIPANT_ID;
     }
 
+    public static UUID getReplicaId() {
+        return REPLICA_ID;
+    }
+
     public static UUID getRndParticipantId() {
         return UUID.randomUUID();
     }