a449366403d4b23fbd401ac47aead190291a524c
[policy/drools-applications.git] / controlloop / common / eventmanager / src / main / java / org / onap / policy / controlloop / eventmanager / ClEventManagerWithSteps.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.eventmanager;
22
23 import java.util.ArrayDeque;
24 import java.util.Deque;
25 import java.util.LinkedHashMap;
26 import java.util.Map;
27 import java.util.UUID;
28 import lombok.AccessLevel;
29 import lombok.Getter;
30 import lombok.NonNull;
31 import lombok.Setter;
32 import lombok.ToString;
33 import org.drools.core.WorkingMemory;
34 import org.kie.api.runtime.rule.FactHandle;
35 import org.onap.policy.controlloop.ControlLoopException;
36 import org.onap.policy.controlloop.actorserviceprovider.OperationFinalResult;
37 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
38 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
39 import org.onap.policy.controlloop.actorserviceprovider.TargetType;
40 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
41 import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
42 import org.onap.policy.drools.domain.models.operational.Operation;
43 import org.onap.policy.drools.domain.models.operational.OperationalTarget;
44 import org.onap.policy.drools.system.PolicyEngine;
45 import org.onap.policy.drools.system.PolicyEngineConstants;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Manager for a single control loop event. Processing progresses through each policy,
51  * which involves at least one step. As a step is processed, additional preprocessor steps
52  * may be pushed onto the queue (e.g., locks, A&AI queries, guards).
53  */
54 @ToString(onlyExplicitlyIncluded = true)
55 public abstract class ClEventManagerWithSteps<T extends Step> extends ControlLoopEventManager implements StepContext {
56
57     private static final Logger logger = LoggerFactory.getLogger(ClEventManagerWithSteps.class);
58     private static final long serialVersionUID = -1216568161322872641L;
59
60     /**
61      * Maximum number of steps, for a single policy, allowed in the queue at a time. This
62      * prevents an infinite loop occurring with calls to {@link #loadPreprocessorSteps()}.
63      */
64     public static final int MAX_STEPS = 30;
65
66     public enum State {
67         LOAD_POLICY, POLICY_LOADED, AWAITING_OUTCOME, DONE
68     }
69
70     /**
71      * Request ID, as a String.
72      */
73     @Getter
74     private final String requestIdStr;
75
76     @Getter
77     @Setter
78     private State state;
79
80     /**
81      * {@code True} if the event has been accepted (i.e., an "ACTIVE" notification has
82      * been delivered), {@code false} otherwise.
83      */
84     @Getter
85     @Setter
86     private boolean accepted;
87
88     /**
89      * Queue of steps waiting to be performed.
90      */
91     @Getter
92     private final transient Deque<T> steps = new ArrayDeque<>(6);
93
94     /**
95      * Policy currently being processed.
96      */
97     @Getter(AccessLevel.PROTECTED)
98     private Operation policy;
99
100     /**
101      * Result of the last policy operation. This is just a place where the rules can store
102      * the value for passing to {@link #loadNextPolicy()}.
103      */
104     @Getter
105     @Setter
106     private OperationResult result = OperationResult.SUCCESS;
107
108     @Getter
109     @ToString.Include
110     private int numOnsets = 1;
111     @Getter
112     @ToString.Include
113     private int numAbatements = 0;
114
115     @Getter
116     private OperationFinalResult finalResult = null;
117
118     /**
119      * Message to be placed into the final notification. Typically used when something
120      * causes processing to abort.
121      */
122     @Getter
123     private String finalMessage = null;
124
125     private final transient WorkingMemory workMem;
126     private transient FactHandle factHandle;
127
128
129     /**
130      * Constructs the object.
131      *
132      * @param services services the manager should use when processing the event
133      * @param params control loop parameters
134      * @param requestId event request ID
135      * @param workMem working memory to update if this changes
136      * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
137      *         be created
138      */
139     protected ClEventManagerWithSteps(EventManagerServices services, ControlLoopParams params, UUID requestId,
140                     WorkingMemory workMem) throws ControlLoopException {
141
142         super(services, params, requestId);
143
144         if (requestId == null) {
145             throw new ControlLoopException("No request ID");
146         }
147
148         this.workMem = workMem;
149         this.requestIdStr = getRequestId().toString();
150     }
151
152     @Override
153     public void destroy() {
154         for (T step : getSteps()) {
155             step.cancel();
156         }
157
158         super.destroy();
159     }
160
161     /**
162      * Starts the manager and loads the first policy.
163      *
164      * @throws ControlLoopException if the processor cannot get a policy
165      */
166     public void start() throws ControlLoopException {
167         if (!isActive()) {
168             throw new IllegalStateException("manager is no longer active");
169         }
170
171         if ((factHandle = workMem.getFactHandle(this)) == null) {
172             throw new IllegalStateException("manager is not in working memory");
173         }
174
175         if (!getSteps().isEmpty()) {
176             throw new IllegalStateException("manager already started");
177         }
178
179         loadPolicy();
180     }
181
182     /**
183      * Indicates that processing has been aborted.
184      *
185      * @param finalState final state
186      * @param finalResult final result
187      * @param finalMessage final message
188      */
189     public void abort(@NonNull State finalState, OperationFinalResult finalResult, String finalMessage) {
190         this.state = finalState;
191         this.finalResult = finalResult;
192         this.finalMessage = finalMessage;
193     }
194
195     /**
196      * Loads the next policy.
197      *
198      * @param lastResult result from the last policy
199      *
200      * @throws ControlLoopException if the processor cannot get a policy
201      */
202     public void loadNextPolicy(@NonNull OperationResult lastResult) throws ControlLoopException {
203         getProcessor().nextPolicyForResult(lastResult);
204         loadPolicy();
205     }
206
207     /**
208      * Loads the current policy.
209      *
210      * @throws ControlLoopException if the processor cannot get a policy
211      */
212     protected void loadPolicy() throws ControlLoopException {
213         if ((finalResult = getProcessor().checkIsCurrentPolicyFinal()) != null) {
214             // final policy - nothing more to do
215             return;
216         }
217
218         policy = getProcessor().getCurrentPolicy();
219
220         var actor = policy.getActorOperation();
221
222         OperationalTarget target = actor.getTarget();
223         String targetType = (target != null ? target.getTargetType() : null);
224         Map<String, String> entityIds = (target != null ? target.getEntityIds() : null);
225
226         // convert policy payload from Map<String,String> to Map<String,Object>
227         Map<String, Object> payload = new LinkedHashMap<>();
228         if (actor.getPayload() != null) {
229             payload.putAll(actor.getPayload());
230         }
231
232         // @formatter:off
233         ControlLoopOperationParams params = ControlLoopOperationParams.builder()
234                         .actorService(getActorService())
235                         .actor(actor.getActor())
236                         .operation(actor.getOperation())
237                         .requestId(getRequestId())
238                         .executor(getExecutor())
239                         .retry(policy.getRetries())
240                         .timeoutSec(policy.getTimeout())
241                         .targetType(TargetType.toTargetType(targetType))
242                         .targetEntityIds(entityIds)
243                         .payload(payload)
244                         .startCallback(this::onStart)
245                         .completeCallback(this::onComplete)
246                         .build();
247         // @formatter:on
248
249         // load the policy's operation
250         loadPolicyStep(params);
251     }
252
253     /**
254      * Makes the step associated with the given parameters.
255      *
256      * @param params operation's parameters
257      */
258     protected abstract void loadPolicyStep(ControlLoopOperationParams params);
259
260     /**
261      * Loads the preprocessor steps needed by the step that's at the front of the queue.
262      */
263     public void loadPreprocessorSteps() {
264         if (getSteps().size() >= MAX_STEPS) {
265             throw new IllegalStateException("too many steps");
266         }
267
268         // initialize the step so we can query its properties
269         getSteps().peek().init();
270     }
271
272     /**
273      * Executes the first step in the queue.
274      *
275      * @return {@code true} if the step was started, {@code false} if it is no longer
276      *         needed (or if the queue is empty)
277      */
278     public boolean executeStep() {
279         T step = getSteps().peek();
280         if (step == null) {
281             return false;
282         }
283
284         return step.start(getEndTimeMs() - System.currentTimeMillis());
285     }
286
287     /**
288      * Discards the current step, if any.
289      */
290     public void nextStep() {
291         getSteps().poll();
292     }
293
294     /**
295      * Delivers a notification to a topic.
296      *
297      * @param sinkName name of the topic sink
298      * @param notification notification to be published, or {@code null} if nothing is to
299      *        be published
300      * @param notificationType type of notification, used when logging error messages
301      * @param ruleName name of the rule doing the publishing
302      */
303     public <N> void deliver(String sinkName, N notification, String notificationType, String ruleName) {
304         try {
305             if (notification != null) {
306                 getPolicyEngineManager().deliver(sinkName, notification);
307             }
308
309         } catch (RuntimeException e) {
310             logger.warn("{}: {}.{}: manager={} exception publishing {}", getClosedLoopControlName(), getPolicyName(),
311                             ruleName, this, notificationType, e);
312         }
313     }
314
315     protected int bumpOffsets() {
316         return numOnsets++;
317     }
318
319     protected int bumpAbatements() {
320         return numAbatements++;
321     }
322
323     @Override
324     public void onStart(OperationOutcome outcome) {
325         super.onStart(outcome);
326         workMem.update(factHandle, this);
327     }
328
329     @Override
330     public void onComplete(OperationOutcome outcome) {
331         super.onComplete(outcome);
332         workMem.update(factHandle, this);
333     }
334
335     // these following methods may be overridden by junit tests
336
337     protected PolicyEngine getPolicyEngineManager() {
338         return PolicyEngineConstants.getManager();
339     }
340 }