/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.clamp.acm.runtime.supervision;
23 import io.micrometer.core.annotation.Timed;
24 import java.util.HashMap;
25 import java.util.List;
27 import java.util.UUID;
28 import java.util.function.UnaryOperator;
29 import java.util.stream.Collectors;
30 import lombok.AllArgsConstructor;
31 import org.apache.commons.collections4.MapUtils;
32 import org.onap.policy.clamp.acm.runtime.main.utils.EncryptionUtils;
33 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
34 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
35 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
36 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
37 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
38 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
39 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
40 import org.onap.policy.clamp.models.acm.concepts.DeployState;
41 import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
42 import org.onap.policy.clamp.models.acm.concepts.Participant;
43 import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
44 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
45 import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
46 import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
47 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
48 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
49 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
50 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
51 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
52 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
53 import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
54 import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
55 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import org.springframework.stereotype.Component;
/**
 * This class handles supervision of participant status.
 */
65 public class SupervisionParticipantHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantHandler.class);

// Storage access for participants and their replicas.
private final ParticipantProvider participantProvider;
// Sends the acknowledgement for a ParticipantRegister message.
private final ParticipantRegisterAckPublisher participantRegisterAckPublisher;
// Sends the acknowledgement for a ParticipantDeregister message.
private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher;
// Storage access for automation composition instances.
private final AutomationCompositionProvider automationCompositionProvider;
// Storage access for automation composition definitions.
private final AcDefinitionProvider acDefinitionProvider;
// Sends restart and sync messages to participants.
private final ParticipantSyncPublisher participantSyncPublisher;
// Stores out-properties carried by ParticipantStatus messages.
private final MessageProvider messageProvider;
// Used to decrypt instance properties before they are sent to a participant.
private final EncryptionUtils encryptionUtils;
78 * Handle a ParticipantRegister message from a participant.
80 * @param participantRegisterMsg the ParticipantRegister message received from a participant
82 @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received")
83 public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) {
84 saveIfNotPresent(participantRegisterMsg.getReplicaId(), participantRegisterMsg.getParticipantId(),
85 participantRegisterMsg.getParticipantSupportedElementType(), true);
87 participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(),
88 participantRegisterMsg.getParticipantId(), participantRegisterMsg.getReplicaId());
92 * Handle a ParticipantDeregister message from a participant.
94 * @param participantDeregisterMsg the ParticipantDeregister message received from a participant
96 @Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received")
97 public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) {
98 var replicaId = participantDeregisterMsg.getReplicaId() != null
99 ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId();
100 var replicaOpt = participantProvider.findParticipantReplica(replicaId);
101 if (replicaOpt.isPresent()) {
102 participantProvider.deleteParticipantReplica(replicaId);
105 participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId());
109 * Handle a ParticipantStatus message from a participant.
111 * @param participantStatusMsg the ParticipantStatus message received from a participant
114 @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received")
115 public void handleParticipantMessage(ParticipantStatus participantStatusMsg) {
116 saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(),
117 participantStatusMsg.getParticipantSupportedElementType(), false);
119 if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) {
120 messageProvider.saveInstanceOutProperties(participantStatusMsg);
122 if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty()
123 && participantStatusMsg.getCompositionId() != null) {
124 var acDefinition = acDefinitionProvider.findAcDefinition(participantStatusMsg.getCompositionId());
125 if (acDefinition.isPresent()) {
126 var map = acDefinition.get().getElementStateMap().values().stream()
127 .collect(Collectors.toMap(NodeTemplateState::getNodeTemplateId, UnaryOperator.identity()));
128 messageProvider.saveCompositionOutProperties(participantStatusMsg, map);
130 LOGGER.error("Not valid ParticipantStatus message");
135 private void saveIfNotPresent(UUID msgReplicaId, UUID participantId,
136 List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) {
137 var replicaId = msgReplicaId != null ? msgReplicaId : participantId;
138 var replicaOpt = participantProvider.findParticipantReplica(replicaId);
139 if (replicaOpt.isPresent()) {
140 var replica = replicaOpt.get();
141 checkOnline(replica);
143 var participant = getParticipant(participantId, listToMap(participantSupportedElementType));
144 participant.getReplicas().put(replicaId, createReplica(replicaId));
145 participantProvider.saveParticipant(participant);
148 handleRestart(participantId, replicaId);
152 private Participant getParticipant(UUID participantId,
153 Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
154 var participantOpt = participantProvider.findParticipant(participantId);
155 return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType));
158 private ParticipantReplica createReplica(UUID replicaId) {
159 var replica = new ParticipantReplica();
160 replica.setReplicaId(replicaId);
161 replica.setParticipantState(ParticipantState.ON_LINE);
162 replica.setLastMsg(TimestampHelper.now());
167 private void checkOnline(ParticipantReplica replica) {
168 if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) {
169 replica.setParticipantState(ParticipantState.ON_LINE);
171 replica.setLastMsg(TimestampHelper.now());
172 participantProvider.saveParticipantReplica(replica);
176 * Handle restart of a participant.
178 * @param participantId ID of the participant to restart
179 * @param replicaId ID of the participant replica
181 public void handleRestart(UUID participantId, UUID replicaId) {
182 var compositionIds = participantProvider.getCompositionIds(participantId);
183 for (var compositionId : compositionIds) {
184 var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
185 LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId());
186 handleSyncRestart(participantId, replicaId, acDefinition);
190 private void handleSyncRestart(final UUID participantId, UUID replicaId,
191 AutomationCompositionDefinition acDefinition) {
192 if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
193 LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
196 LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId());
197 var automationCompositionList =
198 automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
199 encryptionUtils.decryptInstanceProperties(automationCompositionList);
200 var automationCompositions =
201 automationCompositionList.stream().filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList();
202 participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions);
206 * Handle restart of all participants.
208 public void handleRestartOfAllParticipants() {
209 var participants = participantProvider.getParticipants();
210 for (var participant:participants) {
211 handleRestart(participant.getParticipantId(), null);
215 private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) {
216 for (var element : automationComposition.getElements().values()) {
217 if (participantId.equals(element.getParticipantId())) {
224 private Participant createParticipant(UUID participantId,
225 Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
226 var participant = new Participant();
227 participant.setParticipantId(participantId);
228 participant.setParticipantSupportedElementTypes(participantSupportedElementType);
232 private Map<UUID, ParticipantSupportedElementType> listToMap(List<ParticipantSupportedElementType> elementList) {
233 Map<UUID, ParticipantSupportedElementType> map = new HashMap<>();
234 MapUtils.populateMap(map, elementList, ParticipantSupportedElementType::getId);
239 * Handle a participantReqSync message from a participant.
241 * @param participantReqSync the message received from a participant
243 @Timed(value = "listener.participant_req_sync", description = "PARTICIPANT_REQ_SYNC_MSG messages received")
244 public void handleParticipantReqSync(ParticipantReqSync participantReqSync) {
245 if (participantReqSync.getCompositionTargetId() != null) {
246 // outdated Composition Target
247 var acDefinition = acDefinitionProvider.getAcDefinition(participantReqSync.getCompositionTargetId());
248 participantSyncPublisher.sendRestartMsg(participantReqSync.getParticipantId(),
249 participantReqSync.getReplicaId(), acDefinition, List.of());
251 if (participantReqSync.getCompositionId() == null
252 && participantReqSync.getAutomationCompositionId() != null) {
253 // outdated AutomationComposition
254 var automationComposition =
255 getAutomationCompositionForSync(participantReqSync.getAutomationCompositionId());
256 participantSyncPublisher.sendSync(automationComposition);
258 if (participantReqSync.getCompositionId() != null) {
259 // outdated Composition
260 var acDefinition = acDefinitionProvider.getAcDefinition(participantReqSync.getCompositionId());
261 var automationCompositions = participantReqSync.getAutomationCompositionId() != null
262 ? List.of(getAutomationCompositionForSync(participantReqSync.getAutomationCompositionId())) :
263 List.<AutomationComposition>of();
264 participantSyncPublisher.sendRestartMsg(participantReqSync.getParticipantId(),
265 participantReqSync.getReplicaId(), acDefinition, automationCompositions);
269 private AutomationComposition getAutomationCompositionForSync(UUID automationCompositionId) {
270 var automationComposition = automationCompositionProvider.getAutomationComposition(automationCompositionId);
271 encryptionUtils.decryptInstanceProperties(automationComposition);
272 if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) {
273 var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionTargetId());
274 var stage = ParticipantUtils.getFirstStage(automationComposition, acDefinition.getServiceTemplate());
275 if (automationComposition.getPhase().equals(stage)) {
276 // scenario first stage migration
277 var rollback = automationCompositionProvider.getAutomationCompositionRollback(automationCompositionId);
278 automationComposition.setElements(rollback.getElements().values().stream()
279 .collect(Collectors.toMap(AutomationCompositionElement::getId, AutomationCompositionElement::new)));
282 return automationComposition;