ba223d2735b5f361b606405d01670bbcb311e627
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.acm.runtime.supervision;
22
23 import io.micrometer.core.annotation.Timed;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.UUID;
28 import java.util.function.UnaryOperator;
29 import java.util.stream.Collectors;
30 import lombok.AllArgsConstructor;
31 import org.apache.commons.collections4.MapUtils;
32 import org.onap.policy.clamp.acm.runtime.main.utils.EncryptionUtils;
33 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
34 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
35 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
36 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
37 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
38 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
39 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
40 import org.onap.policy.clamp.models.acm.concepts.DeployState;
41 import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
42 import org.onap.policy.clamp.models.acm.concepts.Participant;
43 import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
44 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
45 import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
46 import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
47 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
48 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
49 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
50 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
51 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
52 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
53 import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
54 import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
55 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import org.springframework.stereotype.Component;
59
60 /**
61  * This class handles supervision of participant status.
62  */
63 @Component
64 @AllArgsConstructor
65 public class SupervisionParticipantHandler {
66     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantHandler.class);
67
68     private final ParticipantProvider participantProvider;
69     private final ParticipantRegisterAckPublisher participantRegisterAckPublisher;
70     private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher;
71     private final AutomationCompositionProvider automationCompositionProvider;
72     private final AcDefinitionProvider acDefinitionProvider;
73     private final ParticipantSyncPublisher participantSyncPublisher;
74     private final MessageProvider messageProvider;
75     private final EncryptionUtils encryptionUtils;
76
77     /**
78      * Handle a ParticipantRegister message from a participant.
79      *
80      * @param participantRegisterMsg the ParticipantRegister message received from a participant
81      */
82     @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received")
83     public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) {
84         saveIfNotPresent(participantRegisterMsg.getReplicaId(), participantRegisterMsg.getParticipantId(),
85                 participantRegisterMsg.getParticipantSupportedElementType(), true);
86
87         participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(),
88                 participantRegisterMsg.getParticipantId(), participantRegisterMsg.getReplicaId());
89     }
90
91     /**
92      * Handle a ParticipantDeregister message from a participant.
93      *
94      * @param participantDeregisterMsg the ParticipantDeregister message received from a participant
95      */
96     @Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received")
97     public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) {
98         var replicaId = participantDeregisterMsg.getReplicaId() != null
99                 ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId();
100         var replicaOpt = participantProvider.findParticipantReplica(replicaId);
101         if (replicaOpt.isPresent()) {
102             participantProvider.deleteParticipantReplica(replicaId);
103         }
104
105         participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId());
106     }
107
108     /**
109      * Handle a ParticipantStatus message from a participant.
110      *
111      * @param participantStatusMsg the ParticipantStatus message received from a participant
112      */
113     @MessageIntercept
114     @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received")
115     public void handleParticipantMessage(ParticipantStatus participantStatusMsg) {
116         saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(),
117                 participantStatusMsg.getParticipantSupportedElementType(), false);
118
119         if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) {
120             messageProvider.saveInstanceOutProperties(participantStatusMsg);
121         }
122         if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty()
123                 && participantStatusMsg.getCompositionId() != null) {
124             var acDefinition = acDefinitionProvider.findAcDefinition(participantStatusMsg.getCompositionId());
125             if (acDefinition.isPresent()) {
126                 var map = acDefinition.get().getElementStateMap().values().stream()
127                         .collect(Collectors.toMap(NodeTemplateState::getNodeTemplateId, UnaryOperator.identity()));
128                 messageProvider.saveCompositionOutProperties(participantStatusMsg, map);
129             } else {
130                 LOGGER.error("Not valid ParticipantStatus message");
131             }
132         }
133     }
134
135     private void saveIfNotPresent(UUID msgReplicaId, UUID participantId,
136             List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) {
137         var replicaId = msgReplicaId != null ? msgReplicaId : participantId;
138         var replicaOpt = participantProvider.findParticipantReplica(replicaId);
139         if (replicaOpt.isPresent()) {
140             var replica = replicaOpt.get();
141             checkOnline(replica);
142         } else {
143             var participant = getParticipant(participantId, listToMap(participantSupportedElementType));
144             participant.getReplicas().put(replicaId, createReplica(replicaId));
145             participantProvider.saveParticipant(participant);
146         }
147         if (registration) {
148             handleRestart(participantId, replicaId);
149         }
150     }
151
152     private Participant getParticipant(UUID participantId,
153             Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
154         var participantOpt = participantProvider.findParticipant(participantId);
155         return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType));
156     }
157
158     private ParticipantReplica createReplica(UUID replicaId) {
159         var replica = new ParticipantReplica();
160         replica.setReplicaId(replicaId);
161         replica.setParticipantState(ParticipantState.ON_LINE);
162         replica.setLastMsg(TimestampHelper.now());
163         return replica;
164
165     }
166
167     private void checkOnline(ParticipantReplica replica) {
168         if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) {
169             replica.setParticipantState(ParticipantState.ON_LINE);
170         }
171         replica.setLastMsg(TimestampHelper.now());
172         participantProvider.saveParticipantReplica(replica);
173     }
174
175     /**
176      * Handle restart of a participant.
177      *
178      * @param participantId     ID of the participant to restart
179      * @param replicaId         ID of the participant replica
180      */
181     public void handleRestart(UUID participantId, UUID replicaId) {
182         var compositionIds = participantProvider.getCompositionIds(participantId);
183         for (var compositionId : compositionIds) {
184             var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
185             LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId());
186             handleSyncRestart(participantId, replicaId, acDefinition);
187         }
188     }
189
190     private void handleSyncRestart(final UUID participantId, UUID replicaId,
191             AutomationCompositionDefinition acDefinition) {
192         if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
193             LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
194             return;
195         }
196         LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId());
197         var automationCompositionList =
198                 automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
199         encryptionUtils.decryptInstanceProperties(automationCompositionList);
200         var automationCompositions =
201                 automationCompositionList.stream().filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList();
202         participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions);
203     }
204
205     /**
206      * Handle restart of all participants.
207      */
208     public void handleRestartOfAllParticipants() {
209         var participants = participantProvider.getParticipants();
210         for (var participant:participants) {
211             handleRestart(participant.getParticipantId(), null);
212         }
213     }
214
215     private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) {
216         for (var element : automationComposition.getElements().values()) {
217             if (participantId.equals(element.getParticipantId())) {
218                 return true;
219             }
220         }
221         return false;
222     }
223
224     private Participant createParticipant(UUID participantId,
225             Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
226         var participant = new Participant();
227         participant.setParticipantId(participantId);
228         participant.setParticipantSupportedElementTypes(participantSupportedElementType);
229         return participant;
230     }
231
232     private Map<UUID, ParticipantSupportedElementType> listToMap(List<ParticipantSupportedElementType> elementList) {
233         Map<UUID, ParticipantSupportedElementType> map = new HashMap<>();
234         MapUtils.populateMap(map, elementList, ParticipantSupportedElementType::getId);
235         return map;
236     }
237
238     /**
239      * Handle a participantReqSync message from a participant.
240      *
241      * @param participantReqSync the message received from a participant
242      */
243     @Timed(value = "listener.participant_req_sync", description = "PARTICIPANT_REQ_SYNC_MSG messages received")
244     public void handleParticipantReqSync(ParticipantReqSync participantReqSync) {
245         if (participantReqSync.getCompositionTargetId() != null) {
246             // outdated Composition Target
247             var acDefinition = acDefinitionProvider.getAcDefinition(participantReqSync.getCompositionTargetId());
248             participantSyncPublisher.sendRestartMsg(participantReqSync.getParticipantId(),
249                     participantReqSync.getReplicaId(), acDefinition, List.of());
250         }
251         if (participantReqSync.getCompositionId() == null
252                 && participantReqSync.getAutomationCompositionId() != null) {
253             // outdated AutomationComposition
254             var automationComposition =
255                     getAutomationCompositionForSync(participantReqSync.getAutomationCompositionId());
256             participantSyncPublisher.sendSync(automationComposition);
257         }
258         if (participantReqSync.getCompositionId() != null) {
259             // outdated Composition
260             var acDefinition = acDefinitionProvider.getAcDefinition(participantReqSync.getCompositionId());
261             var automationCompositions = participantReqSync.getAutomationCompositionId() != null
262                     ? List.of(getAutomationCompositionForSync(participantReqSync.getAutomationCompositionId())) :
263                     List.<AutomationComposition>of();
264             participantSyncPublisher.sendRestartMsg(participantReqSync.getParticipantId(),
265                     participantReqSync.getReplicaId(), acDefinition, automationCompositions);
266         }
267     }
268
269     private AutomationComposition getAutomationCompositionForSync(UUID automationCompositionId) {
270         var automationComposition = automationCompositionProvider.getAutomationComposition(automationCompositionId);
271         encryptionUtils.decryptInstanceProperties(automationComposition);
272         if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) {
273             var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionTargetId());
274             var stage = ParticipantUtils.getFirstStage(automationComposition, acDefinition.getServiceTemplate());
275             if (automationComposition.getPhase().equals(stage)) {
276                 // scenario first stage migration
277                 var rollback = automationCompositionProvider.getAutomationCompositionRollback(automationCompositionId);
278                 automationComposition.setElements(rollback.getElements().values().stream()
279                     .collect(Collectors.toMap(AutomationCompositionElement::getId, AutomationCompositionElement::new)));
280             }
281         }
282         return automationComposition;
283     }
284 }