2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.eventmanager;
24 import java.io.Serializable;
25 import java.time.Instant;
26 import java.util.Deque;
27 import java.util.HashMap;
29 import java.util.UUID;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentLinkedDeque;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.ForkJoinPool;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicLong;
38 import lombok.AccessLevel;
40 import lombok.ToString;
41 import org.onap.policy.controlloop.ControlLoopException;
42 import org.onap.policy.controlloop.ControlLoopOperation;
43 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
44 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
45 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
46 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
47 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
48 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerStub;
49 import org.onap.policy.controlloop.processor.ControlLoopProcessor;
50 import org.onap.policy.drools.core.lock.LockCallback;
51 import org.onap.policy.drools.system.PolicyEngineConstants;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
56 * Manager for a single event. Once this has been created, the event can be retracted from
57 * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
58 * hasn't been replicated from another server). When the manager is no longer needed,
59 * {@link #destroy()} should be invoked.
61 @ToString(onlyExplicitlyIncluded = true)
62 public class ControlLoopEventManager implements StepContext, Serializable {
64 private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
65 private static final long serialVersionUID = -1216568161322872641L;
68 * Data manager used when the policy engine's guard.disabled property is "true".
70 private static final OperationHistoryDataManager STUB_DATA_MANAGER = new OperationHistoryDataManagerStub();
72 public static final String GUARD_DISABLED_PROPERTY = "guard.disabled";
75 * Counts the number of these objects that have been created. This is used by junit
78 private static final AtomicLong createCount = new AtomicLong(0);
81 * {@code True} if this object was created by this JVM instance, {@code false}
82 * otherwise. This will be {@code false} if this object is reconstituted from a
83 * persistent store or by transfer from another server.
85 private final transient boolean createdByThisJvmInstance;
87 private final transient EventManagerServices services;
91 public final String closedLoopControlName;
94 private final UUID requestId;
97 * Time, in milliseconds, when the control loop will time out.
100 private final long endTimeMs;
102 // fields extracted from the ControlLoopParams
104 private final String policyName;
106 private final String policyVersion;
109 * Maps a target entity to its lock.
111 private final transient Map<String, LockData> target2lock = new HashMap<>();
113 @Getter(AccessLevel.PROTECTED)
114 private final ControlLoopProcessor processor;
117 * Set of properties used while processing the event.
119 private final Map<String, Serializable> properties = new ConcurrentHashMap<>();
122 * Unprocessed outcomes from the operations. Outcomes are added to this each time the
123 * "start" or "complete" callback is invoked, typically by an operation running in a
124 * background thread, thus it must be thread safe.
127 private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
131 * Constructs the object.
133 * @param services services the manager should use when processing the event
134 * @param params control loop parameters
135 * @param requestId event request ID
136 * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
139 public ControlLoopEventManager(EventManagerServices services, ControlLoopParams params, UUID requestId)
140 throws ControlLoopException {
142 createCount.incrementAndGet();
144 this.createdByThisJvmInstance = true;
145 this.services = services;
146 this.closedLoopControlName = params.getClosedLoopControlName();
147 this.requestId = requestId;
148 this.policyName = params.getPolicyName();
149 this.policyVersion = params.getPolicyVersion();
150 this.processor = new ControlLoopProcessor(params.getToscaPolicy());
151 this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
155 * Gets the number of manager objects that have been created.
157 * @return the number of manager objects that have been created
159 public static long getCreateCount() {
160 return createCount.get();
164 * Determines if the manager is still active.
166 * @return {@code true} if the manager is still active, {@code false} otherwise
168 public boolean isActive() {
169 return createdByThisJvmInstance;
173 * Cancels the current operation and frees all locks.
175 public void destroy() {
177 getBlockingExecutor().execute(this::freeAllLocks);
184 private void freeAllLocks() {
185 target2lock.values().forEach(LockData::free);
189 * Determines the overall control loop timeout.
191 * @return the policy timeout, in milliseconds, if specified, a default timeout
194 private long detmControlLoopTimeoutMs() {
195 // validation checks preclude null or 0 timeout values in the policy
196 int timeout = processor.getPolicy().getProperties().getTimeout();
197 return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
201 public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
203 long remainingMs = endTimeMs - System.currentTimeMillis();
204 int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
206 LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
207 var data2 = new LockData(key, requestId);
208 makeLock(targetEntity, requestId.toString(), remainingSec, data2);
210 data2.addUnavailableCallback(this::onComplete);
215 return data.getFuture();
219 public synchronized CompletableFuture<OperationOutcome> releaseLock(String targetEntity) {
220 LockData data = target2lock.remove(targetEntity);
223 // lock did not exist - immediately return a success
224 OperationOutcome outcome = makeUnlockOutcome(targetEntity);
225 outcome.setEnd(outcome.getStart());
228 return CompletableFuture.completedFuture(outcome);
232 * previous lock operation may not have completed yet, thus we tack the unlock
235 * Note: we must invoke free(), asynchronously (i.e., using whenCompleteAsync()),
239 return data.getFuture().whenCompleteAsync((lockOutcome, thrown) -> {
241 OperationOutcome outcome = makeUnlockOutcome(targetEntity);
246 } catch (RuntimeException e) {
247 logger.warn("failed to unlock {}", targetEntity, e);
248 outcome.setResult(OperationResult.FAILURE_EXCEPTION);
249 outcome.setMessage(ControlLoopOperation.FAILED_MSG + ": " + e.getMessage());
252 outcome.setEnd(Instant.now());
255 }, getBlockingExecutor());
258 private OperationOutcome makeUnlockOutcome(String targetEntity) {
259 var outcome = new OperationOutcome();
260 outcome.setActor(ActorConstants.LOCK_ACTOR);
261 outcome.setOperation(ActorConstants.UNLOCK_OPERATION);
262 outcome.setTarget(targetEntity);
263 outcome.setResult(OperationResult.SUCCESS);
264 outcome.setMessage(ControlLoopOperation.SUCCESS_MSG);
265 outcome.setFinalOutcome(true);
266 outcome.setStart(Instant.now());
270 public void onStart(OperationOutcome outcome) {
271 outcomes.add(outcome);
274 public void onComplete(OperationOutcome outcome) {
275 outcomes.add(outcome);
279 * Determines if the context contains a property.
281 * @param name name of the property of interest
282 * @return {@code true} if the context contains the property, {@code false} otherwise
285 public boolean contains(String name) {
286 return properties.containsKey(name);
290 * Gets a property, casting it to the desired type.
292 * @param <T> desired type
293 * @param name name of the property whose value is to be retrieved
294 * @return the property's value, or {@code null} if it does not yet have a value
297 @SuppressWarnings("unchecked")
298 public <T> T getProperty(String name) {
299 return (T) properties.get(name);
303 * Sets a property's value.
305 * @param name property name
306 * @param value new property value
309 public void setProperty(String name, Serializable value) {
310 logger.info("set property {}={} manager={}", name, value, this);
311 properties.put(name, value);
315 * Removes a property.
317 * @param name property name
320 public void removeProperty(String name) {
321 properties.remove(name);
324 // the following methods may be overridden by junit tests
326 public Executor getExecutor() {
327 return ForkJoinPool.commonPool();
330 protected ExecutorService getBlockingExecutor() {
331 return PolicyEngineConstants.getManager().getExecutorService();
334 protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
335 PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
338 public ActorService getActorService() {
339 return services.getActorService();
342 public OperationHistoryDataManager getDataManager() {
343 boolean guardDisabled = "true".equalsIgnoreCase(getEnvironmentProperty(GUARD_DISABLED_PROPERTY));
344 return (guardDisabled ? STUB_DATA_MANAGER : services.getDataManager());
347 protected String getEnvironmentProperty(String propName) {
348 return PolicyEngineConstants.getManager().getEnvironmentProperty(propName);