511d93b9d5cea063c0258942592b0c5351a67f3b
[policy/drools-applications.git] / controlloop / common / eventmanager / src / main / java / org / onap / policy / controlloop / eventmanager / ClEventManagerWithSteps.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.eventmanager;
23
24 import java.io.Serial;
25 import java.util.ArrayDeque;
26 import java.util.Deque;
27 import java.util.LinkedHashMap;
28 import java.util.Map;
29 import java.util.UUID;
30 import lombok.AccessLevel;
31 import lombok.Getter;
32 import lombok.NonNull;
33 import lombok.Setter;
34 import lombok.ToString;
35 import org.drools.core.WorkingMemory;
36 import org.kie.api.runtime.rule.FactHandle;
37 import org.onap.policy.controlloop.ControlLoopException;
38 import org.onap.policy.controlloop.actorserviceprovider.OperationFinalResult;
39 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
40 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
41 import org.onap.policy.controlloop.actorserviceprovider.TargetType;
42 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
43 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
44 import org.onap.policy.drools.domain.models.operational.Operation;
45 import org.onap.policy.drools.domain.models.operational.OperationalTarget;
46 import org.onap.policy.drools.system.PolicyEngine;
47 import org.onap.policy.drools.system.PolicyEngineConstants;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 /**
52  * Manager for a single control loop event. Processing progresses through each policy,
53  * which involves at least one step. As a step is processed, additional preprocessor steps
54  * may be pushed onto the queue (e.g., locks, A&AI queries, guards).
55  */
56 @ToString(onlyExplicitlyIncluded = true)
57 public abstract class ClEventManagerWithSteps<T extends Step> extends ControlLoopEventManager implements StepContext {
58
59     private static final Logger logger = LoggerFactory.getLogger(ClEventManagerWithSteps.class);
60     @Serial
61     private static final long serialVersionUID = -1216568161322872641L;
62
63     /**
64      * Maximum number of steps, for a single policy, allowed in the queue at a time. This
65      * prevents an infinite loop occurring with calls to {@link #loadPreprocessorSteps()}.
66      */
67     public static final int MAX_STEPS = 30;
68
69     public enum State {
70         LOAD_POLICY, POLICY_LOADED, AWAITING_OUTCOME, DONE
71     }
72
73     /**
74      * Request ID, as a String.
75      */
76     @Getter
77     private final String requestIdStr;
78
79     @Getter
80     @Setter
81     private State state;
82
83     /**
84      * {@code True} if the event has been accepted (i.e., an "ACTIVE" notification has
85      * been delivered), {@code false} otherwise.
86      */
87     @Getter
88     @Setter
89     private boolean accepted;
90
91     /**
92      * Queue of steps waiting to be performed.
93      */
94     @Getter
95     private final transient Deque<T> steps = new ArrayDeque<>(6);
96
97     /**
98      * Policy currently being processed.
99      */
100     @Getter(AccessLevel.PROTECTED)
101     private Operation policy;
102
103     /**
104      * Result of the last policy operation. This is just a place where the rules can store
105      * the value for passing to {@link #loadNextPolicy(OperationResult)}.
106      */
107     @Getter
108     @Setter
109     private OperationResult result = OperationResult.SUCCESS;
110
111     @Getter
112     @ToString.Include
113     private int numOnsets = 1;
114     @Getter
115     @ToString.Include
116     private int numAbatements = 0;
117
118     @Getter
119     private OperationFinalResult finalResult = null;
120
121     /**
122      * Message to be placed into the final notification. Typically used when something
123      * causes processing to abort.
124      */
125     @Getter
126     private String finalMessage = null;
127
128     private final transient WorkingMemory workMem;
129     private transient FactHandle factHandle;
130
131
132     /**
133      * Constructs the object.
134      *
135      * @param services services the manager should use when processing the event
136      * @param params control loop parameters
137      * @param requestId event request ID
138      * @param workMem working memory to update if this changes
139      * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
140      *         be created
141      */
142     protected ClEventManagerWithSteps(EventManagerServices services, ControlLoopParams params, UUID requestId,
143                     WorkingMemory workMem) throws ControlLoopException {
144
145         super(services, params, requestId);
146
147         if (requestId == null) {
148             throw new ControlLoopException("No request ID");
149         }
150
151         this.workMem = workMem;
152         this.requestIdStr = getRequestId().toString();
153     }
154
155     @Override
156     public void destroy() {
157         for (T step : getSteps()) {
158             step.cancel();
159         }
160
161         super.destroy();
162     }
163
164     /**
165      * Starts the manager and loads the first policy.
166      *
167      * @throws ControlLoopException if the processor cannot get a policy
168      */
169     public void start() throws ControlLoopException {
170         if (!isActive()) {
171             throw new IllegalStateException("manager is no longer active");
172         }
173
174         if ((factHandle = workMem.getFactHandle(this)) == null) {
175             throw new IllegalStateException("manager is not in working memory");
176         }
177
178         if (!getSteps().isEmpty()) {
179             throw new IllegalStateException("manager already started");
180         }
181
182         loadPolicy();
183     }
184
185     /**
186      * Indicates that processing has been aborted.
187      *
188      * @param finalState final state
189      * @param finalResult final result
190      * @param finalMessage final message
191      */
192     public void abort(@NonNull State finalState, OperationFinalResult finalResult, String finalMessage) {
193         this.state = finalState;
194         this.finalResult = finalResult;
195         this.finalMessage = finalMessage;
196     }
197
198     /**
199      * Loads the next policy.
200      *
201      * @param lastResult result from the last policy
202      *
203      * @throws ControlLoopException if the processor cannot get a policy
204      */
205     public void loadNextPolicy(@NonNull OperationResult lastResult) throws ControlLoopException {
206         getProcessor().nextPolicyForResult(lastResult);
207         loadPolicy();
208     }
209
210     /**
211      * Loads the current policy.
212      *
213      * @throws ControlLoopException if the processor cannot get a policy
214      */
215     protected void loadPolicy() throws ControlLoopException {
216         if ((finalResult = getProcessor().checkIsCurrentPolicyFinal()) != null) {
217             // final policy - nothing more to do
218             return;
219         }
220
221         policy = getProcessor().getCurrentPolicy();
222
223         var actor = policy.getActorOperation();
224
225         OperationalTarget target = actor.getTarget();
226         String targetType = (target != null ? target.getTargetType() : null);
227         Map<String, String> entityIds = (target != null ? target.getEntityIds() : null);
228
229         // convert policy payload from Map<String,String> to Map<String,Object>
230         Map<String, Object> payload = new LinkedHashMap<>();
231         if (actor.getPayload() != null) {
232             payload.putAll(actor.getPayload());
233         }
234
235         // @formatter:off
236         ControlLoopOperationParams params = ControlLoopOperationParams.builder()
237                         .actorService(getActorService())
238                         .actor(actor.getActor())
239                         .operation(actor.getOperation())
240                         .requestId(getRequestId())
241                         .executor(getExecutor())
242                         .retry(policy.getRetries())
243                         .timeoutSec(policy.getTimeout())
244                         .targetType(TargetType.toTargetType(targetType))
245                         .targetEntityIds(entityIds)
246                         .payload(payload)
247                         .startCallback(this::onStart)
248                         .completeCallback(this::onComplete)
249                         .build();
250         // @formatter:on
251
252         // load the policy's operation
253         loadPolicyStep(params);
254     }
255
256     /**
257      * Makes the step associated with the given parameters.
258      *
259      * @param params operation's parameters
260      */
261     protected abstract void loadPolicyStep(ControlLoopOperationParams params);
262
263     /**
264      * Loads the preprocessor steps needed by the step that's at the front of the queue.
265      */
266     public void loadPreprocessorSteps() {
267         if (getSteps().size() >= MAX_STEPS) {
268             throw new IllegalStateException("too many steps");
269         }
270
271         // initialize the step so we can query its properties
272         assert getSteps().peek() != null;
273         getSteps().peek().init();
274     }
275
276     /**
277      * Executes the first step in the queue.
278      *
279      * @return {@code true} if the step was started, {@code false} if it is no longer
280      *         needed (or if the queue is empty)
281      */
282     public boolean executeStep() {
283         T step = getSteps().peek();
284         if (step == null) {
285             return false;
286         }
287
288         return step.start(getEndTimeMs() - System.currentTimeMillis());
289     }
290
291     /**
292      * Discards the current step, if any.
293      */
294     public void nextStep() {
295         getSteps().poll();
296     }
297
298     /**
299      * Delivers a notification to a topic.
300      *
301      * @param sinkName name of the topic sink
302      * @param notification notification to be published, or {@code null} if nothing is to
303      *        be published
304      * @param notificationType type of notification, used when logging error messages
305      * @param ruleName name of the rule doing the publishing
306      */
307     public <N> void deliver(String sinkName, N notification, String notificationType, String ruleName) {
308         try {
309             if (notification != null) {
310                 getPolicyEngineManager().deliver(sinkName, notification);
311             }
312
313         } catch (RuntimeException e) {
314             logger.warn("{}: {}.{}: manager={} exception publishing {}", getClosedLoopControlName(), getPolicyName(),
315                             ruleName, this, notificationType, e);
316         }
317     }
318
319     protected int bumpOffsets() {
320         return numOnsets++;
321     }
322
323     protected int bumpAbatements() {
324         return numAbatements++;
325     }
326
327     @Override
328     public void onStart(OperationOutcome outcome) {
329         super.onStart(outcome);
330         workMem.update(factHandle, this);
331     }
332
333     @Override
334     public void onComplete(OperationOutcome outcome) {
335         super.onComplete(outcome);
336         workMem.update(factHandle, this);
337     }
338
339     // these following methods may be overridden by junit tests
340
341     protected PolicyEngine getPolicyEngineManager() {
342         return PolicyEngineConstants.getManager();
343     }
344 }