2c7e133af3d1327ef2cd4109901b01fb4fa21996
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.eventmanager;
22
23 import java.io.Serializable;
24 import java.time.Instant;
25 import java.util.Deque;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.UUID;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentLinkedDeque;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.ForkJoinPool;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicLong;
37 import lombok.AccessLevel;
38 import lombok.Getter;
39 import lombok.ToString;
40 import org.onap.policy.controlloop.ControlLoopException;
41 import org.onap.policy.controlloop.ControlLoopOperation;
42 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
43 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
44 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
45 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
46 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
47 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerStub;
48 import org.onap.policy.controlloop.processor.ControlLoopProcessor;
49 import org.onap.policy.drools.core.lock.LockCallback;
50 import org.onap.policy.drools.system.PolicyEngineConstants;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Manager for a single event. Once this has been created, the event can be retracted from
56  * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
57  * hasn't been replicated from another server). When the manager is no longer needed,
58  * {@link #destroy()} should be invoked.
59  */
60 @ToString(onlyExplicitlyIncluded = true)
61 public class ControlLoopEventManager implements StepContext, Serializable {
62
63     private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
64     private static final long serialVersionUID = -1216568161322872641L;
65
66     /**
67      * Data manager used when the policy engine's guard.disabled property is "true".
68      */
69     private static final OperationHistoryDataManager STUB_DATA_MANAGER = new OperationHistoryDataManagerStub();
70
71     private static final String GUARD_DISABLED_PROPERTY = "guard.disabled";
72     private static final String EVENT_MANAGER_SERVICE_CONFIG = "event-manager";
73
74     /**
75      * Counts the number of these objects that have been created. This is used by junit
76      * tests.
77      */
78     private static final AtomicLong createCount = new AtomicLong(0);
79
80     /**
81      * {@code True} if this object was created by this JVM instance, {@code false}
82      * otherwise. This will be {@code false} if this object is reconstituted from a
83      * persistent store or by transfer from another server.
84      */
85     private transient boolean createdByThisJvmInstance;
86
87     @Getter
88     @ToString.Include
89     public final String closedLoopControlName;
90     @Getter
91     @ToString.Include
92     private final UUID requestId;
93
94     /**
95      * Time, in milliseconds, when the control loop will time out.
96      */
97     @Getter
98     private final long endTimeMs;
99
100     // fields extracted from the ControlLoopParams
101     @Getter
102     private final String policyName;
103     @Getter
104     private final String policyVersion;
105
106     /**
107      * Maps a target entity to its lock.
108      */
109     private final transient Map<String, LockData> target2lock = new HashMap<>();
110
111     @Getter(AccessLevel.PROTECTED)
112     private final ControlLoopProcessor processor;
113
114     /**
115      * Set of properties used while processing the event.
116      */
117     private Map<String, Serializable> properties = new ConcurrentHashMap<>();
118
119     /**
120      * Unprocessed outcomes from the operations. Outcomes are added to this each time the
121      * "start" or "complete" callback is invoked, typically by an operation running in a
122      * background thread, thus it must be thread safe.
123      */
124     @Getter
125     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
126
127
128     /**
129      * Constructs the object.
130      *
131      * @param params control loop parameters
132      * @param requestId event request ID
133      * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
134      *         be created
135      */
136     public ControlLoopEventManager(ControlLoopParams params, UUID requestId) throws ControlLoopException {
137
138         createCount.incrementAndGet();
139
140         this.createdByThisJvmInstance = true;
141         this.closedLoopControlName = params.getClosedLoopControlName();
142         this.requestId = requestId;
143         this.policyName = params.getPolicyName();
144         this.policyVersion = params.getPolicyVersion();
145         this.processor = new ControlLoopProcessor(params.getToscaPolicy());
146         this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
147     }
148
149     /**
150      * Gets the number of manager objects that have been created.
151      *
152      * @return the number of manager objects that have been created
153      */
154     public static long getCreateCount() {
155         return createCount.get();
156     }
157
158     /**
159      * Determines if the manager is still active.
160      *
161      * @return {@code true} if the manager is still active, {@code false} otherwise
162      */
163     public boolean isActive() {
164         return createdByThisJvmInstance;
165     }
166
167     /**
168      * Cancels the current operation and frees all locks.
169      */
170     public void destroy() {
171         if (isActive()) {
172             getBlockingExecutor().execute(this::freeAllLocks);
173         }
174     }
175
176     /**
177      * Frees all locks.
178      */
179     private void freeAllLocks() {
180         target2lock.values().forEach(LockData::free);
181     }
182
183     /**
184      * Determines the overall control loop timeout.
185      *
186      * @return the policy timeout, in milliseconds, if specified, a default timeout
187      *         otherwise
188      */
189     private long detmControlLoopTimeoutMs() {
190         // validation checks preclude null or 0 timeout values in the policy
191         Integer timeout = processor.getPolicy().getProperties().getTimeout();
192         return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
193     }
194
195     @Override
196     public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
197
198         long remainingMs = endTimeMs - System.currentTimeMillis();
199         int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
200
201         LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
202             LockData data2 = new LockData(key, requestId);
203             makeLock(targetEntity, requestId.toString(), remainingSec, data2);
204
205             data2.addUnavailableCallback(this::onComplete);
206
207             return data2;
208         });
209
210         return data.getFuture();
211     }
212
213     @Override
214     public synchronized CompletableFuture<OperationOutcome> releaseLock(String targetEntity) {
215         LockData data = target2lock.remove(targetEntity);
216
217         if (data == null) {
218             // lock did not exist - immediately return a success
219             OperationOutcome outcome = makeUnlockOutcome(targetEntity);
220             outcome.setEnd(outcome.getStart());
221             onComplete(outcome);
222
223             return CompletableFuture.completedFuture(outcome);
224         }
225
226         /*
227          * previous lock operation may not have completed yet, thus we tack the unlock
228          * operation onto it.
229          *
230          * Note: we must invoke free(), asynchronously (i.e., using whenCompleteAsync()),
231          * as it may block
232          */
233
234         return data.getFuture().whenCompleteAsync((lockOutcome, thrown) -> {
235
236             OperationOutcome outcome = makeUnlockOutcome(targetEntity);
237
238             try {
239                 data.free();
240
241             } catch (RuntimeException e) {
242                 logger.warn("failed to unlock {}", targetEntity, e);
243                 outcome.setResult(OperationResult.FAILURE_EXCEPTION);
244                 outcome.setMessage(ControlLoopOperation.FAILED_MSG + ": " + e.getMessage());
245             }
246
247             outcome.setEnd(Instant.now());
248             onComplete(outcome);
249
250         }, getBlockingExecutor());
251     }
252
253     private OperationOutcome makeUnlockOutcome(String targetEntity) {
254         OperationOutcome outcome = new OperationOutcome();
255         outcome.setActor(ActorConstants.LOCK_ACTOR);
256         outcome.setOperation(ActorConstants.UNLOCK_OPERATION);
257         outcome.setTarget(targetEntity);
258         outcome.setResult(OperationResult.SUCCESS);
259         outcome.setMessage(ControlLoopOperation.SUCCESS_MSG);
260         outcome.setFinalOutcome(true);
261         outcome.setStart(Instant.now());
262         return outcome;
263     }
264
265     public void onStart(OperationOutcome outcome) {
266         outcomes.add(outcome);
267     }
268
269     public void onComplete(OperationOutcome outcome) {
270         outcomes.add(outcome);
271     }
272
273     /**
274      * Determines if the context contains a property.
275      *
276      * @param name name of the property of interest
277      * @return {@code true} if the context contains the property, {@code false} otherwise
278      */
279     @Override
280     public boolean contains(String name) {
281         return properties.containsKey(name);
282     }
283
284     /**
285      * Gets a property, casting it to the desired type.
286      *
287      * @param <T> desired type
288      * @param name name of the property whose value is to be retrieved
289      * @return the property's value, or {@code null} if it does not yet have a value
290      */
291     @Override
292     @SuppressWarnings("unchecked")
293     public <T> T getProperty(String name) {
294         return (T) properties.get(name);
295     }
296
297     /**
298      * Sets a property's value.
299      *
300      * @param name property name
301      * @param value new property value
302      */
303     @Override
304     public void setProperty(String name, Serializable value) {
305         logger.info("set property {}={} manager={}", name, value, this);
306         properties.put(name, value);
307     }
308
309     /**
310      * Removes a property.
311      *
312      * @param name property name
313      */
314     @Override
315     public void removeProperty(String name) {
316         properties.remove(name);
317     }
318
319     /**
320      * Initializes various components, on demand.
321      */
322     private static class LazyInitData {
323         private static final OperationHistoryDataManager DATA_MANAGER;
324         private static final ActorService ACTOR_SERVICE;
325
326         static {
327             EventManagerServices services = new EventManagerServices(EVENT_MANAGER_SERVICE_CONFIG);
328             ACTOR_SERVICE = services.getActorService();
329             DATA_MANAGER = services.getDataManager();
330         }
331     }
332
333     // the following methods may be overridden by junit tests
334
335     public Executor getExecutor() {
336         return ForkJoinPool.commonPool();
337     }
338
339     protected ExecutorService getBlockingExecutor() {
340         return PolicyEngineConstants.getManager().getExecutorService();
341     }
342
343     protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
344         PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
345     }
346
347     public ActorService getActorService() {
348         return LazyInitData.ACTOR_SERVICE;
349     }
350
351     public OperationHistoryDataManager getDataManager() {
352         boolean guardDisabled = "true".equalsIgnoreCase(getEnvironmentProperty(GUARD_DISABLED_PROPERTY));
353         return (guardDisabled ? STUB_DATA_MANAGER : LazyInitData.DATA_MANAGER);
354     }
355
356     protected String getEnvironmentProperty(String propName) {
357         return PolicyEngineConstants.getManager().getEnvironmentProperty(propName);
358     }
359 }