Make drools-pdp work with drools and java versions compatible
[policy/drools-applications.git] / controlloop / common / eventmanager / src / main / java / org / onap / policy / controlloop / eventmanager / ClEventManagerWithSteps.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.eventmanager;
23
24 import java.util.ArrayDeque;
25 import java.util.Deque;
26 import java.util.LinkedHashMap;
27 import java.util.Map;
28 import java.util.UUID;
29 import lombok.AccessLevel;
30 import lombok.Getter;
31 import lombok.NonNull;
32 import lombok.Setter;
33 import lombok.ToString;
34 import org.drools.core.WorkingMemory;
35 import org.kie.api.runtime.rule.FactHandle;
36 import org.onap.policy.controlloop.ControlLoopException;
37 import org.onap.policy.controlloop.actorserviceprovider.OperationFinalResult;
38 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
39 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
40 import org.onap.policy.controlloop.actorserviceprovider.TargetType;
41 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
42 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
43 import org.onap.policy.drools.domain.models.operational.Operation;
44 import org.onap.policy.drools.domain.models.operational.OperationalTarget;
45 import org.onap.policy.drools.system.PolicyEngine;
46 import org.onap.policy.drools.system.PolicyEngineConstants;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  * Manager for a single control loop event. Processing progresses through each policy,
52  * which involves at least one step. As a step is processed, additional preprocessor steps
53  * may be pushed onto the queue (e.g., locks, A&AI queries, guards).
54  */
55 @ToString(onlyExplicitlyIncluded = true)
56 public abstract class ClEventManagerWithSteps<T extends Step> extends ControlLoopEventManager implements StepContext {
57
58     private static final Logger logger = LoggerFactory.getLogger(ClEventManagerWithSteps.class);
59     private static final long serialVersionUID = -1216568161322872641L;
60
61     /**
62      * Maximum number of steps, for a single policy, allowed in the queue at a time. This
63      * prevents an infinite loop occurring with calls to {@link #loadPreprocessorSteps()}.
64      */
65     public static final int MAX_STEPS = 30;
66
67     public enum State {
68         LOAD_POLICY, POLICY_LOADED, AWAITING_OUTCOME, DONE
69     }
70
71     /**
72      * Request ID, as a String.
73      */
74     @Getter
75     private final String requestIdStr;
76
77     @Getter
78     @Setter
79     private State state;
80
81     /**
82      * {@code True} if the event has been accepted (i.e., an "ACTIVE" notification has
83      * been delivered), {@code false} otherwise.
84      */
85     @Getter
86     @Setter
87     private boolean accepted;
88
89     /**
90      * Queue of steps waiting to be performed.
91      */
92     @Getter
93     private final transient Deque<T> steps = new ArrayDeque<>(6);
94
95     /**
96      * Policy currently being processed.
97      */
98     @Getter(AccessLevel.PROTECTED)
99     private Operation policy;
100
101     /**
102      * Result of the last policy operation. This is just a place where the rules can store
103      * the value for passing to {@link #loadNextPolicy(OperationResult)}.
104      */
105     @Getter
106     @Setter
107     private OperationResult result = OperationResult.SUCCESS;
108
109     @Getter
110     @ToString.Include
111     private int numOnsets = 1;
112     @Getter
113     @ToString.Include
114     private int numAbatements = 0;
115
116     @Getter
117     private OperationFinalResult finalResult = null;
118
119     /**
120      * Message to be placed into the final notification. Typically used when something
121      * causes processing to abort.
122      */
123     @Getter
124     private String finalMessage = null;
125
126     private final transient WorkingMemory workMem;
127     private transient FactHandle factHandle;
128
129
130     /**
131      * Constructs the object.
132      *
133      * @param services services the manager should use when processing the event
134      * @param params control loop parameters
135      * @param requestId event request ID
136      * @param workMem working memory to update if this changes
137      * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
138      *         be created
139      */
140     protected ClEventManagerWithSteps(EventManagerServices services, ControlLoopParams params, UUID requestId,
141                     WorkingMemory workMem) throws ControlLoopException {
142
143         super(services, params, requestId);
144
145         if (requestId == null) {
146             throw new ControlLoopException("No request ID");
147         }
148
149         this.workMem = workMem;
150         this.requestIdStr = getRequestId().toString();
151     }
152
153     @Override
154     public void destroy() {
155         for (T step : getSteps()) {
156             step.cancel();
157         }
158
159         super.destroy();
160     }
161
162     /**
163      * Starts the manager and loads the first policy.
164      *
165      * @throws ControlLoopException if the processor cannot get a policy
166      */
167     public void start() throws ControlLoopException {
168         if (!isActive()) {
169             throw new IllegalStateException("manager is no longer active");
170         }
171
172         if ((factHandle = workMem.getFactHandle(this)) == null) {
173             throw new IllegalStateException("manager is not in working memory");
174         }
175
176         if (!getSteps().isEmpty()) {
177             throw new IllegalStateException("manager already started");
178         }
179
180         loadPolicy();
181     }
182
183     /**
184      * Indicates that processing has been aborted.
185      *
186      * @param finalState final state
187      * @param finalResult final result
188      * @param finalMessage final message
189      */
190     public void abort(@NonNull State finalState, OperationFinalResult finalResult, String finalMessage) {
191         this.state = finalState;
192         this.finalResult = finalResult;
193         this.finalMessage = finalMessage;
194     }
195
196     /**
197      * Loads the next policy.
198      *
199      * @param lastResult result from the last policy
200      *
201      * @throws ControlLoopException if the processor cannot get a policy
202      */
203     public void loadNextPolicy(@NonNull OperationResult lastResult) throws ControlLoopException {
204         getProcessor().nextPolicyForResult(lastResult);
205         loadPolicy();
206     }
207
208     /**
209      * Loads the current policy.
210      *
211      * @throws ControlLoopException if the processor cannot get a policy
212      */
213     protected void loadPolicy() throws ControlLoopException {
214         if ((finalResult = getProcessor().checkIsCurrentPolicyFinal()) != null) {
215             // final policy - nothing more to do
216             return;
217         }
218
219         policy = getProcessor().getCurrentPolicy();
220
221         var actor = policy.getActorOperation();
222
223         OperationalTarget target = actor.getTarget();
224         String targetType = (target != null ? target.getTargetType() : null);
225         Map<String, String> entityIds = (target != null ? target.getEntityIds() : null);
226
227         // convert policy payload from Map<String,String> to Map<String,Object>
228         Map<String, Object> payload = new LinkedHashMap<>();
229         if (actor.getPayload() != null) {
230             payload.putAll(actor.getPayload());
231         }
232
233         // @formatter:off
234         ControlLoopOperationParams params = ControlLoopOperationParams.builder()
235                         .actorService(getActorService())
236                         .actor(actor.getActor())
237                         .operation(actor.getOperation())
238                         .requestId(getRequestId())
239                         .executor(getExecutor())
240                         .retry(policy.getRetries())
241                         .timeoutSec(policy.getTimeout())
242                         .targetType(TargetType.toTargetType(targetType))
243                         .targetEntityIds(entityIds)
244                         .payload(payload)
245                         .startCallback(this::onStart)
246                         .completeCallback(this::onComplete)
247                         .build();
248         // @formatter:on
249
250         // load the policy's operation
251         loadPolicyStep(params);
252     }
253
254     /**
255      * Makes the step associated with the given parameters.
256      *
257      * @param params operation's parameters
258      */
259     protected abstract void loadPolicyStep(ControlLoopOperationParams params);
260
261     /**
262      * Loads the preprocessor steps needed by the step that's at the front of the queue.
263      */
264     public void loadPreprocessorSteps() {
265         if (getSteps().size() >= MAX_STEPS) {
266             throw new IllegalStateException("too many steps");
267         }
268
269         // initialize the step so we can query its properties
270         assert getSteps().peek() != null;
271         getSteps().peek().init();
272     }
273
274     /**
275      * Executes the first step in the queue.
276      *
277      * @return {@code true} if the step was started, {@code false} if it is no longer
278      *         needed (or if the queue is empty)
279      */
280     public boolean executeStep() {
281         T step = getSteps().peek();
282         if (step == null) {
283             return false;
284         }
285
286         return step.start(getEndTimeMs() - System.currentTimeMillis());
287     }
288
289     /**
290      * Discards the current step, if any.
291      */
292     public void nextStep() {
293         getSteps().poll();
294     }
295
296     /**
297      * Delivers a notification to a topic.
298      *
299      * @param sinkName name of the topic sink
300      * @param notification notification to be published, or {@code null} if nothing is to
301      *        be published
302      * @param notificationType type of notification, used when logging error messages
303      * @param ruleName name of the rule doing the publishing
304      */
305     public <N> void deliver(String sinkName, N notification, String notificationType, String ruleName) {
306         try {
307             if (notification != null) {
308                 getPolicyEngineManager().deliver(sinkName, notification);
309             }
310
311         } catch (RuntimeException e) {
312             logger.warn("{}: {}.{}: manager={} exception publishing {}", getClosedLoopControlName(), getPolicyName(),
313                             ruleName, this, notificationType, e);
314         }
315     }
316
317     protected int bumpOffsets() {
318         return numOnsets++;
319     }
320
321     protected int bumpAbatements() {
322         return numAbatements++;
323     }
324
325     @Override
326     public void onStart(OperationOutcome outcome) {
327         super.onStart(outcome);
328         workMem.update(factHandle, this);
329     }
330
331     @Override
332     public void onComplete(OperationOutcome outcome) {
333         super.onComplete(outcome);
334         workMem.update(factHandle, this);
335     }
336
337     // these following methods may be overridden by junit tests
338
339     protected PolicyEngine getPolicyEngineManager() {
340         return PolicyEngineConstants.getManager();
341     }
342 }