2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.eventmanager;
23 import java.io.Serializable;
24 import java.util.Deque;
25 import java.util.HashMap;
27 import java.util.UUID;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedDeque;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.ForkJoinPool;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicLong;
36 import lombok.AccessLevel;
38 import lombok.ToString;
39 import org.onap.policy.controlloop.ControlLoopException;
40 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
41 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
42 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
43 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
44 import org.onap.policy.controlloop.processor.ControlLoopProcessor;
45 import org.onap.policy.drools.core.lock.LockCallback;
46 import org.onap.policy.drools.system.PolicyEngineConstants;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
/**
 * Manager for a single event. Once this has been created, the event can be retracted from
 * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
 * hasn't been replicated from another server). When the manager is no longer needed,
 * {@link #destroy()} should be invoked.
 */
56 @ToString(onlyExplicitlyIncluded = true)
57 public class ControlLoopEventManager implements StepContext, Serializable {
59 private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
60 private static final long serialVersionUID = -1216568161322872641L;
62 private static final String EVENT_MANAGER_SERVICE_CONFIG = "event-manager";
65 * Counts the number of these objects that have been created. This is used by junit
68 private static final AtomicLong createCount = new AtomicLong(0);
71 * {@code True} if this object was created by this JVM instance, {@code false}
72 * otherwise. This will be {@code false} if this object is reconstituted from a
73 * persistent store or by transfer from another server.
75 private transient boolean createdByThisJvmInstance;
79 public final String closedLoopControlName;
82 private final UUID requestId;
85 * Time, in milliseconds, when the control loop will time out.
88 private final long endTimeMs;
90 // fields extracted from the ControlLoopParams
92 private final String policyName;
94 private final String policyVersion;
97 * Maps a target entity to its lock.
99 private final transient Map<String, LockData> target2lock = new HashMap<>();
101 @Getter(AccessLevel.PROTECTED)
102 private final ControlLoopProcessor processor;
105 * Set of properties used while processing the event.
107 private Map<String, Serializable> properties = new ConcurrentHashMap<>();
110 * Unprocessed outcomes from the operations. Outcomes are added to this each time the
111 * "start" or "complete" callback is invoked, typically by an operation running in a
112 * background thread, thus it must be thread safe.
115 private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
119 * Constructs the object.
121 * @param params control loop parameters
122 * @param requestId event request ID
123 * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
126 public ControlLoopEventManager(ControlLoopParams params, UUID requestId) throws ControlLoopException {
128 createCount.incrementAndGet();
130 this.createdByThisJvmInstance = true;
131 this.closedLoopControlName = params.getClosedLoopControlName();
132 this.requestId = requestId;
133 this.policyName = params.getPolicyName();
134 this.policyVersion = params.getPolicyVersion();
135 this.processor = new ControlLoopProcessor(params.getToscaPolicy());
136 this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
140 * Gets the number of manager objects that have been created.
142 * @return the number of manager objects that have been created
144 public static long getCreateCount() {
145 return createCount.get();
149 * Determines if the manager is still active.
151 * @return {@code true} if the manager is still active, {@code false} otherwise
153 public boolean isActive() {
154 return createdByThisJvmInstance;
158 * Cancels the current operation and frees all locks.
160 public void destroy() {
162 getBlockingExecutor().execute(this::freeAllLocks);
169 private void freeAllLocks() {
170 target2lock.values().forEach(LockData::free);
174 * Determines the overall control loop timeout.
176 * @return the policy timeout, in milliseconds, if specified, a default timeout
179 private long detmControlLoopTimeoutMs() {
180 // validation checks preclude null or 0 timeout values in the policy
181 Integer timeout = processor.getPolicy().getProperties().getTimeout();
182 return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
186 * Requests a lock. This requests the lock for the time that remains before the
187 * timeout expires. This avoids having to extend the lock.
189 * @param targetEntity entity to be locked
190 * @return a future that can be used to await the lock
193 public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
195 long remainingMs = endTimeMs - System.currentTimeMillis();
196 int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
198 LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
199 LockData data2 = new LockData(key, requestId);
200 makeLock(targetEntity, requestId.toString(), remainingSec, data2);
202 data2.addUnavailableCallback(this::onComplete);
207 return data.getFuture();
210 public void onStart(OperationOutcome outcome) {
211 outcomes.add(outcome);
214 public void onComplete(OperationOutcome outcome) {
215 outcomes.add(outcome);
219 * Determines if the context contains a property.
221 * @param name name of the property of interest
222 * @return {@code true} if the context contains the property, {@code false} otherwise
225 public boolean contains(String name) {
226 return properties.containsKey(name);
230 * Gets a property, casting it to the desired type.
232 * @param <T> desired type
233 * @param name name of the property whose value is to be retrieved
234 * @return the property's value, or {@code null} if it does not yet have a value
237 @SuppressWarnings("unchecked")
238 public <T> T getProperty(String name) {
239 return (T) properties.get(name);
243 * Sets a property's value.
245 * @param name property name
246 * @param value new property value
249 public void setProperty(String name, Serializable value) {
250 logger.info("set property {}={} manager={}", name, value, this);
251 properties.put(name, value);
255 * Removes a property.
257 * @param name property name
260 public void removeProperty(String name) {
261 properties.remove(name);
265 * Initializes various components, on demand.
267 private static class LazyInitData {
268 private static final OperationHistoryDataManager DATA_MANAGER;
269 private static final ActorService ACTOR_SERVICE;
272 // TODO how to dynamically change data manager, depending whether or not
273 // guards are enabled?
274 EventManagerServices services = new EventManagerServices(EVENT_MANAGER_SERVICE_CONFIG);
275 ACTOR_SERVICE = services.getActorService();
276 DATA_MANAGER = services.getDataManager();
280 // the following methods may be overridden by junit tests
282 public Executor getExecutor() {
283 return ForkJoinPool.commonPool();
286 protected ExecutorService getBlockingExecutor() {
287 return PolicyEngineConstants.getManager().getExecutorService();
290 protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
291 PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
294 public ActorService getActorService() {
295 return LazyInitData.ACTOR_SERVICE;
298 public OperationHistoryDataManager getDataManager() {
299 return LazyInitData.DATA_MANAGER;