2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.clamp.acm.participant.intermediary.handler;
23 import io.opentelemetry.context.Context;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import lombok.RequiredArgsConstructor;
27 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
28 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AutomationCompositionMsg;
29 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
30 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
31 import org.springframework.stereotype.Component;
34 @RequiredArgsConstructor
35 public class MsgExecutor {
// Executor that runs the on-hold check (checkAndExecute is submitted to it). It is created as a
// single-thread executor and then wrapped with OpenTelemetry's Context.taskWrapping.
// NOTE(review): no shutdown of this executor is visible in this section - confirm lifecycle handling.
37 private final ExecutorService executor = Context.taskWrapping(Executors.newSingleThreadExecutor());
// Supplies the participant/replica ids, the map of messages on hold and the "is updated" checks.
39 private final CacheProvider cacheProvider;
// Used to send the ParticipantReqSync message built in execute().
40 private final ParticipantMessagePublisher publisher;
43 * Execute the message if all data are present, or put it on hold if something is missing.
45 * @param message the message
47 public void execute(AutomationCompositionMsg<?> message) {
// validExecution() can set id/revision fields of the message to null as a side effect, so the
// getters used further down read whatever state this call leaves behind.
48 if (validExecution(message)) {
// NOTE(review): the statements between the check above and the hold logic below are not visible in
// this section; per the method's Javadoc the hold path is for messages with missing data - confirm.
// Park the message under its key; checkAndExecute() later re-checks the messages on hold.
51 cacheProvider.getMessagesOnHold().put(message.getKey(), message);
// Build a sync request carrying this participant's id and replica id plus the composition,
// instance and composition-target ids currently set on the message.
52 var participantReqSync = new ParticipantReqSync();
53 participantReqSync.setParticipantId(cacheProvider.getParticipantId());
54 participantReqSync.setReplicaId(cacheProvider.getReplicaId());
55 participantReqSync.setCompositionId(message.getCompositionId());
56 participantReqSync.setAutomationCompositionId(message.getInstanceId());
57 participantReqSync.setCompositionTargetId(message.getCompositionTargetId());
// Send the sync request through the participant message publisher.
58 publisher.sendParticipantReqSync(participantReqSync);
63 * Check if messages on hold can be executed.
// Hand the on-hold check to the executor rather than running it on the calling thread.
// NOTE(review): the header of the method enclosing this statement is not visible in this section.
66 executor.submit(this::checkAndExecute);
/**
 * Executes every message on hold that passes validExecution and then removes those messages
 * from the on-hold map.
 */
69 private void checkAndExecute() {
// The matching messages are collected into a list before anything is removed from the map.
// validExecution can modify the id/revision fields of each message it inspects.
70 var executable = cacheProvider.getMessagesOnHold().values().stream()
71 .filter(this::validExecution).toList();
// First run all executable messages, then drop each of them from the on-hold map by key.
72 executable.forEach(AutomationCompositionMsg::execute);
73 executable.forEach(msg -> cacheProvider.getMessagesOnHold().remove(msg.getKey()));
/**
 * Checks the composition, composition-target and instance references carried by a message
 * against the cache provider.
 *
 * <p>This method modifies the message: the setter calls below set id/revision pairs to
 * {@code null}.
 *
 * <p>NOTE(review): the branches on each {@code valid} flag and the return statement are not
 * visible in this section, so the exact conditions and the returned value are unconfirmed.
 *
 * @param message the message to check; may be modified by this call
 * @return presumably {@code true} when the message can be executed - confirm against the full method
 */
76 private boolean validExecution(AutomationCompositionMsg<?> message) {
// Composition: only checked when the message carries a composition id.
78 if (message.getCompositionId() != null) {
// Ask the cache provider whether the composition definition is updated for this id and revision.
79 var valid = cacheProvider.isCompositionDefinitionUpdated(message.getCompositionId(),
80 message.getRevisionIdComposition());
// Clear the composition id and revision on the message (guarding condition not visible here).
82 message.setCompositionId(null);
83 message.setRevisionIdComposition(null);
// Composition target: same cache check, using the target id and its revision.
88 if (message.getCompositionTargetId() != null) {
89 var valid = cacheProvider.isCompositionDefinitionUpdated(message.getCompositionTargetId(),
90 message.getRevisionIdCompositionTarget());
// Clear the composition-target id and revision on the message (guarding condition not visible here).
92 message.setCompositionTargetId(null);
93 message.setRevisionIdCompositionTarget(null);
// Instance: only checked when the message carries an instance id.
98 if (message.getInstanceId() != null) {
// Ask the cache provider whether the instance is updated for this id and revision.
99 var valid = cacheProvider.isInstanceUpdated(message.getInstanceId(), message.getRevisionIdInstance());
// Clear the instance id and revision on the message (guarding condition not visible here).
101 message.setInstanceId(null);
102 message.setRevisionIdInstance(null);