2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 * SPDX-License-Identifier: Apache-2.0
21 * ============LICENSE_END=========================================================
24 package org.onap.policy.apex.core.engine.executor;
26 import static org.onap.policy.common.utils.validation.Assertions.argumentOfClassNotNull;
28 import java.util.HashMap;
29 import java.util.List;
31 import java.util.Properties;
33 import java.util.TreeMap;
34 import java.util.TreeSet;
36 import lombok.NonNull;
37 import org.onap.policy.apex.context.ContextException;
38 import org.onap.policy.apex.core.engine.ExecutorParameters;
39 import org.onap.policy.apex.core.engine.TaskParameters;
40 import org.onap.policy.apex.core.engine.context.ApexInternalContext;
41 import org.onap.policy.apex.core.engine.executor.context.TaskExecutionContext;
42 import org.onap.policy.apex.core.engine.executor.exception.StateMachineException;
43 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
44 import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey;
45 import org.onap.policy.apex.model.eventmodel.concepts.AxField;
46 import org.onap.policy.apex.model.policymodel.concepts.AxTask;
47 import org.onap.policy.apex.model.policymodel.concepts.AxTaskParameter;
48 import org.slf4j.ext.XLogger;
49 import org.slf4j.ext.XLoggerFactory;
52 * This abstract class executes a task in a state of an Apex policy and is specialized by classes that implement
53 * execution of task logic.
55 * @author Sven van der Meer (sven.van.der.meer@ericsson.com)
56 * @author Liam Fallon (liam.fallon@ericsson.com)
58 public abstract class TaskExecutor
59 implements Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> {
60 // Logger for this class
61 private static final XLogger LOGGER = XLoggerFactory.getXLogger(TaskExecutor.class);
63 // Hold the task and context definitions for this task
65 private Executor<?, ?, ?, ?> parent = null;
66 private AxTask axTask = null;
67 private ApexInternalContext internalContext = null;
69 // Holds the incoming and outgoing fields
70 private Map<String, Object> incomingFields = null;
71 private Map<String, Map<String, Object>> outgoingFieldsMap = null;
73 // The next task executor
74 private Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> nextExecutor =
77 // The task execution context; contains the facades for events and context to be used by tasks
78 // executed by this task
81 private TaskExecutionContext executionContext = null;
87 public void setContext(final Executor<?, ?, ?, ?> newParent, final AxTask newAxTask,
88 final ApexInternalContext newInternalContext) {
89 this.parent = newParent;
90 this.axTask = newAxTask;
91 this.internalContext = newInternalContext;
98 public void prepare() throws StateMachineException {
99 LOGGER.debug("prepare:" + axTask.getKey().getId() + "," + axTask.getTaskLogic().getLogicFlavour() + ","
100 + axTask.getTaskLogic().getLogic());
101 argumentOfClassNotNull(axTask.getTaskLogic().getLogic(), StateMachineException.class,
102 "task logic cannot be null.");
109 public Map<String, Map<String, Object>> execute(final long executionId, final Properties executionProperties,
110 final Map<String, Object> newIncomingFields) throws StateMachineException, ContextException {
111 throw new StateMachineException(
112 "execute() not implemented on abstract TaskExecutor class, only on its subclasses");
119 public final void executePre(final long executionId, @NonNull final Properties executionProperties,
120 final Map<String, Object> newIncomingFields) throws StateMachineException, ContextException {
121 LOGGER.debug("execute-pre:" + getSubject().getTaskLogic().getLogicFlavour() + ","
122 + getSubject().getKey().getId() + "," + getSubject().getTaskLogic().getLogic());
124 // Check that the incoming event has all the input fields for this state
125 Map<String, AxField> inputEventParameterMap = axTask.getInputEvent().getParameterMap();
126 final Set<String> missingTaskInputFields = new TreeSet<>(inputEventParameterMap.keySet());
127 missingTaskInputFields.removeAll(newIncomingFields.keySet());
129 // Remove fields from the set that are optional
130 missingTaskInputFields.removeIf(missingField -> inputEventParameterMap.get(missingField).getOptional());
132 if (!missingTaskInputFields.isEmpty()) {
133 throw new StateMachineException("task input fields \"" + missingTaskInputFields
134 + "\" are missing for task \"" + axTask.getKey().getId() + "\"");
137 // Record the incoming fields
138 this.incomingFields = newIncomingFields;
140 // Initiate the outgoing fields
141 outgoingFieldsMap = new TreeMap<>();
142 for (var outputEventEntry: axTask.getOutputEvents().entrySet()) {
143 Map<String, Object> outgoingFields = new TreeMap<>();
144 outputEventEntry.getValue().getParameterMap().keySet().forEach(field -> outgoingFields.put(field, null));
145 outgoingFieldsMap.put(outputEventEntry.getKey(), outgoingFields);
147 // Get task context object
148 executionContext = new TaskExecutionContext(this, executionId, executionProperties, getSubject(), getIncoming(),
149 outgoingFieldsMap.values(), getContext());
156 public final void executePost(final boolean returnValue) throws StateMachineException, ContextException {
158 String errorMessage = "execute-post: task logic execution failure on task \"" + axTask.getKey().getName()
159 + "\" in model " + internalContext.getKey().getId();
160 if (executionContext.getMessage() != null) {
161 errorMessage += ", user message: " + executionContext.getMessage();
163 LOGGER.warn(errorMessage);
164 throw new StateMachineException(errorMessage);
167 // Copy any unset fields from the input to the output if their data type and names are identical
168 axTask.getOutputEvents().entrySet().forEach(outputEventEntry -> outputEventEntry.getValue().getParameterMap()
169 .keySet().forEach(field -> copyInputField2Output(outputEventEntry.getKey(), field)));
171 // Finally, check that the outgoing fields have all the output fields defined for this state
172 // and, if not, output a list of missing fields
173 Map<String, Set<String>> missingTaskOutputFieldsMap = new TreeMap<>();
174 axTask.getOutputEvents().entrySet().forEach(outputEventEntry -> {
175 Set<String> missingTaskOutputFields = new TreeSet<>();
176 missingTaskOutputFields.addAll(outputEventEntry.getValue().getParameterMap().keySet());
177 String key = outputEventEntry.getKey();
178 missingTaskOutputFields.removeAll(outgoingFieldsMap.get(key).keySet());
179 missingTaskOutputFieldsMap.put(key, missingTaskOutputFields);
182 // Remove fields from the set that are optional
183 missingTaskOutputFieldsMap.entrySet()
184 .forEach(missingTaskOutputFieldsEntry -> missingTaskOutputFieldsEntry.getValue()
185 .removeIf(missingField -> axTask.getInputEvent().getParameterMap().containsKey(missingField)
186 || axTask.getOutputEvents().get(missingTaskOutputFieldsEntry.getKey()).getParameterMap()
187 .get(missingField).getOptional()));
188 missingTaskOutputFieldsMap.entrySet()
189 .removeIf(missingTaskOutputFieldsEntry -> missingTaskOutputFieldsEntry.getValue().isEmpty());
190 if (!missingTaskOutputFieldsMap.isEmpty()) {
191 throw new StateMachineException("Fields for task output events \"" + missingTaskOutputFieldsMap.keySet()
192 + "\" are missing for task \"" + axTask.getKey().getId() + "\"");
196 // Finally, check that the outgoing field map don't have any extra fields, if present, raise
197 // exception with the list of extra fields
198 final Map<String, Set<String>> extraTaskOutputFieldsMap = new TreeMap<>();
199 outgoingFieldsMap.entrySet().forEach(outgoingFieldsEntry -> extraTaskOutputFieldsMap
200 .put(outgoingFieldsEntry.getKey(), new TreeSet<>(outgoingFieldsEntry.getValue().keySet())));
201 extraTaskOutputFieldsMap.entrySet().forEach(extraTaskOutputFieldsEntry -> extraTaskOutputFieldsEntry.getValue()
202 .removeAll(axTask.getOutputEvents().get(extraTaskOutputFieldsEntry.getKey()).getParameterMap().keySet()));
203 extraTaskOutputFieldsMap.entrySet()
204 .removeIf(extraTaskOutputFieldsEntry -> extraTaskOutputFieldsEntry.getValue().isEmpty());
205 if (!extraTaskOutputFieldsMap.isEmpty()) {
206 throw new StateMachineException("task output event \"" + extraTaskOutputFieldsMap.keySet()
207 + "\" contains fields that are unwanted for task \"" + axTask.getKey().getId() + "\"");
211 "execute-post:" + axTask.getKey().getId() + ", returning fields " + outgoingFieldsMap.toString();
212 LOGGER.debug(message);
216 * If the input field exists on the output and it is not set in the task, then it should be copied to the output.
218 * @param eventName the event name
219 * @param field the input field
221 private void copyInputField2Output(String eventName, String field) {
222 Map<String, Object> outgoingFields = outgoingFieldsMap.get(eventName);
223 // Check if the field exists and is not set on the output
224 if (outgoingFields.get(field) != null) {
228 // This field is not in the output, check if it's on the input and is the same type
229 // (Note here, the output field definition has to exist so it's not null checked)
230 final AxField inputFieldDef = axTask.getInputEvent().getParameterMap().get(field);
231 final AxField outputFieldDef = axTask.getOutputEvents().get(eventName).getParameterMap().get(field);
232 if (inputFieldDef == null || !inputFieldDef.getSchema().equals(outputFieldDef.getSchema())) {
236 // We have an input field that matches our output field, copy the value across
237 outgoingFields.put(field, getIncoming().get(field));
241 * If taskParameters are provided in ApexConfig, then they will be updated in the Tasks.
242 * If taskId is empty, the task parameter is added/updated to all available tasks
243 * Otherwise, the task parameter is added/updated to the corresponding task only.
245 * @param taskParametersFromConfig the list of task parameters provided in ApexConfig during deployment
247 public void updateTaskParameters(List<TaskParameters> taskParametersFromConfig) {
248 Map<String, AxTaskParameter> taskParameters = getSubject().getTaskParameters();
249 if (null == taskParameters) {
250 taskParameters = new HashMap<>();
252 for (TaskParameters taskParameterFromConfig : taskParametersFromConfig) {
253 if (null == taskParameterFromConfig.getTaskId()
254 || getSubject().getId().equals(taskParameterFromConfig.getTaskId())) {
255 taskParameters.put(taskParameterFromConfig.getKey(),
256 new AxTaskParameter(new AxReferenceKey(), taskParameterFromConfig.getValue()));
259 getSubject().setTaskParameters(taskParameters);
266 public void cleanUp() throws StateMachineException {
267 throw new StateMachineException("cleanUp() not implemented on class");
274 public AxArtifactKey getKey() {
275 return axTask.getKey();
282 public AxTask getSubject() {
290 public ApexInternalContext getContext() {
291 return internalContext;
298 public Map<String, Object> getIncoming() {
299 return incomingFields;
306 public Map<String, Map<String, Object>> getOutgoing() {
307 return outgoingFieldsMap;
315 final Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> nextEx) {
316 this.nextExecutor = nextEx;
323 public Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> getNext() {
331 public void setParameters(final ExecutorParameters parameters) {