3c93a357121486c9eb3725c55df46798696ceed6
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2023-2024 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.acm.runtime.supervision;
22
23 import io.micrometer.core.annotation.Timed;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.UUID;
28 import lombok.AllArgsConstructor;
29 import org.apache.commons.collections4.MapUtils;
30 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
31 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
32 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
33 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher;
34 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
35 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
36 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
37 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
38 import org.onap.policy.clamp.models.acm.concepts.Participant;
39 import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
40 import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
41 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
42 import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
43 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
44 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
45 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
46 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
47 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
48 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
49 import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
50 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import org.springframework.stereotype.Component;
54
55 /**
56  * This class handles supervision of participant status.
57  */
58 @Component
59 @AllArgsConstructor
60 public class SupervisionParticipantHandler {
61     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantHandler.class);
62
63     private final ParticipantProvider participantProvider;
64     private final ParticipantRegisterAckPublisher participantRegisterAckPublisher;
65     private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher;
66     private final AutomationCompositionProvider automationCompositionProvider;
67     private final AcDefinitionProvider acDefinitionProvider;
68     private final ParticipantRestartPublisher participantRestartPublisher;
69     private final ParticipantSyncPublisher participantSyncPublisher;
70     private final AcRuntimeParameterGroup acRuntimeParameterGroup;
71
72     /**
73      * Handle a ParticipantRegister message from a participant.
74      *
75      * @param participantRegisterMsg the ParticipantRegister message received from a participant
76      */
77     @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received")
78     public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) {
79         saveIfNotPresent(participantRegisterMsg.getReplicaId(),
80                 participantRegisterMsg.getParticipantId(),
81                 participantRegisterMsg.getParticipantSupportedElementType(), true);
82
83         participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(),
84                 participantRegisterMsg.getParticipantId(), participantRegisterMsg.getReplicaId());
85     }
86
87     /**
88      * Handle a ParticipantDeregister message from a participant.
89      *
90      * @param participantDeregisterMsg the ParticipantDeregister message received from a participant
91      */
92     @Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received")
93     public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) {
94         var replicaId = participantDeregisterMsg.getReplicaId() != null
95                 ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId();
96         var replicaOpt = participantProvider.findParticipantReplica(replicaId);
97         if (replicaOpt.isPresent()) {
98             participantProvider.deleteParticipantReplica(replicaId);
99         }
100
101         participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId());
102     }
103
104     /**
105      * Handle a ParticipantStatus message from a participant.
106      *
107      * @param participantStatusMsg the ParticipantStatus message received from a participant
108      */
109     @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received")
110     public void handleParticipantMessage(ParticipantStatus participantStatusMsg) {
111         saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(),
112                 participantStatusMsg.getParticipantSupportedElementType(), false);
113
114         if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) {
115             automationCompositionProvider.upgradeStates(participantStatusMsg.getAutomationCompositionInfoList());
116         }
117         if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty()
118                 && participantStatusMsg.getCompositionId() != null) {
119             updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(),
120                 participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates());
121         }
122     }
123
124     private void saveIfNotPresent(UUID msgReplicaId, UUID participantId,
125             List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) {
126         var replicaId = msgReplicaId != null ? msgReplicaId : participantId;
127         var replicaOpt = participantProvider.findParticipantReplica(replicaId);
128         if (replicaOpt.isPresent()) {
129             var replica = replicaOpt.get();
130             checkOnline(replica);
131         } else {
132             var participant = getParticipant(participantId, listToMap(participantSupportedElementType));
133             participant.getReplicas().put(replicaId, createReplica(replicaId));
134             participantProvider.saveParticipant(participant);
135         }
136         if (registration) {
137             handleRestart(participantId, replicaId);
138         }
139     }
140
141     private Participant getParticipant(UUID participantId,
142             Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
143         var participantOpt = participantProvider.findParticipant(participantId);
144         return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType));
145     }
146
147     private ParticipantReplica createReplica(UUID replicaId) {
148         var replica = new ParticipantReplica();
149         replica.setReplicaId(replicaId);
150         replica.setParticipantState(ParticipantState.ON_LINE);
151         replica.setLastMsg(TimestampHelper.now());
152         return replica;
153
154     }
155
156     private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List<ParticipantDefinition> list) {
157         var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId);
158         if (acDefinitionOpt.isEmpty()) {
159             LOGGER.error("Ac Definition with id {} not found", compositionId);
160             return;
161         }
162         var acDefinition = acDefinitionOpt.get();
163         for (var acElements : list) {
164             for (var element : acElements.getAutomationCompositionElementDefinitionList()) {
165                 var state = acDefinition.getElementStateMap().get(element.getAcElementDefinitionId().getName());
166                 if (state != null) {
167                     state.setOutProperties(element.getOutProperties());
168                 }
169             }
170         }
171         acDefinitionProvider.updateAcDefinition(acDefinition,
172                 acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
173         participantSyncPublisher.sendSync(acDefinition, replicaId);
174     }
175
176     private void checkOnline(ParticipantReplica replica) {
177         if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) {
178             replica.setParticipantState(ParticipantState.ON_LINE);
179         }
180         replica.setLastMsg(TimestampHelper.now());
181         participantProvider.saveParticipantReplica(replica);
182     }
183
184     private void handleRestart(UUID participantId, UUID replicaId) {
185         var compositionIds = participantProvider.getCompositionIds(participantId);
186         var oldParticipant = participantId.equals(replicaId);
187         for (var compositionId : compositionIds) {
188             var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
189             LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId());
190             if (oldParticipant) {
191                 handleRestart(participantId, acDefinition);
192             } else {
193                 handleSyncRestart(participantId, replicaId, acDefinition);
194             }
195         }
196     }
197
198     private void handleRestart(final UUID participantId, AutomationCompositionDefinition acDefinition) {
199         if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
200             LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
201             return;
202         }
203         LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId());
204         for (var elementState : acDefinition.getElementStateMap().values()) {
205             if (participantId.equals(elementState.getParticipantId())) {
206                 elementState.setRestarting(true);
207             }
208         }
209         // expected final state
210         if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) {
211             acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR);
212         }
213         acDefinition.setRestarting(true);
214         acDefinitionProvider.updateAcDefinition(acDefinition,
215                 acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
216
217         var automationCompositionList =
218                 automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
219         var automationCompositions = automationCompositionList.stream()
220                 .filter(ac -> isAcToBeRestarted(participantId, ac)).toList();
221         participantRestartPublisher.send(participantId, acDefinition, automationCompositions);
222     }
223
224     private boolean isAcToBeRestarted(UUID participantId, AutomationComposition automationComposition) {
225         boolean toAdd = false;
226         for (var element : automationComposition.getElements().values()) {
227             if (participantId.equals(element.getParticipantId())) {
228                 element.setRestarting(true);
229                 toAdd = true;
230             }
231         }
232         if (toAdd) {
233             automationComposition.setRestarting(true);
234             // expected final state
235             if (StateChangeResult.TIMEOUT.equals(automationComposition.getStateChangeResult())) {
236                 automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
237             }
238             automationCompositionProvider.updateAutomationComposition(automationComposition);
239         }
240         return toAdd;
241     }
242
243     private void handleSyncRestart(final UUID participantId, UUID replicaId,
244             AutomationCompositionDefinition acDefinition) {
245         if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
246             LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
247             return;
248         }
249         LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId());
250         var automationCompositionList =
251                 automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
252         var automationCompositions = automationCompositionList.stream()
253                 .filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList();
254         participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions);
255     }
256
257     private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) {
258         for (var element : automationComposition.getElements().values()) {
259             if (participantId.equals(element.getParticipantId())) {
260                 return true;
261             }
262         }
263         return false;
264     }
265
266     private Participant createParticipant(UUID participantId,
267             Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
268         var participant = new Participant();
269         participant.setParticipantId(participantId);
270         participant.setParticipantSupportedElementTypes(participantSupportedElementType);
271         return participant;
272     }
273
274     private Map<UUID, ParticipantSupportedElementType> listToMap(List<ParticipantSupportedElementType> elementList) {
275         Map<UUID, ParticipantSupportedElementType> map = new HashMap<>();
276         MapUtils.populateMap(map, elementList, ParticipantSupportedElementType::getId);
277         return map;
278     }
279 }