2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.eventmanager;
24 import java.io.Serial;
25 import java.util.ArrayDeque;
26 import java.util.Deque;
27 import java.util.LinkedHashMap;
29 import java.util.UUID;
30 import lombok.AccessLevel;
32 import lombok.NonNull;
34 import lombok.ToString;
35 import org.drools.core.WorkingMemory;
36 import org.kie.api.runtime.rule.FactHandle;
37 import org.onap.policy.controlloop.ControlLoopException;
38 import org.onap.policy.controlloop.actorserviceprovider.OperationFinalResult;
39 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
40 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
41 import org.onap.policy.controlloop.actorserviceprovider.TargetType;
42 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
43 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
44 import org.onap.policy.drools.domain.models.operational.Operation;
45 import org.onap.policy.drools.domain.models.operational.OperationalTarget;
46 import org.onap.policy.drools.system.PolicyEngine;
47 import org.onap.policy.drools.system.PolicyEngineConstants;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
52 * Manager for a single control loop event. Processing progresses through each policy,
53 * which involves at least one step. As a step is processed, additional preprocessor steps
54 * may be pushed onto the queue (e.g., locks, A&AI queries, guards).
56 @ToString(onlyExplicitlyIncluded = true)
57 public abstract class ClEventManagerWithSteps<T extends Step> extends ControlLoopEventManager implements StepContext {
59 private static final Logger logger = LoggerFactory.getLogger(ClEventManagerWithSteps.class);
61 private static final long serialVersionUID = -1216568161322872641L;
64 * Maximum number of steps, for a single policy, allowed in the queue at a time. This
65 * prevents an infinite loop occurring with calls to {@link #loadPreprocessorSteps()}.
67 public static final int MAX_STEPS = 30;
70 LOAD_POLICY, POLICY_LOADED, AWAITING_OUTCOME, DONE
74 * Request ID, as a String.
77 private final String requestIdStr;
84 * {@code True} if the event has been accepted (i.e., an "ACTIVE" notification has
85 * been delivered), {@code false} otherwise.
89 private boolean accepted;
92 * Queue of steps waiting to be performed.
95 private final transient Deque<T> steps = new ArrayDeque<>(6);
98 * Policy currently being processed.
100 @Getter(AccessLevel.PROTECTED)
101 private Operation policy;
104 * Result of the last policy operation. This is just a place where the rules can store
105 * the value for passing to {@link #loadNextPolicy(OperationResult)}.
109 private OperationResult result = OperationResult.SUCCESS;
113 private int numOnsets = 1;
116 private int numAbatements = 0;
119 private OperationFinalResult finalResult = null;
122 * Message to be placed into the final notification. Typically used when something
123 * causes processing to abort.
126 private String finalMessage = null;
128 private final transient WorkingMemory workMem;
129 private transient FactHandle factHandle;
133 * Constructs the object.
135 * @param services services the manager should use when processing the event
136 * @param params control loop parameters
137 * @param requestId event request ID
138 * @param workMem working memory to update if this changes
139 * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
142 protected ClEventManagerWithSteps(EventManagerServices services, ControlLoopParams params, UUID requestId,
143 WorkingMemory workMem) throws ControlLoopException {
145 super(services, params, requestId);
147 if (requestId == null) {
148 throw new ControlLoopException("No request ID");
151 this.workMem = workMem;
152 this.requestIdStr = getRequestId().toString();
156 public void destroy() {
157 for (T step : getSteps()) {
165 * Starts the manager and loads the first policy.
167 * @throws ControlLoopException if the processor cannot get a policy
169 public void start() throws ControlLoopException {
171 throw new IllegalStateException("manager is no longer active");
174 if ((factHandle = workMem.getFactHandle(this)) == null) {
175 throw new IllegalStateException("manager is not in working memory");
178 if (!getSteps().isEmpty()) {
179 throw new IllegalStateException("manager already started");
186 * Indicates that processing has been aborted.
188 * @param finalState final state
189 * @param finalResult final result
190 * @param finalMessage final message
192 public void abort(@NonNull State finalState, OperationFinalResult finalResult, String finalMessage) {
193 this.state = finalState;
194 this.finalResult = finalResult;
195 this.finalMessage = finalMessage;
199 * Loads the next policy.
201 * @param lastResult result from the last policy
203 * @throws ControlLoopException if the processor cannot get a policy
205 public void loadNextPolicy(@NonNull OperationResult lastResult) throws ControlLoopException {
206 getProcessor().nextPolicyForResult(lastResult);
211 * Loads the current policy.
213 * @throws ControlLoopException if the processor cannot get a policy
215 protected void loadPolicy() throws ControlLoopException {
216 if ((finalResult = getProcessor().checkIsCurrentPolicyFinal()) != null) {
217 // final policy - nothing more to do
221 policy = getProcessor().getCurrentPolicy();
223 var actor = policy.getActorOperation();
225 OperationalTarget target = actor.getTarget();
226 String targetType = (target != null ? target.getTargetType() : null);
227 Map<String, String> entityIds = (target != null ? target.getEntityIds() : null);
229 // convert policy payload from Map<String,String> to Map<String,Object>
230 Map<String, Object> payload = new LinkedHashMap<>();
231 if (actor.getPayload() != null) {
232 payload.putAll(actor.getPayload());
236 ControlLoopOperationParams params = ControlLoopOperationParams.builder()
237 .actorService(getActorService())
238 .actor(actor.getActor())
239 .operation(actor.getOperation())
240 .requestId(getRequestId())
241 .executor(getExecutor())
242 .retry(policy.getRetries())
243 .timeoutSec(policy.getTimeout())
244 .targetType(TargetType.toTargetType(targetType))
245 .targetEntityIds(entityIds)
247 .startCallback(this::onStart)
248 .completeCallback(this::onComplete)
252 // load the policy's operation
253 loadPolicyStep(params);
257 * Makes the step associated with the given parameters.
259 * @param params operation's parameters
261 protected abstract void loadPolicyStep(ControlLoopOperationParams params);
264 * Loads the preprocessor steps needed by the step that's at the front of the queue.
266 public void loadPreprocessorSteps() {
267 if (getSteps().size() >= MAX_STEPS) {
268 throw new IllegalStateException("too many steps");
271 // initialize the step so we can query its properties
272 assert getSteps().peek() != null;
273 getSteps().peek().init();
277 * Executes the first step in the queue.
279 * @return {@code true} if the step was started, {@code false} if it is no longer
280 * needed (or if the queue is empty)
282 public boolean executeStep() {
283 T step = getSteps().peek();
288 return step.start(getEndTimeMs() - System.currentTimeMillis());
292 * Discards the current step, if any.
294 public void nextStep() {
299 * Delivers a notification to a topic.
301 * @param sinkName name of the topic sink
302 * @param notification notification to be published, or {@code null} if nothing is to
304 * @param notificationType type of notification, used when logging error messages
305 * @param ruleName name of the rule doing the publishing
307 public <N> void deliver(String sinkName, N notification, String notificationType, String ruleName) {
309 if (notification != null) {
310 getPolicyEngineManager().deliver(sinkName, notification);
313 } catch (RuntimeException e) {
314 logger.warn("{}: {}.{}: manager={} exception publishing {}", getClosedLoopControlName(), getPolicyName(),
315 ruleName, this, notificationType, e);
319 protected int bumpOffsets() {
323 protected int bumpAbatements() {
324 return numAbatements++;
328 public void onStart(OperationOutcome outcome) {
329 super.onStart(outcome);
330 workMem.update(factHandle, this);
334 public void onComplete(OperationOutcome outcome) {
335 super.onComplete(outcome);
336 workMem.update(factHandle, this);
339 // these following methods may be overridden by junit tests
341 protected PolicyEngine getPolicyEngineManager() {
342 return PolicyEngineConstants.getManager();