2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023-2024 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
import io.opentelemetry.context.Context;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.models.base.PfModelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
51 @RequiredArgsConstructor
52 public class ThreadHandler implements Closeable {
53 private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
55 private final AutomationCompositionElementListener listener;
56 private final ParticipantIntermediaryApi intermediaryApi;
57 private final CacheProvider cacheProvider;
59 private final Map<UUID, Future> executionMap = new ConcurrentHashMap<>();
61 private final ExecutorService executor =
62 Context.taskWrapping(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
65 * Handle an update on a automation composition element.
67 * @param messageId the messageId
68 * @param instanceId the automationComposition Id
69 * @param element the information on the automation composition element
70 * @param properties properties Map
72 public void deploy(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
73 cleanExecution(element.getId(), messageId);
74 var result = executor.submit(() -> this.deployProcess(instanceId, element, properties));
75 executionMap.put(element.getId(), result);
78 private void deployProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
80 listener.deploy(instanceId, element, properties);
81 } catch (PfModelException e) {
82 LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage());
83 intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.UNDEPLOYED,
84 null, StateChangeResult.FAILED, "Automation composition element deploy failed");
86 executionMap.remove(element.getId());
90 * Handle a automation composition element state change.
92 * @param messageId the messageId
93 * @param instanceId the automationComposition Id
94 * @param elementId the ID of the automation composition element
96 public void undeploy(UUID messageId, UUID instanceId, UUID elementId) {
97 cleanExecution(elementId, messageId);
98 var result = executor.submit(() -> this.undeployProcess(instanceId, elementId));
99 executionMap.put(elementId, result);
102 private void undeployProcess(UUID instanceId, UUID elementId) {
104 listener.undeploy(instanceId, elementId);
105 } catch (PfModelException e) {
106 LOGGER.error("Automation composition element undeploy failed {} {}", instanceId, e.getMessage());
107 intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DEPLOYED, null,
108 StateChangeResult.FAILED, "Automation composition element undeploy failed");
110 executionMap.remove(elementId);
114 * Handle a automation composition element lock.
116 * @param messageId the messageId
117 * @param instanceId the automationComposition Id
118 * @param elementId the ID of the automation composition element
120 public void lock(UUID messageId, UUID instanceId, UUID elementId) {
121 cleanExecution(elementId, messageId);
122 var result = executor.submit(() -> this.lockProcess(instanceId, elementId));
123 executionMap.put(elementId, result);
126 private void lockProcess(UUID instanceId, UUID elementId) {
128 listener.lock(instanceId, elementId);
129 } catch (PfModelException e) {
130 LOGGER.error("Automation composition element lock failed {} {}", instanceId, e.getMessage());
131 intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED,
132 StateChangeResult.FAILED, "Automation composition element lock failed");
134 executionMap.remove(elementId);
138 * Handle a automation composition element unlock.
140 * @param messageId the messageId
141 * @param instanceId the automationComposition Id
142 * @param elementId the ID of the automation composition element
144 public void unlock(UUID messageId, UUID instanceId, UUID elementId) {
145 cleanExecution(elementId, messageId);
146 var result = executor.submit(() -> this.unlockProcess(instanceId, elementId));
147 executionMap.put(elementId, result);
150 private void unlockProcess(UUID instanceId, UUID elementId) {
152 listener.unlock(instanceId, elementId);
153 } catch (PfModelException e) {
154 LOGGER.error("Automation composition element unlock failed {} {}", instanceId, e.getMessage());
155 intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED,
156 StateChangeResult.FAILED, "Automation composition element unlock failed");
158 executionMap.remove(elementId);
162 * Handle a automation composition element delete.
164 * @param messageId the messageId
165 * @param instanceId the automationComposition Id
166 * @param elementId the ID of the automation composition element
168 public void delete(UUID messageId, UUID instanceId, UUID elementId) {
169 cleanExecution(elementId, messageId);
170 var result = executor.submit(() -> this.deleteProcess(instanceId, elementId));
171 executionMap.put(elementId, result);
174 private void deleteProcess(UUID instanceId, UUID elementId) {
176 listener.delete(instanceId, elementId);
177 } catch (PfModelException e) {
178 LOGGER.error("Automation composition element delete failed {} {}", instanceId, e.getMessage());
179 intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.UNDEPLOYED, null,
180 StateChangeResult.FAILED, "Automation composition element delete failed");
182 executionMap.remove(elementId);
186 * Handle a automation composition element properties update.
188 * @param messageId the messageId
189 * @param instanceId the automationComposition Id
190 * @param element the information on the automation composition element
191 * @param properties properties Map
193 public void update(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
194 cleanExecution(element.getId(), messageId);
195 var result = executor.submit(() -> this.updateProcess(instanceId, element, properties));
196 executionMap.put(element.getId(), result);
199 private void updateProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
201 listener.update(instanceId, element, properties);
202 } catch (PfModelException e) {
203 LOGGER.error("Automation composition element update failed {} {}", instanceId, e.getMessage());
204 intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED,
205 null, StateChangeResult.FAILED, "Automation composition element update failed");
207 executionMap.remove(element.getId());
210 private void cleanExecution(UUID execIdentificationId, UUID messageId) {
211 var process = executionMap.get(execIdentificationId);
212 if (process != null) {
213 if (!process.isDone()) {
214 process.cancel(true);
216 executionMap.remove(execIdentificationId);
218 cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
222 * Handles prime a Composition Definition.
224 * @param messageId the messageId
225 * @param compositionId the compositionId
226 * @param list the list of AutomationCompositionElementDefinition
228 public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
229 cleanExecution(compositionId, messageId);
230 var result = executor.submit(() -> this.primeProcess(compositionId, list));
231 executionMap.put(compositionId, result);
234 private void primeProcess(UUID compositionId, List<AutomationCompositionElementDefinition> list) {
236 listener.prime(compositionId, list);
237 executionMap.remove(compositionId);
238 } catch (PfModelException e) {
239 LOGGER.error("Composition Defintion prime failed {} {}", compositionId, e.getMessage());
240 intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.FAILED,
241 "Composition Defintion prime failed");
246 * Handles deprime a Composition Definition.
248 * @param messageId the messageId
249 * @param compositionId the compositionId
251 public void deprime(UUID messageId, UUID compositionId) {
252 cleanExecution(compositionId, messageId);
253 var result = executor.submit(() -> this.deprimeProcess(compositionId));
254 executionMap.put(compositionId, result);
257 private void deprimeProcess(UUID compositionId) {
259 listener.deprime(compositionId);
260 executionMap.remove(compositionId);
261 } catch (PfModelException e) {
262 LOGGER.error("Composition Defintion deprime failed {} {}", compositionId, e.getMessage());
263 intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED,
264 "Composition Defintion deprime failed");
269 * Handles restarted scenario.
271 * @param messageId the messageId
272 * @param compositionId the compositionId
273 * @param list the list of AutomationCompositionElementDefinition
274 * @param state the state of the composition
275 * @param automationCompositionList list of ParticipantRestartAc
277 public void restarted(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list,
278 AcTypeState state, List<ParticipantRestartAc> automationCompositionList) {
280 listener.handleRestartComposition(compositionId, list, state);
281 } catch (PfModelException e) {
282 LOGGER.error("Composition Defintion restarted failed {} {}", compositionId, e.getMessage());
283 intermediaryApi.updateCompositionState(compositionId, state, StateChangeResult.FAILED,
284 "Composition Defintion restarted failed");
287 for (var automationComposition : automationCompositionList) {
288 for (var element : automationComposition.getAcElementList()) {
289 cleanExecution(element.getId(), messageId);
290 var result = executor.submit(() -> this
291 .restartedInstanceProcess(automationComposition.getAutomationCompositionId(), element));
292 executionMap.put(element.getId(), result);
297 private void restartedInstanceProcess(UUID instanceId, AcElementRestart element) {
299 var map = new HashMap<>(cacheProvider.getCommonProperties(instanceId, element.getId()));
300 map.putAll(element.getProperties());
302 listener.handleRestartInstance(instanceId, getAcElementDeploy(element), map, element.getDeployState(),
303 element.getLockState());
304 executionMap.remove(element.getId());
305 } catch (PfModelException e) {
306 LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage());
307 intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(),
308 element.getDeployState(), element.getLockState(), StateChangeResult.FAILED,
309 "Automation composition element restart failed");
313 private AcElementDeploy getAcElementDeploy(AcElementRestart element) {
314 var acElementDeploy = new AcElementDeploy();
315 acElementDeploy.setId(element.getId());
316 acElementDeploy.setDefinition(element.getDefinition());
317 acElementDeploy.setProperties(element.getProperties());
318 acElementDeploy.setToscaServiceTemplateFragment(element.getToscaServiceTemplateFragment());
319 return acElementDeploy;
323 * Closes this stream and releases any system resources associated
324 * with it. If the stream is already closed then invoking this
325 * method has no effect.
327 * @throws IOException if an I/O error occurs
330 public void close() throws IOException {
335 * Handles AutomationComposition Migration.
337 * @param messageId the messageId
338 * @param instanceId the automationComposition Id
339 * @param element the information on the automation composition element
340 * @param compositionTargetId the composition to migrate
342 public void migrate(UUID messageId, UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
343 Map<String, Object> properties) {
344 cleanExecution(element.getId(), messageId);
345 var result = executor.submit(() -> this.migrateProcess(instanceId, element, compositionTargetId, properties));
346 executionMap.put(element.getId(), result);
349 private void migrateProcess(UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
350 Map<String, Object> properties) {
352 listener.migrate(instanceId, element, compositionTargetId, properties);
353 } catch (PfModelException e) {
354 LOGGER.error("Automation composition element migrate failed {} {}", instanceId, e.getMessage());
355 intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED,
356 null, StateChangeResult.FAILED, "Automation composition element migrate failed");
358 executionMap.remove(element.getId());