Make drools-pdp work with drools and java versions compatible
[policy/drools-applications.git] / controlloop / common / eventmanager / src / main / java / org / onap / policy / controlloop / eventmanager / ControlLoopEventManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.eventmanager;
23
24 import java.io.Serializable;
25 import java.time.Instant;
26 import java.util.Deque;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.UUID;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentLinkedDeque;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.ForkJoinPool;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicLong;
38 import lombok.AccessLevel;
39 import lombok.Getter;
40 import lombok.ToString;
41 import org.onap.policy.controlloop.ControlLoopException;
42 import org.onap.policy.controlloop.ControlLoopOperation;
43 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
44 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
45 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
46 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
47 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
48 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerStub;
49 import org.onap.policy.controlloop.processor.ControlLoopProcessor;
50 import org.onap.policy.drools.core.lock.LockCallback;
51 import org.onap.policy.drools.system.PolicyEngineConstants;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * Manager for a single event. Once this has been created, the event can be retracted from
57  * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
58  * hasn't been replicated from another server). When the manager is no longer needed,
59  * {@link #destroy()} should be invoked.
60  */
61 @ToString(onlyExplicitlyIncluded = true)
62 public class ControlLoopEventManager implements StepContext, Serializable {
63
64     private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
65     private static final long serialVersionUID = -1216568161322872641L;
66
67     /**
68      * Data manager used when the policy engine's guard.disabled property is "true".
69      */
70     private static final OperationHistoryDataManager STUB_DATA_MANAGER = new OperationHistoryDataManagerStub();
71
72     public static final String GUARD_DISABLED_PROPERTY = "guard.disabled";
73
74     /**
75      * Counts the number of these objects that have been created. This is used by junit
76      * tests.
77      */
78     private static final AtomicLong createCount = new AtomicLong(0);
79
80     /**
81      * {@code True} if this object was created by this JVM instance, {@code false}
82      * otherwise. This will be {@code false} if this object is reconstituted from a
83      * persistent store or by transfer from another server.
84      */
85     private final transient boolean createdByThisJvmInstance;
86
87     private final transient EventManagerServices services;
88
89     @Getter
90     @ToString.Include
91     public final String closedLoopControlName;
92     @Getter
93     @ToString.Include
94     private final UUID requestId;
95
96     /**
97      * Time, in milliseconds, when the control loop will time out.
98      */
99     @Getter
100     private final long endTimeMs;
101
102     // fields extracted from the ControlLoopParams
103     @Getter
104     private final String policyName;
105     @Getter
106     private final String policyVersion;
107
108     /**
109      * Maps a target entity to its lock.
110      */
111     private final transient Map<String, LockData> target2lock = new HashMap<>();
112
113     @Getter(AccessLevel.PROTECTED)
114     private final ControlLoopProcessor processor;
115
116     /**
117      * Set of properties used while processing the event.
118      */
119     private final Map<String, Serializable> properties = new ConcurrentHashMap<>();
120
121     /**
122      * Unprocessed outcomes from the operations. Outcomes are added to this each time the
123      * "start" or "complete" callback is invoked, typically by an operation running in a
124      * background thread, thus it must be thread safe.
125      */
126     @Getter
127     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
128
129
130     /**
131      * Constructs the object.
132      *
133      * @param services services the manager should use when processing the event
134      * @param params control loop parameters
135      * @param requestId event request ID
136      * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
137      *         be created
138      */
139     public ControlLoopEventManager(EventManagerServices services, ControlLoopParams params, UUID requestId)
140                     throws ControlLoopException {
141
142         createCount.incrementAndGet();
143
144         this.createdByThisJvmInstance = true;
145         this.services = services;
146         this.closedLoopControlName = params.getClosedLoopControlName();
147         this.requestId = requestId;
148         this.policyName = params.getPolicyName();
149         this.policyVersion = params.getPolicyVersion();
150         this.processor = new ControlLoopProcessor(params.getToscaPolicy());
151         this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
152     }
153
154     /**
155      * Gets the number of manager objects that have been created.
156      *
157      * @return the number of manager objects that have been created
158      */
159     public static long getCreateCount() {
160         return createCount.get();
161     }
162
163     /**
164      * Determines if the manager is still active.
165      *
166      * @return {@code true} if the manager is still active, {@code false} otherwise
167      */
168     public boolean isActive() {
169         return createdByThisJvmInstance;
170     }
171
172     /**
173      * Cancels the current operation and frees all locks.
174      */
175     public void destroy() {
176         if (isActive()) {
177             getBlockingExecutor().execute(this::freeAllLocks);
178         }
179     }
180
181     /**
182      * Frees all locks.
183      */
184     private void freeAllLocks() {
185         target2lock.values().forEach(LockData::free);
186     }
187
188     /**
189      * Determines the overall control loop timeout.
190      *
191      * @return the policy timeout, in milliseconds, if specified, a default timeout
192      *         otherwise
193      */
194     private long detmControlLoopTimeoutMs() {
195         // validation checks preclude null or 0 timeout values in the policy
196         int timeout = processor.getPolicy().getProperties().getTimeout();
197         return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
198     }
199
200     @Override
201     public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
202
203         long remainingMs = endTimeMs - System.currentTimeMillis();
204         int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
205
206         LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
207             var data2 = new LockData(key, requestId);
208             makeLock(targetEntity, requestId.toString(), remainingSec, data2);
209
210             data2.addUnavailableCallback(this::onComplete);
211
212             return data2;
213         });
214
215         return data.getFuture();
216     }
217
218     @Override
219     public synchronized CompletableFuture<OperationOutcome> releaseLock(String targetEntity) {
220         LockData data = target2lock.remove(targetEntity);
221
222         if (data == null) {
223             // lock did not exist - immediately return a success
224             OperationOutcome outcome = makeUnlockOutcome(targetEntity);
225             outcome.setEnd(outcome.getStart());
226             onComplete(outcome);
227
228             return CompletableFuture.completedFuture(outcome);
229         }
230
231         /*
232          * previous lock operation may not have completed yet, thus we tack the unlock
233          * operation onto it.
234          *
235          * Note: we must invoke free(), asynchronously (i.e., using whenCompleteAsync()),
236          * as it may block
237          */
238
239         return data.getFuture().whenCompleteAsync((lockOutcome, thrown) -> {
240
241             OperationOutcome outcome = makeUnlockOutcome(targetEntity);
242
243             try {
244                 data.free();
245
246             } catch (RuntimeException e) {
247                 logger.warn("failed to unlock {}", targetEntity, e);
248                 outcome.setResult(OperationResult.FAILURE_EXCEPTION);
249                 outcome.setMessage(ControlLoopOperation.FAILED_MSG + ": " + e.getMessage());
250             }
251
252             outcome.setEnd(Instant.now());
253             onComplete(outcome);
254
255         }, getBlockingExecutor());
256     }
257
258     private OperationOutcome makeUnlockOutcome(String targetEntity) {
259         var outcome = new OperationOutcome();
260         outcome.setActor(ActorConstants.LOCK_ACTOR);
261         outcome.setOperation(ActorConstants.UNLOCK_OPERATION);
262         outcome.setTarget(targetEntity);
263         outcome.setResult(OperationResult.SUCCESS);
264         outcome.setMessage(ControlLoopOperation.SUCCESS_MSG);
265         outcome.setFinalOutcome(true);
266         outcome.setStart(Instant.now());
267         return outcome;
268     }
269
270     public void onStart(OperationOutcome outcome) {
271         outcomes.add(outcome);
272     }
273
274     public void onComplete(OperationOutcome outcome) {
275         outcomes.add(outcome);
276     }
277
278     /**
279      * Determines if the context contains a property.
280      *
281      * @param name name of the property of interest
282      * @return {@code true} if the context contains the property, {@code false} otherwise
283      */
284     @Override
285     public boolean contains(String name) {
286         return properties.containsKey(name);
287     }
288
289     /**
290      * Gets a property, casting it to the desired type.
291      *
292      * @param <T> desired type
293      * @param name name of the property whose value is to be retrieved
294      * @return the property's value, or {@code null} if it does not yet have a value
295      */
296     @Override
297     @SuppressWarnings("unchecked")
298     public <T> T getProperty(String name) {
299         return (T) properties.get(name);
300     }
301
302     /**
303      * Sets a property's value.
304      *
305      * @param name property name
306      * @param value new property value
307      */
308     @Override
309     public void setProperty(String name, Serializable value) {
310         logger.info("set property {}={} manager={}", name, value, this);
311         properties.put(name, value);
312     }
313
314     /**
315      * Removes a property.
316      *
317      * @param name property name
318      */
319     @Override
320     public void removeProperty(String name) {
321         properties.remove(name);
322     }
323
324     // the following methods may be overridden by junit tests
325
326     public Executor getExecutor() {
327         return ForkJoinPool.commonPool();
328     }
329
330     protected ExecutorService getBlockingExecutor() {
331         return PolicyEngineConstants.getManager().getExecutorService();
332     }
333
334     protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
335         PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
336     }
337
338     public ActorService getActorService() {
339         return services.getActorService();
340     }
341
342     public OperationHistoryDataManager getDataManager() {
343         boolean guardDisabled = "true".equalsIgnoreCase(getEnvironmentProperty(GUARD_DISABLED_PROPERTY));
344         return (guardDisabled ? STUB_DATA_MANAGER : services.getDataManager());
345     }
346
347     protected String getEnvironmentProperty(String propName) {
348         return PolicyEngineConstants.getManager().getEnvironmentProperty(propName);
349     }
350 }