Remove deprecated DMAAP dependency
[policy/drools-applications.git] / controlloop / common / eventmanager / src / main / java / org / onap / policy / controlloop / eventmanager / ControlLoopEventManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.eventmanager;
23
24 import java.io.Serial;
25 import java.io.Serializable;
26 import java.time.Instant;
27 import java.util.Deque;
28 import java.util.HashMap;
29 import java.util.Map;
30 import java.util.UUID;
31 import java.util.concurrent.CompletableFuture;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.ForkJoinPool;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicLong;
39 import lombok.AccessLevel;
40 import lombok.Getter;
41 import lombok.ToString;
42 import org.onap.policy.controlloop.ControlLoopException;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
45 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
46 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
47 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
48 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
49 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerStub;
50 import org.onap.policy.controlloop.processor.ControlLoopProcessor;
51 import org.onap.policy.drools.core.lock.LockCallback;
52 import org.onap.policy.drools.system.PolicyEngineConstants;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Manager for a single event. Once this has been created, the event can be retracted from
58  * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
59  * hasn't been replicated from another server). When the manager is no longer needed,
60  * {@link #destroy()} should be invoked.
61  */
62 @ToString(onlyExplicitlyIncluded = true)
63 public class ControlLoopEventManager implements StepContext, Serializable {
64
65     private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
66     @Serial
67     private static final long serialVersionUID = -1216568161322872641L;
68
69     /**
70      * Data manager used when the policy engine's guard.disabled property is "true".
71      */
72     private static final OperationHistoryDataManager STUB_DATA_MANAGER = new OperationHistoryDataManagerStub();
73
74     public static final String GUARD_DISABLED_PROPERTY = "guard.disabled";
75
76     /**
77      * Counts the number of these objects that have been created. This is used by junit
78      * tests.
79      */
80     private static final AtomicLong createCount = new AtomicLong(0);
81
82     /**
83      * {@code True} if this object was created by this JVM instance, {@code false}
84      * otherwise. This will be {@code false} if this object is reconstituted from a
85      * persistent store or by transfer from another server.
86      */
87     private final transient boolean createdByThisJvmInstance;
88
89     private final transient EventManagerServices services;
90
91     @Getter
92     @ToString.Include
93     public final String closedLoopControlName;
94     @Getter
95     @ToString.Include
96     private final UUID requestId;
97
98     /**
99      * Time, in milliseconds, when the control loop will time out.
100      */
101     @Getter
102     private final long endTimeMs;
103
104     // fields extracted from the ControlLoopParams
105     @Getter
106     private final String policyName;
107     @Getter
108     private final String policyVersion;
109
110     /**
111      * Maps a target entity to its lock.
112      */
113     private final transient Map<String, LockData> target2lock = new HashMap<>();
114
115     @Getter(AccessLevel.PROTECTED)
116     private final ControlLoopProcessor processor;
117
118     /**
119      * Set of properties used while processing the event.
120      */
121     private final Map<String, Serializable> properties = new ConcurrentHashMap<>();
122
123     /**
124      * Unprocessed outcomes from the operations. Outcomes are added to this each time the
125      * "start" or "complete" callback is invoked, typically by an operation running in a
126      * background thread, thus it must be thread safe.
127      */
128     @Getter
129     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
130
131
132     /**
133      * Constructs the object.
134      *
135      * @param services services the manager should use when processing the event
136      * @param params control loop parameters
137      * @param requestId event request ID
138      * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
139      *         be created
140      */
141     public ControlLoopEventManager(EventManagerServices services, ControlLoopParams params, UUID requestId)
142                     throws ControlLoopException {
143
144         createCount.incrementAndGet();
145
146         this.createdByThisJvmInstance = true;
147         this.services = services;
148         this.closedLoopControlName = params.getClosedLoopControlName();
149         this.requestId = requestId;
150         this.policyName = params.getPolicyName();
151         this.policyVersion = params.getPolicyVersion();
152         this.processor = new ControlLoopProcessor(params.getToscaPolicy());
153         this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
154     }
155
156     /**
157      * Gets the number of manager objects that have been created.
158      *
159      * @return the number of manager objects that have been created
160      */
161     public static long getCreateCount() {
162         return createCount.get();
163     }
164
165     /**
166      * Determines if the manager is still active.
167      *
168      * @return {@code true} if the manager is still active, {@code false} otherwise
169      */
170     public boolean isActive() {
171         return createdByThisJvmInstance;
172     }
173
174     /**
175      * Cancels the current operation and frees all locks.
176      */
177     public void destroy() {
178         if (isActive()) {
179             getBlockingExecutor().execute(this::freeAllLocks);
180         }
181     }
182
183     /**
184      * Frees all locks.
185      */
186     private void freeAllLocks() {
187         target2lock.values().forEach(LockData::free);
188     }
189
190     /**
191      * Determines the overall control loop timeout.
192      *
193      * @return the policy timeout, in milliseconds, if specified, a default timeout
194      *         otherwise
195      */
196     private long detmControlLoopTimeoutMs() {
197         // validation checks preclude null or 0 timeout values in the policy
198         int timeout = processor.getPolicy().getProperties().getTimeout();
199         return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
200     }
201
202     @Override
203     public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
204
205         long remainingMs = endTimeMs - System.currentTimeMillis();
206         int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
207
208         LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
209             var data2 = new LockData(key, requestId);
210             makeLock(targetEntity, requestId.toString(), remainingSec, data2);
211
212             data2.addUnavailableCallback(this::onComplete);
213
214             return data2;
215         });
216
217         return data.getFuture();
218     }
219
220     @Override
221     public synchronized CompletableFuture<OperationOutcome> releaseLock(String targetEntity) {
222         LockData data = target2lock.remove(targetEntity);
223
224         if (data == null) {
225             // lock did not exist - immediately return a success
226             OperationOutcome outcome = makeUnlockOutcome(targetEntity);
227             outcome.setEnd(outcome.getStart());
228             onComplete(outcome);
229
230             return CompletableFuture.completedFuture(outcome);
231         }
232
233         /*
234          * previous lock operation may not have completed yet, thus we tack the unlock
235          * operation onto it.
236          *
237          * Note: we must invoke free(), asynchronously (i.e., using whenCompleteAsync()),
238          * as it may block
239          */
240
241         return data.getFuture().whenCompleteAsync((lockOutcome, thrown) -> {
242
243             OperationOutcome outcome = makeUnlockOutcome(targetEntity);
244
245             try {
246                 data.free();
247
248             } catch (RuntimeException e) {
249                 logger.warn("failed to unlock {}", targetEntity, e);
250                 outcome.setResult(OperationResult.FAILURE_EXCEPTION);
251                 outcome.setMessage(ControlLoopOperation.FAILED_MSG + ": " + e.getMessage());
252             }
253
254             outcome.setEnd(Instant.now());
255             onComplete(outcome);
256
257         }, getBlockingExecutor());
258     }
259
260     private OperationOutcome makeUnlockOutcome(String targetEntity) {
261         var outcome = new OperationOutcome();
262         outcome.setActor(ActorConstants.LOCK_ACTOR);
263         outcome.setOperation(ActorConstants.UNLOCK_OPERATION);
264         outcome.setTarget(targetEntity);
265         outcome.setResult(OperationResult.SUCCESS);
266         outcome.setMessage(ControlLoopOperation.SUCCESS_MSG);
267         outcome.setFinalOutcome(true);
268         outcome.setStart(Instant.now());
269         return outcome;
270     }
271
272     public void onStart(OperationOutcome outcome) {
273         outcomes.add(outcome);
274     }
275
276     public void onComplete(OperationOutcome outcome) {
277         outcomes.add(outcome);
278     }
279
280     /**
281      * Determines if the context contains a property.
282      *
283      * @param name name of the property of interest
284      * @return {@code true} if the context contains the property, {@code false} otherwise
285      */
286     @Override
287     public boolean contains(String name) {
288         return properties.containsKey(name);
289     }
290
291     /**
292      * Gets a property, casting it to the desired type.
293      *
294      * @param <T> desired type
295      * @param name name of the property whose value is to be retrieved
296      * @return the property's value, or {@code null} if it does not yet have a value
297      */
298     @Override
299     @SuppressWarnings("unchecked")
300     public <T> T getProperty(String name) {
301         return (T) properties.get(name);
302     }
303
304     /**
305      * Sets a property's value.
306      *
307      * @param name property name
308      * @param value new property value
309      */
310     @Override
311     public void setProperty(String name, Serializable value) {
312         logger.info("set property {}={} manager={}", name, value, this);
313         properties.put(name, value);
314     }
315
316     /**
317      * Removes a property.
318      *
319      * @param name property name
320      */
321     @Override
322     public void removeProperty(String name) {
323         properties.remove(name);
324     }
325
326     // the following methods may be overridden by junit tests
327
328     public Executor getExecutor() {
329         return ForkJoinPool.commonPool();
330     }
331
332     protected ExecutorService getBlockingExecutor() {
333         return PolicyEngineConstants.getManager().getExecutorService();
334     }
335
336     protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
337         PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
338     }
339
340     public ActorService getActorService() {
341         return services.getActorService();
342     }
343
344     public OperationHistoryDataManager getDataManager() {
345         boolean guardDisabled = "true".equalsIgnoreCase(getEnvironmentProperty(GUARD_DISABLED_PROPERTY));
346         return (guardDisabled ? STUB_DATA_MANAGER : services.getDataManager());
347     }
348
349     protected String getEnvironmentProperty(String propName) {
350         return PolicyEngineConstants.getManager().getEnvironmentProperty(propName);
351     }
352 }