5de6a4c74b1d615c8ebd26e52a7685e130abe6d9
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2023-2024 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.acm.runtime.supervision;
22
23 import io.micrometer.core.annotation.Timed;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.UUID;
28 import lombok.AllArgsConstructor;
29 import org.apache.commons.collections4.MapUtils;
30 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
31 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
32 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
33 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
34 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
35 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
36 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
37 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo;
38 import org.onap.policy.clamp.models.acm.concepts.Participant;
39 import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
40 import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
41 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
42 import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
43 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
44 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
45 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
46 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
47 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
48 import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
49 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import org.springframework.stereotype.Component;
53
54 /**
55  * This class handles supervision of participant status.
56  */
57 @Component
58 @AllArgsConstructor
59 public class SupervisionParticipantHandler {
60     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantHandler.class);
61
62     private final ParticipantProvider participantProvider;
63     private final ParticipantRegisterAckPublisher participantRegisterAckPublisher;
64     private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher;
65     private final AutomationCompositionProvider automationCompositionProvider;
66     private final AcDefinitionProvider acDefinitionProvider;
67     private final ParticipantSyncPublisher participantSyncPublisher;
68     private final AcRuntimeParameterGroup acRuntimeParameterGroup;
69
70     /**
71      * Handle a ParticipantRegister message from a participant.
72      *
73      * @param participantRegisterMsg the ParticipantRegister message received from a participant
74      */
75     @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received")
76     public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) {
77         saveIfNotPresent(participantRegisterMsg.getReplicaId(),
78                 participantRegisterMsg.getParticipantId(),
79                 participantRegisterMsg.getParticipantSupportedElementType(), true);
80
81         participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(),
82                 participantRegisterMsg.getParticipantId(), participantRegisterMsg.getReplicaId());
83     }
84
85     /**
86      * Handle a ParticipantDeregister message from a participant.
87      *
88      * @param participantDeregisterMsg the ParticipantDeregister message received from a participant
89      */
90     @Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received")
91     public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) {
92         var replicaId = participantDeregisterMsg.getReplicaId() != null
93                 ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId();
94         var replicaOpt = participantProvider.findParticipantReplica(replicaId);
95         if (replicaOpt.isPresent()) {
96             participantProvider.deleteParticipantReplica(replicaId);
97         }
98
99         participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId());
100     }
101
102     /**
103      * Handle a ParticipantStatus message from a participant.
104      *
105      * @param participantStatusMsg the ParticipantStatus message received from a participant
106      */
107     @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received")
108     public void handleParticipantMessage(ParticipantStatus participantStatusMsg) {
109         saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(),
110                 participantStatusMsg.getParticipantSupportedElementType(), false);
111
112         if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) {
113             updateAcOutProperties(participantStatusMsg.getAutomationCompositionInfoList());
114         }
115         if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty()
116                 && participantStatusMsg.getCompositionId() != null) {
117             updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(),
118                 participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates());
119         }
120     }
121
122     private void saveIfNotPresent(UUID msgReplicaId, UUID participantId,
123             List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) {
124         var replicaId = msgReplicaId != null ? msgReplicaId : participantId;
125         var replicaOpt = participantProvider.findParticipantReplica(replicaId);
126         if (replicaOpt.isPresent()) {
127             var replica = replicaOpt.get();
128             checkOnline(replica);
129         } else {
130             var participant = getParticipant(participantId, listToMap(participantSupportedElementType));
131             participant.getReplicas().put(replicaId, createReplica(replicaId));
132             participantProvider.saveParticipant(participant);
133         }
134         if (registration) {
135             handleRestart(participantId, replicaId);
136         }
137     }
138
139     private Participant getParticipant(UUID participantId,
140             Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
141         var participantOpt = participantProvider.findParticipant(participantId);
142         return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType));
143     }
144
145     private ParticipantReplica createReplica(UUID replicaId) {
146         var replica = new ParticipantReplica();
147         replica.setReplicaId(replicaId);
148         replica.setParticipantState(ParticipantState.ON_LINE);
149         replica.setLastMsg(TimestampHelper.now());
150         return replica;
151
152     }
153
154     private void updateAcOutProperties(List<AutomationCompositionInfo> automationCompositionInfoList) {
155         automationCompositionProvider.upgradeStates(automationCompositionInfoList);
156         for (var acInfo : automationCompositionInfoList) {
157             var ac = automationCompositionProvider.getAutomationComposition(acInfo.getAutomationCompositionId());
158             participantSyncPublisher.sendSync(ac);
159         }
160     }
161
162     private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List<ParticipantDefinition> list) {
163         var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId);
164         if (acDefinitionOpt.isEmpty()) {
165             LOGGER.error("Ac Definition with id {} not found", compositionId);
166             return;
167         }
168         var acDefinition = acDefinitionOpt.get();
169         for (var acElements : list) {
170             for (var element : acElements.getAutomationCompositionElementDefinitionList()) {
171                 var state = acDefinition.getElementStateMap().get(element.getAcElementDefinitionId().getName());
172                 if (state != null) {
173                     state.setOutProperties(element.getOutProperties());
174                 }
175             }
176         }
177         acDefinitionProvider.updateAcDefinition(acDefinition,
178                 acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
179         participantSyncPublisher.sendSync(acDefinition, replicaId);
180     }
181
182     private void checkOnline(ParticipantReplica replica) {
183         if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) {
184             replica.setParticipantState(ParticipantState.ON_LINE);
185         }
186         replica.setLastMsg(TimestampHelper.now());
187         participantProvider.saveParticipantReplica(replica);
188     }
189
190     private void handleRestart(UUID participantId, UUID replicaId) {
191         var compositionIds = participantProvider.getCompositionIds(participantId);
192         for (var compositionId : compositionIds) {
193             var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
194             LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId());
195             handleSyncRestart(participantId, replicaId, acDefinition);
196         }
197     }
198
199     private void handleSyncRestart(final UUID participantId, UUID replicaId,
200             AutomationCompositionDefinition acDefinition) {
201         if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
202             LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
203             return;
204         }
205         LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId());
206         var automationCompositionList =
207                 automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
208         var automationCompositions = automationCompositionList.stream()
209                 .filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList();
210         participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions);
211     }
212
213     private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) {
214         for (var element : automationComposition.getElements().values()) {
215             if (participantId.equals(element.getParticipantId())) {
216                 return true;
217             }
218         }
219         return false;
220     }
221
222     private Participant createParticipant(UUID participantId,
223             Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
224         var participant = new Participant();
225         participant.setParticipantId(participantId);
226         participant.setParticipantSupportedElementTypes(participantSupportedElementType);
227         return participant;
228     }
229
230     private Map<UUID, ParticipantSupportedElementType> listToMap(List<ParticipantSupportedElementType> elementList) {
231         Map<UUID, ParticipantSupportedElementType> map = new HashMap<>();
232         MapUtils.populateMap(map, elementList, ParticipantSupportedElementType::getId);
233         return map;
234     }
235 }