2  * ============LICENSE_START=======================================================
 
   3  *  Copyright (C) 2023-2024 Nordix Foundation.
 
   4  * ================================================================================
 
   5  * Licensed under the Apache License, Version 2.0 (the "License");
 
   6  * you may not use this file except in compliance with the License.
 
   7  * You may obtain a copy of the License at
 
   9  *      http://www.apache.org/licenses/LICENSE-2.0
 
  11  * Unless required by applicable law or agreed to in writing, software
 
  12  * distributed under the License is distributed on an "AS IS" BASIS,
 
  13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  * See the License for the specific language governing permissions and
 
  15  * limitations under the License.
 
  17  * SPDX-License-Identifier: Apache-2.0
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
 
  23 import io.opentelemetry.context.Context;
 
  24 import java.io.Closeable;
 
  25 import java.io.IOException;
 
  26 import java.util.HashMap;
 
  27 import java.util.List;
 
  29 import java.util.UUID;
 
  30 import java.util.concurrent.ConcurrentHashMap;
 
  31 import java.util.concurrent.ExecutorService;
 
  32 import java.util.concurrent.Executors;
 
  33 import java.util.concurrent.Future;
 
  34 import lombok.RequiredArgsConstructor;
 
  35 import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
 
  36 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
 
  37 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 
  38 import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
 
  39 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 
  40 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
 
  41 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 
  42 import org.onap.policy.clamp.models.acm.concepts.LockState;
 
  43 import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
 
  44 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 
  45 import org.onap.policy.models.base.PfModelException;
 
  46 import org.slf4j.Logger;
 
  47 import org.slf4j.LoggerFactory;
 
  48 import org.springframework.stereotype.Component;
 
  51 @RequiredArgsConstructor
 
  52 public class ThreadHandler implements Closeable {
 
  53     private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
 
  55     private final AutomationCompositionElementListener listener;
 
  56     private final ParticipantIntermediaryApi intermediaryApi;
 
  57     private final CacheProvider cacheProvider;
 
  59     private final Map<UUID, Future> executionMap = new ConcurrentHashMap<>();
 
  61     private final ExecutorService executor =
 
  62             Context.taskWrapping(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
 
  65      * Handle an update on a automation composition element.
 
  67      * @param messageId the messageId
 
  68      * @param instanceId the automationComposition Id
 
  69      * @param element the information on the automation composition element
 
  70      * @param properties properties Map
 
  72     public void deploy(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
 
  73         cleanExecution(element.getId(), messageId);
 
  74         var result = executor.submit(() -> this.deployProcess(instanceId, element, properties));
 
  75         executionMap.put(element.getId(), result);
 
  78     private void deployProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
 
  80             listener.deploy(instanceId, element, properties);
 
  81         } catch (PfModelException e) {
 
  82             LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage());
 
  83             intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.UNDEPLOYED,
 
  84                     null, StateChangeResult.FAILED, "Automation composition element deploy failed");
 
  86         executionMap.remove(element.getId());
 
  90      * Handle a automation composition element state change.
 
  92      * @param messageId the messageId
 
  93      * @param instanceId the automationComposition Id
 
  94      * @param elementId the ID of the automation composition element
 
  96     public void undeploy(UUID messageId, UUID instanceId, UUID elementId) {
 
  97         cleanExecution(elementId, messageId);
 
  98         var result = executor.submit(() -> this.undeployProcess(instanceId, elementId));
 
  99         executionMap.put(elementId, result);
 
 102     private void undeployProcess(UUID instanceId, UUID elementId) {
 
 104             listener.undeploy(instanceId, elementId);
 
 105         } catch (PfModelException e) {
 
 106             LOGGER.error("Automation composition element undeploy failed {} {}", instanceId, e.getMessage());
 
 107             intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DEPLOYED, null,
 
 108                     StateChangeResult.FAILED, "Automation composition element undeploy failed");
 
 110         executionMap.remove(elementId);
 
 114      * Handle a automation composition element lock.
 
 116      * @param messageId the messageId
 
 117      * @param instanceId the automationComposition Id
 
 118      * @param elementId the ID of the automation composition element
 
 120     public void lock(UUID messageId, UUID instanceId, UUID elementId) {
 
 121         cleanExecution(elementId, messageId);
 
 122         var result = executor.submit(() -> this.lockProcess(instanceId, elementId));
 
 123         executionMap.put(elementId, result);
 
 126     private void lockProcess(UUID instanceId, UUID elementId) {
 
 128             listener.lock(instanceId, elementId);
 
 129         } catch (PfModelException e) {
 
 130             LOGGER.error("Automation composition element lock failed {} {}", instanceId, e.getMessage());
 
 131             intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED,
 
 132                     StateChangeResult.FAILED, "Automation composition element lock failed");
 
 134         executionMap.remove(elementId);
 
 138      * Handle a automation composition element unlock.
 
 140      * @param messageId the messageId
 
 141      * @param instanceId the automationComposition Id
 
 142      * @param elementId the ID of the automation composition element
 
 144     public void unlock(UUID messageId, UUID instanceId, UUID elementId) {
 
 145         cleanExecution(elementId, messageId);
 
 146         var result = executor.submit(() -> this.unlockProcess(instanceId, elementId));
 
 147         executionMap.put(elementId, result);
 
 150     private void unlockProcess(UUID instanceId, UUID elementId) {
 
 152             listener.unlock(instanceId, elementId);
 
 153         } catch (PfModelException e) {
 
 154             LOGGER.error("Automation composition element unlock failed {} {}", instanceId, e.getMessage());
 
 155             intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED,
 
 156                     StateChangeResult.FAILED, "Automation composition element unlock failed");
 
 158         executionMap.remove(elementId);
 
 162      * Handle a automation composition element delete.
 
 164      * @param messageId the messageId
 
 165      * @param instanceId the automationComposition Id
 
 166      * @param elementId the ID of the automation composition element
 
 168     public void delete(UUID messageId, UUID instanceId, UUID elementId) {
 
 169         cleanExecution(elementId, messageId);
 
 170         var result = executor.submit(() -> this.deleteProcess(instanceId, elementId));
 
 171         executionMap.put(elementId, result);
 
 174     private void deleteProcess(UUID instanceId, UUID elementId) {
 
 176             listener.delete(instanceId, elementId);
 
 177         } catch (PfModelException e) {
 
 178             LOGGER.error("Automation composition element delete failed {} {}", instanceId, e.getMessage());
 
 179             intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.UNDEPLOYED, null,
 
 180                     StateChangeResult.FAILED, "Automation composition element delete failed");
 
 182         executionMap.remove(elementId);
 
 186      * Handle a automation composition element properties update.
 
 188      * @param messageId the messageId
 
 189      * @param instanceId the automationComposition Id
 
 190      * @param element the information on the automation composition element
 
 191      * @param properties properties Map
 
 193     public void update(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
 
 194         cleanExecution(element.getId(), messageId);
 
 195         var result = executor.submit(() -> this.updateProcess(instanceId, element, properties));
 
 196         executionMap.put(element.getId(), result);
 
 199     private void updateProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
 
 201             listener.update(instanceId, element, properties);
 
 202         } catch (PfModelException e) {
 
 203             LOGGER.error("Automation composition element update failed {} {}", instanceId, e.getMessage());
 
 204             intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED,
 
 205                     null, StateChangeResult.FAILED, "Automation composition element update failed");
 
 207         executionMap.remove(element.getId());
 
 210     private void cleanExecution(UUID execIdentificationId, UUID messageId) {
 
 211         var process = executionMap.get(execIdentificationId);
 
 212         if (process != null) {
 
 213             if (!process.isDone()) {
 
 214                 process.cancel(true);
 
 216             executionMap.remove(execIdentificationId);
 
 218         cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
 
 222      * Handles prime a Composition Definition.
 
 224      * @param messageId the messageId
 
 225      * @param compositionId the compositionId
 
 226      * @param list the list of AutomationCompositionElementDefinition
 
 228     public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
 
 229         cleanExecution(compositionId, messageId);
 
 230         var result = executor.submit(() -> this.primeProcess(compositionId, list));
 
 231         executionMap.put(compositionId, result);
 
 234     private void primeProcess(UUID compositionId, List<AutomationCompositionElementDefinition> list) {
 
 236             listener.prime(compositionId, list);
 
 237             executionMap.remove(compositionId);
 
 238         } catch (PfModelException e) {
 
 239             LOGGER.error("Composition Defintion prime failed {} {}", compositionId, e.getMessage());
 
 240             intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.FAILED,
 
 241                     "Composition Defintion prime failed");
 
 246      * Handles deprime a Composition Definition.
 
 248      * @param messageId the messageId
 
 249      * @param compositionId the compositionId
 
 251     public void deprime(UUID messageId, UUID compositionId) {
 
 252         cleanExecution(compositionId, messageId);
 
 253         var result = executor.submit(() -> this.deprimeProcess(compositionId));
 
 254         executionMap.put(compositionId, result);
 
 257     private void deprimeProcess(UUID compositionId) {
 
 259             listener.deprime(compositionId);
 
 260             executionMap.remove(compositionId);
 
 261         } catch (PfModelException e) {
 
 262             LOGGER.error("Composition Defintion deprime failed {} {}", compositionId, e.getMessage());
 
 263             intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED,
 
 264                     "Composition Defintion deprime failed");
 
 269      * Handles restarted scenario.
 
 271      * @param messageId the messageId
 
 272      * @param compositionId the compositionId
 
 273      * @param list the list of AutomationCompositionElementDefinition
 
 274      * @param state the state of the composition
 
 275      * @param automationCompositionList list of ParticipantRestartAc
 
 277     public void restarted(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list,
 
 278             AcTypeState state, List<ParticipantRestartAc> automationCompositionList) {
 
 280             listener.handleRestartComposition(compositionId, list, state);
 
 281         } catch (PfModelException e) {
 
 282             LOGGER.error("Composition Defintion restarted failed {} {}", compositionId, e.getMessage());
 
 283             intermediaryApi.updateCompositionState(compositionId, state, StateChangeResult.FAILED,
 
 284                     "Composition Defintion restarted failed");
 
 287         for (var automationComposition : automationCompositionList) {
 
 288             for (var element : automationComposition.getAcElementList()) {
 
 289                 cleanExecution(element.getId(), messageId);
 
 290                 var result = executor.submit(() -> this
 
 291                         .restartedInstanceProcess(automationComposition.getAutomationCompositionId(), element));
 
 292                 executionMap.put(element.getId(), result);
 
 297     private void restartedInstanceProcess(UUID instanceId, AcElementRestart element) {
 
 299             var map = new HashMap<>(cacheProvider.getCommonProperties(instanceId, element.getId()));
 
 300             map.putAll(element.getProperties());
 
 302             listener.handleRestartInstance(instanceId, getAcElementDeploy(element), map, element.getDeployState(),
 
 303                     element.getLockState());
 
 304             executionMap.remove(element.getId());
 
 305         } catch (PfModelException e) {
 
 306             LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage());
 
 307             intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(),
 
 308                     element.getDeployState(), element.getLockState(), StateChangeResult.FAILED,
 
 309                     "Automation composition element restart failed");
 
 313     private AcElementDeploy getAcElementDeploy(AcElementRestart element) {
 
 314         var acElementDeploy = new AcElementDeploy();
 
 315         acElementDeploy.setId(element.getId());
 
 316         acElementDeploy.setDefinition(element.getDefinition());
 
 317         acElementDeploy.setProperties(element.getProperties());
 
 318         acElementDeploy.setToscaServiceTemplateFragment(element.getToscaServiceTemplateFragment());
 
 319         return acElementDeploy;
 
 323      * Closes this stream and releases any system resources associated
 
 324      * with it. If the stream is already closed then invoking this
 
 325      * method has no effect.
 
 327      * @throws IOException if an I/O error occurs
 
 330     public void close() throws IOException {
 
 335      * Handles AutomationComposition Migration.
 
 337      * @param messageId the messageId
 
 338      * @param instanceId the automationComposition Id
 
 339      * @param element the information on the automation composition element
 
 340      * @param compositionTargetId the composition to migrate
 
 342     public void migrate(UUID messageId, UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
 
 343             Map<String, Object> properties) {
 
 344         cleanExecution(element.getId(), messageId);
 
 345         var result = executor.submit(() -> this.migrateProcess(instanceId, element, compositionTargetId, properties));
 
 346         executionMap.put(element.getId(), result);
 
 349     private void migrateProcess(UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
 
 350             Map<String, Object> properties) {
 
 352             listener.migrate(instanceId, element, compositionTargetId, properties);
 
 353         } catch (PfModelException e) {
 
 354             LOGGER.error("Automation composition element migrate failed {} {}", instanceId, e.getMessage());
 
 355             intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED,
 
 356                     null, StateChangeResult.FAILED, "Automation composition element migrate failed");
 
 358         executionMap.remove(element.getId());