2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.util.List;
27 import java.util.UUID;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import lombok.RequiredArgsConstructor;
33 import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
34 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
35 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
36 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
37 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
38 import org.onap.policy.clamp.models.acm.concepts.DeployState;
39 import org.onap.policy.clamp.models.acm.concepts.LockState;
40 import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
41 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
42 import org.onap.policy.models.base.PfModelException;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.springframework.stereotype.Component;
48 @RequiredArgsConstructor
49 public class ThreadHandler implements Closeable {
50 private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
52 private final AutomationCompositionElementListener listener;
53 private final ParticipantIntermediaryApi intermediaryApi;
54 private final CacheProvider cacheProvider;
56 private final Map<UUID, Future> executionMap = new ConcurrentHashMap<>();
58 private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
61 * Handle an update on a automation composition element.
63 * @param messageId the messageId
64 * @param instanceId the automationComposition Id
65 * @param element the information on the automation composition element
66 * @param properties properties Map
68 public void deploy(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
69 cleanExecution(element.getId(), messageId);
70 var result = executor.submit(() -> this.deployProcess(instanceId, element, properties));
71 executionMap.put(element.getId(), result);
74 private void deployProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
76 listener.deploy(instanceId, element, properties);
77 } catch (PfModelException e) {
78 LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage());
79 intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.UNDEPLOYED,
80 null, StateChangeResult.FAILED, "Automation composition element deploy failed");
82 executionMap.remove(element.getId());
86 * Handle a automation composition element state change.
88 * @param messageId the messageId
89 * @param instanceId the automationComposition Id
90 * @param elementId the ID of the automation composition element
92 public void undeploy(UUID messageId, UUID instanceId, UUID elementId) {
93 cleanExecution(elementId, messageId);
94 var result = executor.submit(() -> this.undeployProcess(instanceId, elementId));
95 executionMap.put(elementId, result);
98 private void undeployProcess(UUID instanceId, UUID elementId) {
100 listener.undeploy(instanceId, elementId);
101 } catch (PfModelException e) {
102 LOGGER.error("Automation composition element undeploy failed {} {}", instanceId, e.getMessage());
103 intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DEPLOYED, null,
104 StateChangeResult.FAILED, "Automation composition element undeploy failed");
106 executionMap.remove(elementId);
110 * Handle a automation composition element lock.
112 * @param messageId the messageId
113 * @param instanceId the automationComposition Id
114 * @param elementId the ID of the automation composition element
116 public void lock(UUID messageId, UUID instanceId, UUID elementId) {
117 cleanExecution(elementId, messageId);
118 var result = executor.submit(() -> this.lockProcess(instanceId, elementId));
119 executionMap.put(elementId, result);
122 private void lockProcess(UUID instanceId, UUID elementId) {
124 listener.lock(instanceId, elementId);
125 } catch (PfModelException e) {
126 LOGGER.error("Automation composition element lock failed {} {}", instanceId, e.getMessage());
127 intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED,
128 StateChangeResult.FAILED, "Automation composition element lock failed");
130 executionMap.remove(elementId);
134 * Handle a automation composition element unlock.
136 * @param messageId the messageId
137 * @param instanceId the automationComposition Id
138 * @param elementId the ID of the automation composition element
140 public void unlock(UUID messageId, UUID instanceId, UUID elementId) {
141 cleanExecution(elementId, messageId);
142 var result = executor.submit(() -> this.unlockProcess(instanceId, elementId));
143 executionMap.put(elementId, result);
146 private void unlockProcess(UUID instanceId, UUID elementId) {
148 listener.unlock(instanceId, elementId);
149 } catch (PfModelException e) {
150 LOGGER.error("Automation composition element unlock failed {} {}", instanceId, e.getMessage());
151 intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED,
152 StateChangeResult.FAILED, "Automation composition element unlock failed");
154 executionMap.remove(elementId);
158 * Handle a automation composition element delete.
160 * @param messageId the messageId
161 * @param instanceId the automationComposition Id
162 * @param elementId the ID of the automation composition element
164 public void delete(UUID messageId, UUID instanceId, UUID elementId) {
165 cleanExecution(elementId, messageId);
166 var result = executor.submit(() -> this.deleteProcess(instanceId, elementId));
167 executionMap.put(elementId, result);
170 private void deleteProcess(UUID instanceId, UUID elementId) {
172 listener.delete(instanceId, elementId);
173 } catch (PfModelException e) {
174 LOGGER.error("Automation composition element delete failed {} {}", instanceId, e.getMessage());
175 intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.UNDEPLOYED, null,
176 StateChangeResult.FAILED, "Automation composition element delete failed");
178 executionMap.remove(elementId);
182 * Handle a automation composition element properties update.
184 * @param messageId the messageId
185 * @param instanceId the automationComposition Id
186 * @param element the information on the automation composition element
187 * @param properties properties Map
189 public void update(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
190 cleanExecution(element.getId(), messageId);
191 var result = executor.submit(() -> this.updateProcess(instanceId, element, properties));
192 executionMap.put(element.getId(), result);
195 private void updateProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
197 listener.update(instanceId, element, properties);
198 } catch (PfModelException e) {
199 LOGGER.error("Automation composition element update failed {} {}", instanceId, e.getMessage());
200 intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED,
201 null, StateChangeResult.FAILED, "Automation composition element update failed");
203 executionMap.remove(element.getId());
206 private void cleanExecution(UUID execIdentificationId, UUID messageId) {
207 var process = executionMap.get(execIdentificationId);
208 if (process != null) {
209 if (!process.isDone()) {
210 process.cancel(true);
212 executionMap.remove(execIdentificationId);
214 cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
218 * Handles prime a Composition Definition.
220 * @param messageId the messageId
221 * @param compositionId the compositionId
222 * @param list the list of AutomationCompositionElementDefinition
224 public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
225 cleanExecution(compositionId, messageId);
226 var result = executor.submit(() -> this.primeProcess(compositionId, list));
227 executionMap.put(compositionId, result);
230 private void primeProcess(UUID compositionId, List<AutomationCompositionElementDefinition> list) {
232 listener.prime(compositionId, list);
233 executionMap.remove(compositionId);
234 } catch (PfModelException e) {
235 LOGGER.error("Composition Defintion prime failed {} {}", compositionId, e.getMessage());
236 intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.FAILED,
237 "Composition Defintion prime failed");
242 * Handles deprime a Composition Definition.
244 * @param messageId the messageId
245 * @param compositionId the compositionId
247 public void deprime(UUID messageId, UUID compositionId) {
248 cleanExecution(compositionId, messageId);
249 var result = executor.submit(() -> this.deprimeProcess(compositionId));
250 executionMap.put(compositionId, result);
253 private void deprimeProcess(UUID compositionId) {
255 listener.deprime(compositionId);
256 executionMap.remove(compositionId);
257 } catch (PfModelException e) {
258 LOGGER.error("Composition Defintion deprime failed {} {}", compositionId, e.getMessage());
259 intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED,
260 "Composition Defintion deprime failed");
265 * Handles restarted scenario.
267 * @param messageId the messageId
268 * @param compositionId the compositionId
269 * @param list the list of AutomationCompositionElementDefinition
270 * @param state the state of the composition
271 * @param automationCompositionList list of ParticipantRestartAc
273 public void restarted(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list,
274 AcTypeState state, List<ParticipantRestartAc> automationCompositionList) {
279 * Closes this stream and releases any system resources associated
280 * with it. If the stream is already closed then invoking this
281 * method has no effect.
283 * @throws IOException if an I/O error occurs
286 public void close() throws IOException {