12128554f18acae40686f0a8b4db8f00e41bc5da
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.eventmanager;
22
23 import java.io.Serializable;
24 import java.util.Deque;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.UUID;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedDeque;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.ForkJoinPool;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicLong;
36 import lombok.AccessLevel;
37 import lombok.Getter;
38 import lombok.ToString;
39 import org.onap.policy.controlloop.ControlLoopException;
40 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
41 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
42 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
43 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
44 import org.onap.policy.controlloop.processor.ControlLoopProcessor;
45 import org.onap.policy.drools.core.lock.LockCallback;
46 import org.onap.policy.drools.system.PolicyEngineConstants;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  * Manager for a single event. Once this has been created, the event can be retracted from
52  * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
53  * hasn't been replicated from another server). When the manager is no longer needed,
54  * {@link #destroy()} should be invoked.
55  */
56 @ToString(onlyExplicitlyIncluded = true)
57 public class ControlLoopEventManager implements StepContext, Serializable {
58
59     private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
60     private static final long serialVersionUID = -1216568161322872641L;
61
62     private static final String EVENT_MANAGER_SERVICE_CONFIG = "event-manager";
63
64     /**
65      * Counts the number of these objects that have been created. This is used by junit
66      * tests.
67      */
68     private static final AtomicLong createCount = new AtomicLong(0);
69
70     /**
71      * {@code True} if this object was created by this JVM instance, {@code false}
72      * otherwise. This will be {@code false} if this object is reconstituted from a
73      * persistent store or by transfer from another server.
74      */
75     private transient boolean createdByThisJvmInstance;
76
77     @Getter
78     @ToString.Include
79     public final String closedLoopControlName;
80     @Getter
81     @ToString.Include
82     private final UUID requestId;
83
84     /**
85      * Time, in milliseconds, when the control loop will time out.
86      */
87     @Getter
88     private final long endTimeMs;
89
90     // fields extracted from the ControlLoopParams
91     @Getter
92     private final String policyName;
93     @Getter
94     private final String policyVersion;
95
96     /**
97      * Maps a target entity to its lock.
98      */
99     private final transient Map<String, LockData> target2lock = new HashMap<>();
100
101     @Getter(AccessLevel.PROTECTED)
102     private final ControlLoopProcessor processor;
103
104     /**
105      * Set of properties used while processing the event.
106      */
107     private Map<String, Serializable> properties = new ConcurrentHashMap<>();
108
109     /**
110      * Unprocessed outcomes from the operations. Outcomes are added to this each time the
111      * "start" or "complete" callback is invoked, typically by an operation running in a
112      * background thread, thus it must be thread safe.
113      */
114     @Getter
115     private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
116
117
118     /**
119      * Constructs the object.
120      *
121      * @param params control loop parameters
122      * @param requestId event request ID
123      * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
124      *         be created
125      */
126     public ControlLoopEventManager(ControlLoopParams params, UUID requestId) throws ControlLoopException {
127
128         createCount.incrementAndGet();
129
130         this.createdByThisJvmInstance = true;
131         this.closedLoopControlName = params.getClosedLoopControlName();
132         this.requestId = requestId;
133         this.policyName = params.getPolicyName();
134         this.policyVersion = params.getPolicyVersion();
135         this.processor = new ControlLoopProcessor(params.getToscaPolicy());
136         this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
137     }
138
139     /**
140      * Gets the number of manager objects that have been created.
141      *
142      * @return the number of manager objects that have been created
143      */
144     public static long getCreateCount() {
145         return createCount.get();
146     }
147
148     /**
149      * Determines if the manager is still active.
150      *
151      * @return {@code true} if the manager is still active, {@code false} otherwise
152      */
153     public boolean isActive() {
154         return createdByThisJvmInstance;
155     }
156
157     /**
158      * Cancels the current operation and frees all locks.
159      */
160     public void destroy() {
161         if (isActive()) {
162             getBlockingExecutor().execute(this::freeAllLocks);
163         }
164     }
165
166     /**
167      * Frees all locks.
168      */
169     private void freeAllLocks() {
170         target2lock.values().forEach(LockData::free);
171     }
172
173     /**
174      * Determines the overall control loop timeout.
175      *
176      * @return the policy timeout, in milliseconds, if specified, a default timeout
177      *         otherwise
178      */
179     private long detmControlLoopTimeoutMs() {
180         // validation checks preclude null or 0 timeout values in the policy
181         Integer timeout = processor.getPolicy().getProperties().getTimeout();
182         return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
183     }
184
185     /**
186      * Requests a lock. This requests the lock for the time that remains before the
187      * timeout expires. This avoids having to extend the lock.
188      *
189      * @param targetEntity entity to be locked
190      * @return a future that can be used to await the lock
191      */
192     @Override
193     public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
194
195         long remainingMs = endTimeMs - System.currentTimeMillis();
196         int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
197
198         LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
199             LockData data2 = new LockData(key, requestId);
200             makeLock(targetEntity, requestId.toString(), remainingSec, data2);
201
202             data2.addUnavailableCallback(this::onComplete);
203
204             return data2;
205         });
206
207         return data.getFuture();
208     }
209
210     public void onStart(OperationOutcome outcome) {
211         outcomes.add(outcome);
212     }
213
214     public void onComplete(OperationOutcome outcome) {
215         outcomes.add(outcome);
216     }
217
218     /**
219      * Determines if the context contains a property.
220      *
221      * @param name name of the property of interest
222      * @return {@code true} if the context contains the property, {@code false} otherwise
223      */
224     @Override
225     public boolean contains(String name) {
226         return properties.containsKey(name);
227     }
228
229     /**
230      * Gets a property, casting it to the desired type.
231      *
232      * @param <T> desired type
233      * @param name name of the property whose value is to be retrieved
234      * @return the property's value, or {@code null} if it does not yet have a value
235      */
236     @Override
237     @SuppressWarnings("unchecked")
238     public <T> T getProperty(String name) {
239         return (T) properties.get(name);
240     }
241
242     /**
243      * Sets a property's value.
244      *
245      * @param name property name
246      * @param value new property value
247      */
248     @Override
249     public void setProperty(String name, Serializable value) {
250         logger.info("set property {}={} manager={}", name, value, this);
251         properties.put(name, value);
252     }
253
254     /**
255      * Removes a property.
256      *
257      * @param name property name
258      */
259     @Override
260     public void removeProperty(String name) {
261         properties.remove(name);
262     }
263
264     /**
265      * Initializes various components, on demand.
266      */
267     private static class LazyInitData {
268         private static final OperationHistoryDataManager DATA_MANAGER;
269         private static final ActorService ACTOR_SERVICE;
270
271         static {
272             // TODO how to dynamically change data manager, depending whether or not
273             // guards are enabled?
274             EventManagerServices services = new EventManagerServices(EVENT_MANAGER_SERVICE_CONFIG);
275             ACTOR_SERVICE = services.getActorService();
276             DATA_MANAGER = services.getDataManager();
277         }
278     }
279
280     // the following methods may be overridden by junit tests
281
282     public Executor getExecutor() {
283         return ForkJoinPool.commonPool();
284     }
285
286     protected ExecutorService getBlockingExecutor() {
287         return PolicyEngineConstants.getManager().getExecutorService();
288     }
289
290     protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
291         PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
292     }
293
294     public ActorService getActorService() {
295         return LazyInitData.ACTOR_SERVICE;
296     }
297
298     public OperationHistoryDataManager getDataManager() {
299         return LazyInitData.DATA_MANAGER;
300     }
301 }