2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.eventmanager;
24 import java.io.Serial;
25 import java.io.Serializable;
26 import java.time.Instant;
27 import java.util.Deque;
28 import java.util.HashMap;
30 import java.util.UUID;
31 import java.util.concurrent.CompletableFuture;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.ForkJoinPool;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicLong;
39 import lombok.AccessLevel;
41 import lombok.ToString;
42 import org.onap.policy.controlloop.ControlLoopException;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
45 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
46 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
47 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
48 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
49 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerStub;
50 import org.onap.policy.controlloop.processor.ControlLoopProcessor;
51 import org.onap.policy.drools.core.lock.LockCallback;
52 import org.onap.policy.drools.system.PolicyEngineConstants;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * Manager for a single event. Once this has been created, the event can be retracted from
58 * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
59 * hasn't been replicated from another server). When the manager is no longer needed,
60 * {@link #destroy()} should be invoked.
62 @ToString(onlyExplicitlyIncluded = true)
63 public class ControlLoopEventManager implements StepContext, Serializable {
65 private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
67 private static final long serialVersionUID = -1216568161322872641L;
70 * Data manager used when the policy engine's guard.disabled property is "true".
72 private static final OperationHistoryDataManager STUB_DATA_MANAGER = new OperationHistoryDataManagerStub();
74 public static final String GUARD_DISABLED_PROPERTY = "guard.disabled";
77 * Counts the number of these objects that have been created. This is used by junit
80 private static final AtomicLong createCount = new AtomicLong(0);
83 * {@code True} if this object was created by this JVM instance, {@code false}
84 * otherwise. This will be {@code false} if this object is reconstituted from a
85 * persistent store or by transfer from another server.
87 private final transient boolean createdByThisJvmInstance;
89 private final transient EventManagerServices services;
93 public final String closedLoopControlName;
96 private final UUID requestId;
99 * Time, in milliseconds, when the control loop will time out.
102 private final long endTimeMs;
104 // fields extracted from the ControlLoopParams
106 private final String policyName;
108 private final String policyVersion;
111 * Maps a target entity to its lock.
113 private final transient Map<String, LockData> target2lock = new HashMap<>();
115 @Getter(AccessLevel.PROTECTED)
116 private final ControlLoopProcessor processor;
119 * Set of properties used while processing the event.
121 private final Map<String, Serializable> properties = new ConcurrentHashMap<>();
124 * Unprocessed outcomes from the operations. Outcomes are added to this each time the
125 * "start" or "complete" callback is invoked, typically by an operation running in a
126 * background thread, thus it must be thread safe.
129 private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
133 * Constructs the object.
135 * @param services services the manager should use when processing the event
136 * @param params control loop parameters
137 * @param requestId event request ID
138 * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
141 public ControlLoopEventManager(EventManagerServices services, ControlLoopParams params, UUID requestId)
142 throws ControlLoopException {
144 createCount.incrementAndGet();
146 this.createdByThisJvmInstance = true;
147 this.services = services;
148 this.closedLoopControlName = params.getClosedLoopControlName();
149 this.requestId = requestId;
150 this.policyName = params.getPolicyName();
151 this.policyVersion = params.getPolicyVersion();
152 this.processor = new ControlLoopProcessor(params.getToscaPolicy());
153 this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
157 * Gets the number of manager objects that have been created.
159 * @return the number of manager objects that have been created
161 public static long getCreateCount() {
162 return createCount.get();
166 * Determines if the manager is still active.
168 * @return {@code true} if the manager is still active, {@code false} otherwise
170 public boolean isActive() {
171 return createdByThisJvmInstance;
175 * Cancels the current operation and frees all locks.
177 public void destroy() {
179 getBlockingExecutor().execute(this::freeAllLocks);
186 private void freeAllLocks() {
187 target2lock.values().forEach(LockData::free);
191 * Determines the overall control loop timeout.
193 * @return the policy timeout, in milliseconds, if specified, a default timeout
196 private long detmControlLoopTimeoutMs() {
197 // validation checks preclude null or 0 timeout values in the policy
198 int timeout = processor.getPolicy().getProperties().getTimeout();
199 return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
203 public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
205 long remainingMs = endTimeMs - System.currentTimeMillis();
206 int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
208 LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
209 var data2 = new LockData(key, requestId);
210 makeLock(targetEntity, requestId.toString(), remainingSec, data2);
212 data2.addUnavailableCallback(this::onComplete);
217 return data.getFuture();
221 public synchronized CompletableFuture<OperationOutcome> releaseLock(String targetEntity) {
222 LockData data = target2lock.remove(targetEntity);
225 // lock did not exist - immediately return a success
226 OperationOutcome outcome = makeUnlockOutcome(targetEntity);
227 outcome.setEnd(outcome.getStart());
230 return CompletableFuture.completedFuture(outcome);
234 * previous lock operation may not have completed yet, thus we tack the unlock
237 * Note: we must invoke free(), asynchronously (i.e., using whenCompleteAsync()),
241 return data.getFuture().whenCompleteAsync((lockOutcome, thrown) -> {
243 OperationOutcome outcome = makeUnlockOutcome(targetEntity);
248 } catch (RuntimeException e) {
249 logger.warn("failed to unlock {}", targetEntity, e);
250 outcome.setResult(OperationResult.FAILURE_EXCEPTION);
251 outcome.setMessage(ControlLoopOperation.FAILED_MSG + ": " + e.getMessage());
254 outcome.setEnd(Instant.now());
257 }, getBlockingExecutor());
260 private OperationOutcome makeUnlockOutcome(String targetEntity) {
261 var outcome = new OperationOutcome();
262 outcome.setActor(ActorConstants.LOCK_ACTOR);
263 outcome.setOperation(ActorConstants.UNLOCK_OPERATION);
264 outcome.setTarget(targetEntity);
265 outcome.setResult(OperationResult.SUCCESS);
266 outcome.setMessage(ControlLoopOperation.SUCCESS_MSG);
267 outcome.setFinalOutcome(true);
268 outcome.setStart(Instant.now());
272 public void onStart(OperationOutcome outcome) {
273 outcomes.add(outcome);
276 public void onComplete(OperationOutcome outcome) {
277 outcomes.add(outcome);
281 * Determines if the context contains a property.
283 * @param name name of the property of interest
284 * @return {@code true} if the context contains the property, {@code false} otherwise
287 public boolean contains(String name) {
288 return properties.containsKey(name);
292 * Gets a property, casting it to the desired type.
294 * @param <T> desired type
295 * @param name name of the property whose value is to be retrieved
296 * @return the property's value, or {@code null} if it does not yet have a value
299 @SuppressWarnings("unchecked")
300 public <T> T getProperty(String name) {
301 return (T) properties.get(name);
305 * Sets a property's value.
307 * @param name property name
308 * @param value new property value
311 public void setProperty(String name, Serializable value) {
312 logger.info("set property {}={} manager={}", name, value, this);
313 properties.put(name, value);
317 * Removes a property.
319 * @param name property name
322 public void removeProperty(String name) {
323 properties.remove(name);
326 // the following methods may be overridden by junit tests
328 public Executor getExecutor() {
329 return ForkJoinPool.commonPool();
332 protected ExecutorService getBlockingExecutor() {
333 return PolicyEngineConstants.getManager().getExecutorService();
336 protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
337 PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
340 public ActorService getActorService() {
341 return services.getActorService();
344 public OperationHistoryDataManager getDataManager() {
345 boolean guardDisabled = "true".equalsIgnoreCase(getEnvironmentProperty(GUARD_DISABLED_PROPERTY));
346 return (guardDisabled ? STUB_DATA_MANAGER : services.getDataManager());
349 protected String getEnvironmentProperty(String propName) {
350 return PolicyEngineConstants.getManager().getEnvironmentProperty(propName);