Merge "Refactor policy/drools-applications csit tests"
[policy/drools-applications.git] / controlloop / common / eventmanager / src / main / java / org / onap / policy / controlloop / eventmanager / ControlLoopEventManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.eventmanager;
22
23 import java.io.Serializable;
24 import java.time.Instant;
25 import java.util.Deque;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.UUID;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentLinkedDeque;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.ForkJoinPool;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicLong;
37 import lombok.AccessLevel;
38 import lombok.Getter;
39 import lombok.ToString;
40 import org.onap.policy.controlloop.ControlLoopException;
41 import org.onap.policy.controlloop.ControlLoopOperation;
42 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
43 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
44 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
45 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
46 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
47 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerStub;
48 import org.onap.policy.controlloop.processor.ControlLoopProcessor;
49 import org.onap.policy.drools.core.lock.LockCallback;
50 import org.onap.policy.drools.system.PolicyEngineConstants;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Manager for a single event. Once this has been created, the event can be retracted from
56  * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
57  * hasn't been replicated from another server). When the manager is no longer needed,
58  * {@link #destroy()} should be invoked.
59  */
60 @ToString(onlyExplicitlyIncluded = true)
61 public class ControlLoopEventManager implements StepContext, Serializable {
62
63     private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
64     private static final long serialVersionUID = -1216568161322872641L;
65
66     /**
67      * Data manager used when the policy engine's guard.disabled property is "true".
68      */
69     private static final OperationHistoryDataManager STUB_DATA_MANAGER = new OperationHistoryDataManagerStub();
70
71     private static final String GUARD_DISABLED_PROPERTY = "guard.disabled";
72
73     /**
74      * Counts the number of these objects that have been created. This is used by junit
75      * tests.
76      */
77     private static final AtomicLong createCount = new AtomicLong(0);
78
79     /**
80      * {@code True} if this object was created by this JVM instance, {@code false}
81      * otherwise. This will be {@code false} if this object is reconstituted from a
82      * persistent store or by transfer from another server.
83      */
84     private transient boolean createdByThisJvmInstance;
85
86     private final transient EventManagerServices services;
87
88     @Getter
89     @ToString.Include
90     public final String closedLoopControlName;
91     @Getter
92     @ToString.Include
93     private final UUID requestId;
94
95     /**
96      * Time, in milliseconds, when the control loop will time out.
97      */
98     @Getter
99     private final long endTimeMs;
100
101     // fields extracted from the ControlLoopParams
102     @Getter
103     private final String policyName;
104     @Getter
105     private final String policyVersion;
106
107     /**
108      * Maps a target entity to its lock.
109      */
110     private final transient Map<String, LockData> target2lock = new HashMap<>();
111
112     @Getter(AccessLevel.PROTECTED)
113     private final ControlLoopProcessor processor;
114
115     /**
116      * Set of properties used while processing the event.
117      */
118     private Map<String, Serializable> properties = new ConcurrentHashMap<>();
119
120     /**
121      * Unprocessed outcomes from the operations. Outcomes are added to this each time the
122      * "start" or "complete" callback is invoked, typically by an operation running in a
123      * background thread, thus it must be thread safe.
124      */
125     @Getter
126     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
127
128
129     /**
130      * Constructs the object.
131      *
132      * @param services services the manager should use when processing the event
133      * @param params control loop parameters
134      * @param requestId event request ID
135      * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
136      *         be created
137      */
138     public ControlLoopEventManager(EventManagerServices services, ControlLoopParams params, UUID requestId)
139                     throws ControlLoopException {
140
141         createCount.incrementAndGet();
142
143         this.createdByThisJvmInstance = true;
144         this.services = services;
145         this.closedLoopControlName = params.getClosedLoopControlName();
146         this.requestId = requestId;
147         this.policyName = params.getPolicyName();
148         this.policyVersion = params.getPolicyVersion();
149         this.processor = new ControlLoopProcessor(params.getToscaPolicy());
150         this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
151     }
152
153     /**
154      * Gets the number of manager objects that have been created.
155      *
156      * @return the number of manager objects that have been created
157      */
158     public static long getCreateCount() {
159         return createCount.get();
160     }
161
162     /**
163      * Determines if the manager is still active.
164      *
165      * @return {@code true} if the manager is still active, {@code false} otherwise
166      */
167     public boolean isActive() {
168         return createdByThisJvmInstance;
169     }
170
171     /**
172      * Cancels the current operation and frees all locks.
173      */
174     public void destroy() {
175         if (isActive()) {
176             getBlockingExecutor().execute(this::freeAllLocks);
177         }
178     }
179
180     /**
181      * Frees all locks.
182      */
183     private void freeAllLocks() {
184         target2lock.values().forEach(LockData::free);
185     }
186
187     /**
188      * Determines the overall control loop timeout.
189      *
190      * @return the policy timeout, in milliseconds, if specified, a default timeout
191      *         otherwise
192      */
193     private long detmControlLoopTimeoutMs() {
194         // validation checks preclude null or 0 timeout values in the policy
195         Integer timeout = processor.getPolicy().getProperties().getTimeout();
196         return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
197     }
198
199     @Override
200     public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
201
202         long remainingMs = endTimeMs - System.currentTimeMillis();
203         int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
204
205         LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
206             var data2 = new LockData(key, requestId);
207             makeLock(targetEntity, requestId.toString(), remainingSec, data2);
208
209             data2.addUnavailableCallback(this::onComplete);
210
211             return data2;
212         });
213
214         return data.getFuture();
215     }
216
217     @Override
218     public synchronized CompletableFuture<OperationOutcome> releaseLock(String targetEntity) {
219         LockData data = target2lock.remove(targetEntity);
220
221         if (data == null) {
222             // lock did not exist - immediately return a success
223             OperationOutcome outcome = makeUnlockOutcome(targetEntity);
224             outcome.setEnd(outcome.getStart());
225             onComplete(outcome);
226
227             return CompletableFuture.completedFuture(outcome);
228         }
229
230         /*
231          * previous lock operation may not have completed yet, thus we tack the unlock
232          * operation onto it.
233          *
234          * Note: we must invoke free(), asynchronously (i.e., using whenCompleteAsync()),
235          * as it may block
236          */
237
238         return data.getFuture().whenCompleteAsync((lockOutcome, thrown) -> {
239
240             OperationOutcome outcome = makeUnlockOutcome(targetEntity);
241
242             try {
243                 data.free();
244
245             } catch (RuntimeException e) {
246                 logger.warn("failed to unlock {}", targetEntity, e);
247                 outcome.setResult(OperationResult.FAILURE_EXCEPTION);
248                 outcome.setMessage(ControlLoopOperation.FAILED_MSG + ": " + e.getMessage());
249             }
250
251             outcome.setEnd(Instant.now());
252             onComplete(outcome);
253
254         }, getBlockingExecutor());
255     }
256
257     private OperationOutcome makeUnlockOutcome(String targetEntity) {
258         var outcome = new OperationOutcome();
259         outcome.setActor(ActorConstants.LOCK_ACTOR);
260         outcome.setOperation(ActorConstants.UNLOCK_OPERATION);
261         outcome.setTarget(targetEntity);
262         outcome.setResult(OperationResult.SUCCESS);
263         outcome.setMessage(ControlLoopOperation.SUCCESS_MSG);
264         outcome.setFinalOutcome(true);
265         outcome.setStart(Instant.now());
266         return outcome;
267     }
268
269     public void onStart(OperationOutcome outcome) {
270         outcomes.add(outcome);
271     }
272
273     public void onComplete(OperationOutcome outcome) {
274         outcomes.add(outcome);
275     }
276
277     /**
278      * Determines if the context contains a property.
279      *
280      * @param name name of the property of interest
281      * @return {@code true} if the context contains the property, {@code false} otherwise
282      */
283     @Override
284     public boolean contains(String name) {
285         return properties.containsKey(name);
286     }
287
288     /**
289      * Gets a property, casting it to the desired type.
290      *
291      * @param <T> desired type
292      * @param name name of the property whose value is to be retrieved
293      * @return the property's value, or {@code null} if it does not yet have a value
294      */
295     @Override
296     @SuppressWarnings("unchecked")
297     public <T> T getProperty(String name) {
298         return (T) properties.get(name);
299     }
300
301     /**
302      * Sets a property's value.
303      *
304      * @param name property name
305      * @param value new property value
306      */
307     @Override
308     public void setProperty(String name, Serializable value) {
309         logger.info("set property {}={} manager={}", name, value, this);
310         properties.put(name, value);
311     }
312
313     /**
314      * Removes a property.
315      *
316      * @param name property name
317      */
318     @Override
319     public void removeProperty(String name) {
320         properties.remove(name);
321     }
322
323     // the following methods may be overridden by junit tests
324
325     public Executor getExecutor() {
326         return ForkJoinPool.commonPool();
327     }
328
329     protected ExecutorService getBlockingExecutor() {
330         return PolicyEngineConstants.getManager().getExecutorService();
331     }
332
333     protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
334         PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
335     }
336
337     public ActorService getActorService() {
338         return services.getActorService();
339     }
340
341     public OperationHistoryDataManager getDataManager() {
342         boolean guardDisabled = "true".equalsIgnoreCase(getEnvironmentProperty(GUARD_DISABLED_PROPERTY));
343         return (guardDisabled ? STUB_DATA_MANAGER : services.getDataManager());
344     }
345
346     protected String getEnvironmentProperty(String propName) {
347         return PolicyEngineConstants.getManager().getEnvironmentProperty(propName);
348     }
349 }