2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
23 import io.opentelemetry.context.Context;
24 import java.io.Closeable;
25 import java.io.IOException;
27 import java.util.UUID;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
33 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
34 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
35 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
36 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
37 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
38 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
39 import org.onap.policy.clamp.models.acm.concepts.DeployState;
40 import org.onap.policy.clamp.models.acm.concepts.LockState;
41 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
42 import org.onap.policy.models.base.PfModelException;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.springframework.stereotype.Component;
48 public class ThreadHandler implements Closeable {
49 private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
51 private final AutomationCompositionElementListener listener;
52 private final ParticipantIntermediaryApi intermediaryApi;
53 private final CacheProvider cacheProvider;
55 private final Map<UUID, Future<?>> executionMap = new ConcurrentHashMap<>();
57 private final ExecutorService executor;
62 * @param listener the AutomationComposition ElementListener
63 * @param intermediaryApi the intermediaryApi
64 * @param cacheProvider the CacheProvider
65 * @param parameters the parameters
67 public ThreadHandler(AutomationCompositionElementListener listener, ParticipantIntermediaryApi intermediaryApi,
68 CacheProvider cacheProvider, ParticipantParameters parameters) {
69 this.listener = listener;
70 this.intermediaryApi = intermediaryApi;
71 this.cacheProvider = cacheProvider;
72 executor = Context.taskWrapping(Executors.newFixedThreadPool(
73 parameters.getIntermediaryParameters().getThreadPoolSize()));
77 * Handle a deploy on a automation composition element.
79 * @param messageId the messageId
80 * @param compositionElement the information of the Automation Composition Definition Element
81 * @param instanceElement the information of the Automation Composition Instance Element
83 public void deploy(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
84 cleanExecution(instanceElement.elementId(), messageId);
85 var result = executor.submit(() -> this.deployProcess(compositionElement, instanceElement));
86 executionMap.put(instanceElement.elementId(), result);
89 private void deployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
91 listener.deploy(compositionElement, instanceElement);
92 } catch (PfModelException e) {
93 LOGGER.error("Automation composition element deploy failed {} {}", instanceElement.elementId(),
95 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
96 instanceElement.elementId(), DeployState.UNDEPLOYED, null, StateChangeResult.FAILED,
97 "Automation composition element deploy failed");
99 executionMap.remove(instanceElement.elementId());
103 * Handle an udeploy on a automation composition element.
105 * @param messageId the messageId
106 * @param compositionElement the information of the Automation Composition Definition Element
107 * @param instanceElement the information of the Automation Composition Instance Element
109 public void undeploy(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
110 cleanExecution(instanceElement.elementId(), messageId);
111 var result = executor.submit(() -> this.undeployProcess(compositionElement, instanceElement));
112 executionMap.put(instanceElement.elementId(), result);
115 private void undeployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
117 listener.undeploy(compositionElement, instanceElement);
118 } catch (PfModelException e) {
120 "Automation composition element undeploy failed {} {}", instanceElement.elementId(), e.getMessage());
121 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
122 instanceElement.elementId(), DeployState.DEPLOYED, null,
123 StateChangeResult.FAILED, "Automation composition element undeploy failed");
125 executionMap.remove(instanceElement.elementId());
129 * Handle a automation composition element lock.
131 * @param messageId the messageId
132 * @param compositionElement the information of the Automation Composition Definition Element
133 * @param instanceElement the information of the Automation Composition Instance Element
135 public void lock(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
136 cleanExecution(instanceElement.elementId(), messageId);
137 var result = executor.submit(() -> this.lockProcess(compositionElement, instanceElement));
138 executionMap.put(instanceElement.elementId(), result);
141 private void lockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
143 listener.lock(compositionElement, instanceElement);
144 } catch (PfModelException e) {
145 LOGGER.error("Automation composition element lock failed {} {}",
146 instanceElement.elementId(), e.getMessage());
147 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
148 instanceElement.elementId(), null, LockState.UNLOCKED, StateChangeResult.FAILED,
149 "Automation composition element lock failed");
151 executionMap.remove(instanceElement.elementId());
155 * Handle a automation composition element unlock.
157 * @param messageId the messageId
158 * @param compositionElement the information of the Automation Composition Definition Element
159 * @param instanceElement the information of the Automation Composition Instance Element
161 public void unlock(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
162 cleanExecution(instanceElement.elementId(), messageId);
163 var result = executor.submit(() -> this.unlockProcess(compositionElement, instanceElement));
164 executionMap.put(instanceElement.elementId(), result);
167 private void unlockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
169 listener.unlock(compositionElement, instanceElement);
170 } catch (PfModelException e) {
171 LOGGER.error("Automation composition element unlock failed {} {}",
172 instanceElement.elementId(), e.getMessage());
173 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
174 instanceElement.elementId(), null, LockState.LOCKED, StateChangeResult.FAILED,
175 "Automation composition element unlock failed");
177 executionMap.remove(instanceElement.elementId());
181 * Handle a automation composition element delete.
183 * @param messageId the messageId
184 * @param compositionElement the information of the Automation Composition Definition Element
185 * @param instanceElement the information of the Automation Composition Instance Element
187 public void delete(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
188 cleanExecution(instanceElement.elementId(), messageId);
189 var result = executor.submit(() -> this.deleteProcess(compositionElement, instanceElement));
190 executionMap.put(instanceElement.elementId(), result);
193 private void deleteProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
195 listener.delete(compositionElement, instanceElement);
196 } catch (PfModelException e) {
197 LOGGER.error("Automation composition element delete failed {} {}",
198 instanceElement.elementId(), e.getMessage());
199 intermediaryApi.updateAutomationCompositionElementState(
200 instanceElement.instanceId(), instanceElement.elementId(), DeployState.UNDEPLOYED, null,
201 StateChangeResult.FAILED, "Automation composition element delete failed");
203 executionMap.remove(instanceElement.elementId());
207 * Handle a automation composition element properties update.
209 * @param messageId the messageId
210 * @param compositionElement the information of the Automation Composition Definition Element
211 * @param instanceElement the information of the Automation Composition Instance Element
212 * @param instanceElementUpdated the information of the Automation Composition Instance Element updated
214 public void update(UUID messageId, CompositionElementDto compositionElement, InstanceElementDto instanceElement,
215 InstanceElementDto instanceElementUpdated) {
216 cleanExecution(instanceElement.elementId(), messageId);
217 var result = executor.submit(() ->
218 this.updateProcess(compositionElement, instanceElement, instanceElementUpdated));
219 executionMap.put(instanceElement.elementId(), result);
222 private void updateProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
223 InstanceElementDto instanceElementUpdated) {
225 listener.update(compositionElement, instanceElement, instanceElementUpdated);
226 } catch (PfModelException e) {
227 LOGGER.error("Automation composition element update failed {} {}",
228 instanceElement.elementId(), e.getMessage());
229 intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
230 instanceElement.elementId(), DeployState.DEPLOYED, null,
231 StateChangeResult.FAILED, "Automation composition element update failed");
233 executionMap.remove(instanceElement.elementId());
239 * @param execIdentificationId the identification Id
240 * @param messageId the messageId
242 public void cleanExecution(UUID execIdentificationId, UUID messageId) {
243 var process = executionMap.get(execIdentificationId);
244 if (process != null) {
245 if (!process.isDone()) {
246 process.cancel(true);
248 executionMap.remove(execIdentificationId);
250 cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
254 * Handles prime a Composition Definition.
256 * @param messageId the messageId
257 * @param composition the composition
259 public void prime(UUID messageId, CompositionDto composition) {
260 cleanExecution(composition.compositionId(), messageId);
261 var result = executor.submit(() -> this.primeProcess(composition));
262 executionMap.put(composition.compositionId(), result);
265 private void primeProcess(CompositionDto composition) {
267 listener.prime(composition);
268 executionMap.remove(composition.compositionId());
269 } catch (PfModelException e) {
270 LOGGER.error("Composition Defintion prime failed {} {}", composition.compositionId(), e.getMessage());
271 intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.COMMISSIONED,
272 StateChangeResult.FAILED, "Composition Defintion prime failed");
277 * Handles deprime a Composition Definition.
279 * @param messageId the messageId
280 * @param composition the composition
282 public void deprime(UUID messageId, CompositionDto composition) {
283 cleanExecution(composition.compositionId(), messageId);
284 var result = executor.submit(() -> this.deprimeProcess(composition));
285 executionMap.put(composition.compositionId(), result);
288 private void deprimeProcess(CompositionDto composition) {
290 listener.deprime(composition);
291 executionMap.remove(composition.compositionId());
292 } catch (PfModelException e) {
293 LOGGER.error("Composition Defintion deprime failed {} {}", composition.compositionId(), e.getMessage());
294 intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.PRIMED,
295 StateChangeResult.FAILED, "Composition Defintion deprime failed");
300 * Closes this stream and releases any system resources associated
301 * with it. If the stream is already closed then invoking this
302 * method has no effect.
304 * @throws IOException if an I/O error occurs
307 public void close() throws IOException {
312 * Handles AutomationComposition Migration.
314 * @param messageId the messageId
315 * @param compositionElement the information of the Automation Composition Definition Element
316 * @param compositionElementTarget the information of the Automation Composition Definition Element Target
317 * @param instanceElement the information of the Automation Composition Instance Element
318 * @param instanceElementMigrate the information of the Automation Composition Instance Element updated
319 * @param stage the stage
321 public void migrate(UUID messageId, CompositionElementDto compositionElement,
322 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
323 InstanceElementDto instanceElementMigrate, int stage) {
324 cleanExecution(instanceElement.elementId(), messageId);
325 var result = executor.submit(() ->
326 this.migrateProcess(compositionElement, compositionElementTarget,
327 instanceElement, instanceElementMigrate, stage));
328 executionMap.put(instanceElement.elementId(), result);
331 private void migrateProcess(CompositionElementDto compositionElement,
332 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
333 InstanceElementDto instanceElementMigrate, int stage) {
335 listener.migrate(compositionElement, compositionElementTarget,
336 instanceElement, instanceElementMigrate, stage);
337 } catch (PfModelException e) {
338 LOGGER.error("Automation composition element migrate failed {} {}",
339 instanceElement.elementId(), e.getMessage());
340 intermediaryApi.updateAutomationCompositionElementState(
341 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
342 null, StateChangeResult.FAILED, "Automation composition element migrate failed");
344 executionMap.remove(instanceElement.elementId());
348 * Handles AutomationComposition Migration Precheck.
350 * @param messageId the messageId
351 * @param compositionElement the information of the Automation Composition Definition Element
352 * @param compositionElementTarget the information of the Automation Composition Definition Element Target
353 * @param instanceElement the information of the Automation Composition Instance Element
354 * @param instanceElementMigrate the information of the Automation Composition Instance Element updated
356 public void migratePrecheck(UUID messageId, CompositionElementDto compositionElement,
357 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
358 InstanceElementDto instanceElementMigrate) {
359 cleanExecution(instanceElement.elementId(), messageId);
360 var result = executor.submit(() ->
361 this.migratePrecheckProcess(compositionElement, compositionElementTarget, instanceElement,
362 instanceElementMigrate));
363 executionMap.put(instanceElement.elementId(), result);
366 private void migratePrecheckProcess(CompositionElementDto compositionElement,
367 CompositionElementDto compositionElementTarget, InstanceElementDto instanceElement,
368 InstanceElementDto instanceElementMigrate) {
370 listener.migratePrecheck(compositionElement, compositionElementTarget, instanceElement,
371 instanceElementMigrate);
372 } catch (PfModelException e) {
373 LOGGER.error("Automation composition element migrate precheck failed {} {}",
374 instanceElement.elementId(), e.getMessage());
375 intermediaryApi.updateAutomationCompositionElementState(
376 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
377 null, StateChangeResult.FAILED, "Automation composition element migrate precheck failed");
379 executionMap.remove(instanceElement.elementId());
383 * Handles AutomationComposition Prepare Post Deploy.
385 * @param messageId the messageId
386 * @param compositionElement the information of the Automation Composition Definition Element
387 * @param instanceElement the information of the Automation Composition Instance Element
389 public void review(UUID messageId, CompositionElementDto compositionElement,
390 InstanceElementDto instanceElement) {
391 cleanExecution(instanceElement.elementId(), messageId);
392 var result = executor.submit(() -> this.reviewProcess(compositionElement, instanceElement));
393 executionMap.put(instanceElement.elementId(), result);
396 private void reviewProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
398 listener.review(compositionElement, instanceElement);
399 } catch (PfModelException e) {
400 LOGGER.error("Automation composition element Review failed {} {}",
401 instanceElement.elementId(), e.getMessage());
402 intermediaryApi.updateAutomationCompositionElementState(
403 instanceElement.instanceId(), instanceElement.elementId(), DeployState.DEPLOYED,
404 null, StateChangeResult.FAILED, "Automation composition element Review failed");
406 executionMap.remove(instanceElement.elementId());
410 * Handles AutomationComposition Prepare Pre Deploy.
412 * @param messageId the messageId
413 * @param compositionElement the information of the Automation Composition Definition Element
414 * @param instanceElement the information of the Automation Composition Instance Element
415 * @param stage the stage
417 public void prepare(UUID messageId, CompositionElementDto compositionElement,
418 InstanceElementDto instanceElement, int stage) {
419 cleanExecution(instanceElement.elementId(), messageId);
420 var result = executor.submit(() -> this.prepareProcess(compositionElement, instanceElement, stage));
421 executionMap.put(instanceElement.elementId(), result);
424 private void prepareProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
427 listener.prepare(compositionElement, instanceElement, stage);
428 } catch (PfModelException e) {
429 LOGGER.error("Automation composition element prepare Pre Deploy failed {} {}",
430 instanceElement.elementId(), e.getMessage());
431 intermediaryApi.updateAutomationCompositionElementState(
432 instanceElement.instanceId(), instanceElement.elementId(), DeployState.UNDEPLOYED,
433 null, StateChangeResult.FAILED, "Automation composition element prepare Pre Deploy failed");
435 executionMap.remove(instanceElement.elementId());