2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6 * =============================================================================
7 * Modifications Copyright (C) 2019 Ericsson
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.appc.flow.controller.node;
26 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.ACTION_LEVEL;
27 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.DESINGTIME;
28 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.GRAPH;
29 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.INPUT_PARAM_RESPONSE_PREFIX;
30 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.NODE;
31 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.OUTPUT_PARAM_ERROR_MESSAGE;
32 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.OUTPUT_PARAM_STATUS;
33 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.OUTPUT_STATUS_FAILURE;
34 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.OUTPUT_STATUS_MESSAGE;
35 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.OUTPUT_STATUS_SUCCESS;
36 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.REQUEST_ACTION;
37 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.REQUEST_ID;
38 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.RESPONSE_PREFIX;
39 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.REST;
40 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.SEQUENCE_TYPE;
41 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.VNF_TYPE;
42 import java.io.IOException;
43 import java.util.HashMap;
44 import java.util.List;
46 import org.apache.commons.lang3.StringUtils;
47 import org.onap.appc.flow.controller.ResponseHandlerImpl.DefaultResponseHandler;
48 import org.onap.appc.flow.controller.data.PrecheckOption;
49 import org.onap.appc.flow.controller.data.ResponseAction;
50 import org.onap.appc.flow.controller.data.Transaction;
51 import org.onap.appc.flow.controller.data.Transactions;
52 import org.onap.appc.flow.controller.dbervices.FlowControlDBService;
53 import org.onap.appc.flow.controller.executorImpl.GraphExecutor;
54 import org.onap.appc.flow.controller.executorImpl.NodeExecutor;
55 import org.onap.appc.flow.controller.executorImpl.RestExecutor;
56 import org.onap.appc.flow.controller.interfaces.FlowExecutorInterface;
57 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
58 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
59 import org.onap.ccsdk.sli.core.sli.SvcLogicJavaPlugin;
60 import com.att.eelf.configuration.EELFLogger;
61 import com.att.eelf.configuration.EELFManager;
62 import com.fasterxml.jackson.databind.ObjectMapper;
64 public class FlowControlNode implements SvcLogicJavaPlugin {
66 private static final EELFLogger log = EELFManager.getInstance().getLogger(FlowControlNode.class);
68 private final FlowControlDBService dbService;
69 private final FlowSequenceGenerator flowSequenceGenerator;
71 public FlowControlNode() {
72 this.dbService = FlowControlDBService.initialise();
73 this.flowSequenceGenerator = new FlowSequenceGenerator();
76 FlowControlNode(FlowControlDBService dbService, FlowSequenceGenerator flowSequenceGenerator) {
77 this.dbService = dbService;
78 this.flowSequenceGenerator = flowSequenceGenerator;
81 public void processFlow(Map<String, String> inParams, SvcLogicContext ctx) throws SvcLogicException {
82 log.debug("Received processParamKeys call with params : " + inParams);
83 String responsePrefix = inParams.get(INPUT_PARAM_RESPONSE_PREFIX);
85 responsePrefix = StringUtils.isNotBlank(responsePrefix) ? (responsePrefix + ".") : "";
86 SvcLogicContext localContext = new SvcLogicContext();
88 localContext.setAttribute(REQUEST_ID, ctx.getAttribute(REQUEST_ID));
89 localContext.setAttribute(VNF_TYPE, ctx.getAttribute(VNF_TYPE));
90 localContext.setAttribute(REQUEST_ACTION, ctx.getAttribute(REQUEST_ACTION));
91 localContext.setAttribute(ACTION_LEVEL, ctx.getAttribute(ACTION_LEVEL));
92 localContext.setAttribute(RESPONSE_PREFIX, responsePrefix);
93 ctx.setAttribute(RESPONSE_PREFIX, responsePrefix);
95 dbService.getFlowReferenceData(ctx, inParams, localContext);
97 for (String key : localContext.getAttributeKeySet()) {
98 log.debug("processFlow " + key + "=" + ctx.getAttribute(key));
100 processFlowSequence(inParams, ctx, localContext);
101 if (!ctx.getAttribute(responsePrefix + OUTPUT_PARAM_STATUS).equals(OUTPUT_STATUS_SUCCESS)) {
102 throw new SvcLogicException(ctx.getAttribute(responsePrefix + OUTPUT_STATUS_MESSAGE));
104 } catch (Exception e) {
105 ctx.setAttribute(responsePrefix + OUTPUT_PARAM_STATUS, OUTPUT_STATUS_FAILURE);
106 ctx.setAttribute(responsePrefix + OUTPUT_PARAM_ERROR_MESSAGE, e.getMessage());
107 log.error("Error occurred in processFlow ", e);
108 throw new SvcLogicException(e.getMessage());
112 private void processFlowSequence(Map<String, String> inParams, SvcLogicContext ctx, SvcLogicContext localContext)
115 String fn = "FlowExecutorNode.processflowSequence";
116 log.debug(fn + "Received model for flow : " + localContext.toString());
118 localContext.getAttributeKeySet().forEach(key -> log.debug(key + "=" + ctx.getAttribute(key)));
120 String flowSequence = flowSequenceGenerator.getFlowSequence(inParams, ctx, localContext);
122 log.debug("Received Flow Sequence : " + flowSequence);
123 HashMap<Integer, Transaction> transactionMap = createTransactionMap(flowSequence, localContext);
124 executeAllTransaction(transactionMap, ctx);
125 log.info("Executed all the transaction successfully");
128 private void executeAllTransaction(HashMap<Integer, Transaction> transactionMap, SvcLogicContext ctx)
131 String fn = "FlowExecutorNode.executeAllTransaction ";
133 FlowExecutorInterface flowExecutor;
134 for (int key = 1; key <= transactionMap.size(); key++) {
135 log.debug(fn + "Starting transactions ID " + key + " :)=" + retry);
136 Transaction transaction = transactionMap.get(key);
137 if (!preProcessor(transactionMap, transaction)) {
138 log.info("Skipping Transaction ID " + transaction.getTransactionId());
141 if (transaction.getExecutionType() != null) {
142 switch (transaction.getExecutionType()) {
144 flowExecutor = new GraphExecutor();
147 flowExecutor = new NodeExecutor();
150 flowExecutor = new RestExecutor();
153 throw new Exception("No Executor found for transaction ID" + transaction.getTransactionId());
155 flowExecutor.execute(transaction, ctx);
156 ResponseAction responseAction = handleResponse(transaction, ctx);
158 if (responseAction.getWait() != null && Integer.parseInt(responseAction.getWait()) > 0) {
159 log.debug(fn + "Going to Sleep .... " + responseAction.getWait());
160 Thread.sleep(Integer.parseInt(responseAction.getWait()) * 1000L);
162 if (responseAction.isIntermediateMessage()) {
163 log.debug(fn + "Sending Intermediate Message back .... ");
164 sendIntermediateMessage();
166 if (responseAction.getRetry() != null && Integer.parseInt(responseAction.getRetry()) > retry) {
167 log.debug(fn + "Ooppss!!! We will retry again ....... ");
170 log.debug(fn + "key =" + key + "retry =" + retry);
172 if (responseAction.isIgnore()) {
173 log.debug(fn + "Ignoring this Error and moving ahead ....... ");
176 if (responseAction.isStop()) {
177 log.debug(fn + "Need to Stop ....... ");
180 if (responseAction.getJump() != null && Integer.parseInt(responseAction.getJump()) > 0) {
181 key = Integer.parseInt(responseAction.getJump());
184 log.debug(fn + "key =" + key + "retry =" + retry);
187 throw new Exception("Don't know how to execute transaction ID " + transaction.getTransactionId());
192 protected void sendIntermediateMessage() {
193 // TODO Auto-generated method stub
196 protected ResponseAction handleResponse(Transaction transaction, SvcLogicContext ctx) {
197 log.info("Handling Response for transaction Id " + transaction.getTransactionId());
198 DefaultResponseHandler defaultHandler = new DefaultResponseHandler();
199 return defaultHandler.handlerResponse(transaction, ctx);
202 private boolean preProcessor(HashMap<Integer, Transaction> transactionMap, Transaction transaction)
205 log.debug("Starting Preprocessing Logic ");
206 boolean runThisStep = false;
208 if (transaction.getPrecheck() != null && transaction.getPrecheck().getPrecheckOptions() != null
209 && !transaction.getPrecheck().getPrecheckOptions().isEmpty()) {
211 List<PrecheckOption> precheckOptions = transaction.getPrecheck().getPrecheckOptions();
212 for (PrecheckOption precheck : precheckOptions) {
213 Transaction trans = transactionMap.get(precheck.getpTransactionID());
214 ObjectMapper mapper = new ObjectMapper();
215 log.info("Mapper= " + mapper.writeValueAsString(trans));
216 HashMap trmap = mapper.readValue(mapper.writeValueAsString(trans), HashMap.class);
217 runThisStep = trmap.get(precheck.getParamName()) != null
218 && ((String) trmap.get(precheck.getParamName())).equalsIgnoreCase(precheck.getParamValue());
220 if (("any").equalsIgnoreCase(transaction.getPrecheck().getPrecheckOperator()) && runThisStep) {
225 log.debug("No Pre check defined for transaction ID " + transaction.getTransactionId());
228 } catch (Exception e) {
229 log.error("Error occured when Preprocessing Logic ", e);
232 log.debug("Returing process current Transaction = " + runThisStep);
236 private HashMap<Integer, Transaction> createTransactionMap(String flowSequence, SvcLogicContext localContext)
239 ObjectMapper mapper = new ObjectMapper();
240 Transactions transactions = mapper.readValue(flowSequence, Transactions.class);
241 HashMap<Integer, Transaction> transMap = new HashMap<>();
242 for (Transaction transaction : transactions.getTransactions()) {
243 compileFlowDependencies(transaction, localContext);
244 // parse the Transactions Object and create records in process_flow_status table
245 // loadTransactionIntoStatus(transactions, ctx);
246 transMap.put(transaction.getTransactionId(), transaction);
251 private void compileFlowDependencies(Transaction transaction, SvcLogicContext localContext) throws Exception {
253 dbService.populateModuleAndRPC(transaction, localContext.getAttribute(VNF_TYPE));
254 ObjectMapper mapper = new ObjectMapper();
255 log.debug("Individual Transaction Details :" + transaction.toString());
257 if ((localContext.getAttribute(SEQUENCE_TYPE) == null) || (localContext.getAttribute(SEQUENCE_TYPE) != null
258 && !localContext.getAttribute(SEQUENCE_TYPE).equalsIgnoreCase(DESINGTIME))) {
260 localContext.setAttribute("artifact-content", mapper.writeValueAsString(transaction));
261 dbService.loadSequenceIntoDB(localContext);
263 // get a field in transction class as transactionhandle interface and register
264 // the Handler here for each transactions