2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023-2024 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
23 import io.opentelemetry.context.Context;
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.util.List;
28 import java.util.UUID;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import lombok.RequiredArgsConstructor;
34 import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
35 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
36 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
37 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
38 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
39 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
40 import org.onap.policy.clamp.models.acm.concepts.DeployState;
41 import org.onap.policy.clamp.models.acm.concepts.LockState;
42 import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
43 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
44 import org.onap.policy.models.base.PfModelException;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import org.springframework.stereotype.Component;
50 @RequiredArgsConstructor
51 public class ThreadHandler implements Closeable {
52 private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
54 private final AutomationCompositionElementListener listener;
55 private final ParticipantIntermediaryApi intermediaryApi;
56 private final CacheProvider cacheProvider;
58 private final Map<UUID, Future> executionMap = new ConcurrentHashMap<>();
60 private final ExecutorService executor =
61 Context.taskWrapping(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
64 * Handle a deploy on a automation composition element.
66 * @param messageId the messageId
67 * @param compositionElement the information of the Automation Composition Definition Element
68 * @param instanceElement the information of the Automation Composition Instance Element
70 public void deploy(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
71 cleanExecution(instanceElement.elementId(), messageId);
72 var result = executor.submit(() -> this.deployProcess(compositionElement, instanceElement));
73 executionMap.put(instanceElement.elementId(), result);
76 private void deployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
78 listener.deploy(compositionElement, instanceElement);
79 } catch (PfModelException e) {
80 LOGGER.error("Automation composition element deploy failed {} {}", instanceElement.elementId(),
82 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
83 instanceElement.elementId(), DeployState.UNDEPLOYED, null, StateChangeResult.FAILED,
84 "Automation composition element deploy failed");
86 executionMap.remove(instanceElement.elementId());
90 * Handle an udeploy on a automation composition element.
92 * @param messageId the messageId
93 * @param compositionElement the information of the Automation Composition Definition Element
94 * @param instanceElement the information of the Automation Composition Instance Element
96 public void undeploy(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
97 cleanExecution(instanceElement.elementId(), messageId);
98 var result = executor.submit(() -> this.undeployProcess(compositionElement, instanceElement));
99 executionMap.put(instanceElement.elementId(), result);
102 private void undeployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
104 listener.undeploy(compositionElement, instanceElement);
105 } catch (PfModelException e) {
107 "Automation composition element undeploy failed {} {}", instanceElement.elementId(), e.getMessage());
108 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
109 instanceElement.elementId(), DeployState.DEPLOYED, null,
110 StateChangeResult.FAILED, "Automation composition element undeploy failed");
112 executionMap.remove(instanceElement.elementId());
116 * Handle a automation composition element lock.
118 * @param messageId the messageId
119 * @param compositionElement the information of the Automation Composition Definition Element
120 * @param instanceElement the information of the Automation Composition Instance Element
122 public void lock(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
123 cleanExecution(instanceElement.elementId(), messageId);
124 var result = executor.submit(() -> this.lockProcess(compositionElement, instanceElement));
125 executionMap.put(instanceElement.elementId(), result);
128 private void lockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
130 listener.lock(compositionElement, instanceElement);
131 } catch (PfModelException e) {
132 LOGGER.error("Automation composition element lock failed {} {}",
133 instanceElement.elementId(), e.getMessage());
134 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
135 instanceElement.elementId(), null, LockState.UNLOCKED, StateChangeResult.FAILED,
136 "Automation composition element lock failed");
138 executionMap.remove(instanceElement.elementId());
142 * Handle a automation composition element unlock.
144 * @param messageId the messageId
145 * @param compositionElement the information of the Automation Composition Definition Element
146 * @param instanceElement the information of the Automation Composition Instance Element
148 public void unlock(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
149 cleanExecution(instanceElement.elementId(), messageId);
150 var result = executor.submit(() -> this.unlockProcess(compositionElement, instanceElement));
151 executionMap.put(instanceElement.elementId(), result);
154 private void unlockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
156 listener.unlock(compositionElement, instanceElement);
157 } catch (PfModelException e) {
158 LOGGER.error("Automation composition element unlock failed {} {}",
159 instanceElement.elementId(), e.getMessage());
160 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
161 instanceElement.elementId(), null, LockState.LOCKED, StateChangeResult.FAILED,
162 "Automation composition element unlock failed");
164 executionMap.remove(instanceElement.elementId());
168 * Handle a automation composition element delete.
170 * @param messageId the messageId
171 * @param compositionElement the information of the Automation Composition Definition Element
172 * @param instanceElement the information of the Automation Composition Instance Element
174 public void delete(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
175 cleanExecution(instanceElement.elementId(), messageId);
176 var result = executor.submit(() -> this.deleteProcess(compositionElement, instanceElement));
177 executionMap.put(instanceElement.elementId(), result);
180 private void deleteProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
182 listener.delete(compositionElement, instanceElement);
183 } catch (PfModelException e) {
184 LOGGER.error("Automation composition element delete failed {} {}",
185 instanceElement.elementId(), e.getMessage());
186 intermediaryApi.updateAutomationCompositionElementState(
187 instanceElement.instanceId(), instanceElement.elementId(), DeployState.UNDEPLOYED, null,
188 StateChangeResult.FAILED, "Automation composition element delete failed");
190 executionMap.remove(instanceElement.elementId());
194 * Handle a automation composition element properties update.
196 * @param messageId the messageId
197 * @param compositionElement the information of the Automation Composition Definition Element
198 * @param instanceElement the information of the Automation Composition Instance Element
199 * @param instanceElementUpdated the information of the Automation Composition Instance Element updated
201 public void update(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement,
202 InstanceElementDto instanceElementUpdated) {
203 cleanExecution(instanceElement.elementId(), messageId);
204 var result = executor.submit(() ->
205 this.updateProcess(compositionElement, instanceElement, instanceElementUpdated));
206 executionMap.put(instanceElement.elementId(), result);
209 private void updateProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
210 InstanceElementDto instanceElementUpdated) {
212 listener.update(compositionElement, instanceElement, instanceElementUpdated);
213 } catch (PfModelException e) {
214 LOGGER.error("Automation composition element update failed {} {}",
215 instanceElement.elementId(), e.getMessage());
216 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
217 instanceElement.elementId(), DeployState.DEPLOYED, null,
218 StateChangeResult.FAILED, "Automation composition element update failed");
220 executionMap.remove(instanceElement.elementId());
223 private void cleanExecution(UUID execIdentificationId, UUID messageId) {
224 var process = executionMap.get(execIdentificationId);
225 if (process != null) {
226 if (!process.isDone()) {
227 process.cancel(true);
229 executionMap.remove(execIdentificationId);
231 cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
235 * Handles prime a Composition Definition.
237 * @param messageId the messageId
238 * @param composition the composition
240 public void prime(UUID messageId, CompositionDto composition) {
241 cleanExecution(composition.compositionId(), messageId);
242 var result = executor.submit(() -> this.primeProcess(composition));
243 executionMap.put(composition.compositionId(), result);
246 private void primeProcess(CompositionDto composition) {
248 listener.prime(composition);
249 executionMap.remove(composition.compositionId());
250 } catch (PfModelException e) {
251 LOGGER.error("Composition Defintion prime failed {} {}", composition.compositionId(), e.getMessage());
252 intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.COMMISSIONED,
253 StateChangeResult.FAILED, "Composition Defintion prime failed");
258 * Handles deprime a Composition Definition.
260 * @param messageId the messageId
261 * @param composition the composition
263 public void deprime(UUID messageId, CompositionDto composition) {
264 cleanExecution(composition.compositionId(), messageId);
265 var result = executor.submit(() -> this.deprimeProcess(composition));
266 executionMap.put(composition.compositionId(), result);
269 private void deprimeProcess(CompositionDto composition) {
271 listener.deprime(composition);
272 executionMap.remove(composition.compositionId());
273 } catch (PfModelException e) {
274 LOGGER.error("Composition Defintion deprime failed {} {}", composition.compositionId(), e.getMessage());
275 intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.PRIMED,
276 StateChangeResult.FAILED, "Composition Defintion deprime failed");
281 * Handles restarted scenario.
283 * @param messageId the messageId
284 * @param composition the composition
285 * @param state the state of the composition
286 * @param automationCompositionList list of ParticipantRestartAc
288 public void restarted(UUID messageId, CompositionDto composition,
289 AcTypeState state, List<ParticipantRestartAc> automationCompositionList) {
291 listener.handleRestartComposition(composition, state);
292 } catch (PfModelException e) {
293 LOGGER.error("Composition Defintion restarted failed {} {}", composition.compositionId(), e.getMessage());
294 intermediaryApi.updateCompositionState(composition.compositionId(), state, StateChangeResult.FAILED,
295 "Composition Defintion restarted failed");
298 for (var automationComposition : automationCompositionList) {
299 for (var element : automationComposition.getAcElementList()) {
300 var compositionElement = new CompositionElementDto(composition.compositionId(),
301 element.getDefinition(), composition.inPropertiesMap().get(element.getDefinition()),
302 composition.outPropertiesMap().get(element.getDefinition()));
303 var instanceElementDto = new InstanceElementDto(automationComposition.getAutomationCompositionId(),
304 element.getId(), element.getToscaServiceTemplateFragment(),
305 element.getProperties(), element.getOutProperties());
306 cleanExecution(element.getId(), messageId);
307 var result = executor.submit(() ->
308 this.restartedInstanceProcess(compositionElement, instanceElementDto,
309 element.getDeployState(), element.getLockState()));
310 executionMap.put(element.getId(), result);
315 private void restartedInstanceProcess(CompositionElementDto compositionElement,
316 InstanceElementDto instanceElementDto, DeployState deployState, LockState lockState) {
318 listener.handleRestartInstance(compositionElement, instanceElementDto, deployState, lockState);
319 executionMap.remove(instanceElementDto.elementId());
320 } catch (PfModelException e) {
321 LOGGER.error("Automation composition element deploy failed {} {}",
322 instanceElementDto.elementId(), e.getMessage());
323 intermediaryApi.updateAutomationCompositionElementState(instanceElementDto.instanceId(),
324 instanceElementDto.elementId(), deployState, lockState, StateChangeResult.FAILED,
325 "Automation composition element restart failed");
330 * Closes this stream and releases any system resources associated
331 * with it. If the stream is already closed then invoking this
332 * method has no effect.
334 * @throws IOException if an I/O error occurs
337 public void close() throws IOException {
342 * Handles AutomationComposition Migration.
344 * @param messageId the messageId
345 * @param compositionElement the information of the Automation Composition Definition Element
346 * @param compositionElementTarget the information of the Automation Composition Definition Element Target
347 * @param instanceElement the information of the Automation Composition Instance Element
348 * @param instanceElementMigrate the information of the Automation Composition Instance Element updated
350 public void migrate(UUID messageId, CompositionElementDto compositionElement,
351 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
352 InstanceElementDto instanceElementMigrate) {
353 cleanExecution(instanceElement.elementId(), messageId);
354 var result = executor.submit(() ->
355 this.migrateProcess(compositionElement, compositionElementTarget, instanceElement, instanceElementMigrate));
356 executionMap.put(instanceElement.elementId(), result);
359 private void migrateProcess(CompositionElementDto compositionElement,
360 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
361 InstanceElementDto instanceElementMigrate) {
363 listener.migrate(compositionElement, compositionElementTarget, instanceElement, instanceElementMigrate);
364 } catch (PfModelException e) {
365 LOGGER.error("Automation composition element migrate failed {} {}",
366 instanceElement.elementId(), e.getMessage());
367 intermediaryApi.updateAutomationCompositionElementState(
368 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
369 null, StateChangeResult.FAILED, "Automation composition element migrate failed");
371 executionMap.remove(instanceElement.elementId());