2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
 
   6  * =============================================================================
 
   7  * Modifications Copyright (C) 2019 Ericsson
 
   8  * =============================================================================
 
   9  * Licensed under the Apache License, Version 2.0 (the "License");
 
  10  * you may not use this file except in compliance with the License.
 
  11  * You may obtain a copy of the License at
 
  13  *      http://www.apache.org/licenses/LICENSE-2.0
 
  15  * Unless required by applicable law or agreed to in writing, software
 
  16  * distributed under the License is distributed on an "AS IS" BASIS,
 
  17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  18  * See the License for the specific language governing permissions and
 
  19  * limitations under the License.
 
  21  * ============LICENSE_END=========================================================
 
  24 package org.onap.appc.flow.controller.node;
 
  26 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.ACTION_LEVEL;
 
  27 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.DESINGTIME;
 
  28 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.GRAPH;
 
  29 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.INPUT_PARAM_RESPONSE_PREFIX;
 
  30 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.NODE;
 
  31 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.OUTPUT_PARAM_ERROR_MESSAGE;
 
  32 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.OUTPUT_PARAM_STATUS;
 
  33 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.OUTPUT_STATUS_FAILURE;
 
  34 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.OUTPUT_STATUS_MESSAGE;
 
  35 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.OUTPUT_STATUS_SUCCESS;
 
  36 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.REQUEST_ACTION;
 
  37 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.REQUEST_ID;
 
  38 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.RESPONSE_PREFIX;
 
  39 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.REST;
 
  40 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.SEQUENCE_TYPE;
 
  41 import static org.onap.appc.flow.controller.utils.FlowControllerConstants.VNF_TYPE;
 
  42 import java.io.IOException;
 
  43 import java.util.HashMap;
 
  44 import java.util.List;
 
  46 import org.apache.commons.lang3.StringUtils;
 
  47 import org.onap.appc.flow.controller.ResponseHandlerImpl.DefaultResponseHandler;
 
  48 import org.onap.appc.flow.controller.data.PrecheckOption;
 
  49 import org.onap.appc.flow.controller.data.ResponseAction;
 
  50 import org.onap.appc.flow.controller.data.Transaction;
 
  51 import org.onap.appc.flow.controller.data.Transactions;
 
  52 import org.onap.appc.flow.controller.dbervices.FlowControlDBService;
 
  53 import org.onap.appc.flow.controller.executorImpl.GraphExecutor;
 
  54 import org.onap.appc.flow.controller.executorImpl.NodeExecutor;
 
  55 import org.onap.appc.flow.controller.executorImpl.RestExecutor;
 
  56 import org.onap.appc.flow.controller.interfaces.FlowExecutorInterface;
 
  57 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
 
  58 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
 
  59 import org.onap.ccsdk.sli.core.sli.SvcLogicJavaPlugin;
 
  60 import com.att.eelf.configuration.EELFLogger;
 
  61 import com.att.eelf.configuration.EELFManager;
 
  62 import com.fasterxml.jackson.databind.ObjectMapper;
 
  64 public class FlowControlNode implements SvcLogicJavaPlugin {
 
  66     private static final EELFLogger log = EELFManager.getInstance().getLogger(FlowControlNode.class);
 
  68     private final FlowControlDBService dbService;
 
  69     private final FlowSequenceGenerator flowSequenceGenerator;
 
  71     public FlowControlNode() {
 
  72         this.dbService = FlowControlDBService.initialise();
 
  73         this.flowSequenceGenerator = new FlowSequenceGenerator();
 
  76     FlowControlNode(FlowControlDBService dbService, FlowSequenceGenerator flowSequenceGenerator) {
 
  77         this.dbService = dbService;
 
  78         this.flowSequenceGenerator = flowSequenceGenerator;
 
  81     public void processFlow(Map<String, String> inParams, SvcLogicContext ctx) throws SvcLogicException {
 
  82         log.debug("Received processParamKeys call with params : " + inParams);
 
  83         String responsePrefix = inParams.get(INPUT_PARAM_RESPONSE_PREFIX);
 
  85             responsePrefix = StringUtils.isNotBlank(responsePrefix) ? (responsePrefix + ".") : "";
 
  86             SvcLogicContext localContext = new SvcLogicContext();
 
  88             localContext.setAttribute(REQUEST_ID, ctx.getAttribute(REQUEST_ID));
 
  89             localContext.setAttribute(VNF_TYPE, ctx.getAttribute(VNF_TYPE));
 
  90             localContext.setAttribute(REQUEST_ACTION, ctx.getAttribute(REQUEST_ACTION));
 
  91             localContext.setAttribute(ACTION_LEVEL, ctx.getAttribute(ACTION_LEVEL));
 
  92             localContext.setAttribute(RESPONSE_PREFIX, responsePrefix);
 
  93             ctx.setAttribute(RESPONSE_PREFIX, responsePrefix);
 
  95             dbService.getFlowReferenceData(ctx, inParams, localContext);
 
  97             for (String key : localContext.getAttributeKeySet()) {
 
  98                 log.debug("processFlow " + key + "=" + ctx.getAttribute(key));
 
 100             processFlowSequence(inParams, ctx, localContext);
 
 101             if (!ctx.getAttribute(responsePrefix + OUTPUT_PARAM_STATUS).equals(OUTPUT_STATUS_SUCCESS)) {
 
 102                 throw new SvcLogicException(ctx.getAttribute(responsePrefix + OUTPUT_STATUS_MESSAGE));
 
 104         } catch (Exception e) {
 
 105             ctx.setAttribute(responsePrefix + OUTPUT_PARAM_STATUS, OUTPUT_STATUS_FAILURE);
 
 106             ctx.setAttribute(responsePrefix + OUTPUT_PARAM_ERROR_MESSAGE, e.getMessage());
 
 107             log.error("Error occurred in processFlow ", e);
 
 108             throw new SvcLogicException(e.getMessage());
 
 112     private void processFlowSequence(Map<String, String> inParams, SvcLogicContext ctx, SvcLogicContext localContext)
 
 115         String fn = "FlowExecutorNode.processflowSequence";
 
 116         log.debug(fn + "Received model for flow : " + localContext.toString());
 
 118         localContext.getAttributeKeySet().forEach(key -> log.debug(key + "=" + ctx.getAttribute(key)));
 
 120         String flowSequence = flowSequenceGenerator.getFlowSequence(inParams, ctx, localContext);
 
 122         log.debug("Received Flow Sequence : " + flowSequence);
 
 123         HashMap<Integer, Transaction> transactionMap = createTransactionMap(flowSequence, localContext);
 
 124         executeAllTransaction(transactionMap, ctx);
 
 125         log.info("Executed all the transaction successfully");
 
 128     private void executeAllTransaction(HashMap<Integer, Transaction> transactionMap, SvcLogicContext ctx)
 
 131         String fn = "FlowExecutorNode.executeAllTransaction ";
 
 133         FlowExecutorInterface flowExecutor;
 
 134         for (int key = 1; key <= transactionMap.size(); key++) {
 
 135             log.debug(fn + "Starting transactions ID " + key + " :)=" + retry);
 
 136             Transaction transaction = transactionMap.get(key);
 
 137             if (!preProcessor(transactionMap, transaction)) {
 
 138                 log.info("Skipping Transaction ID " + transaction.getTransactionId());
 
 141             if (transaction.getExecutionType() != null) {
 
 142                 switch (transaction.getExecutionType()) {
 
 144                     flowExecutor = new GraphExecutor();
 
 147                     flowExecutor = new NodeExecutor();
 
 150                     flowExecutor = new RestExecutor();
 
 153                     throw new Exception("No Executor found for transaction ID" + transaction.getTransactionId());
 
 155                 flowExecutor.execute(transaction, ctx);
 
 156                 ResponseAction responseAction = handleResponse(transaction, ctx);
 
 158                 if (responseAction.getWait() != null && Integer.parseInt(responseAction.getWait()) > 0) {
 
 159                     log.debug(fn + "Going to Sleep .... " + responseAction.getWait());
 
 160                     Thread.sleep(Integer.parseInt(responseAction.getWait()) * 1000L);
 
 162                 if (responseAction.isIntermediateMessage()) {
 
 163                     log.debug(fn + "Sending Intermediate Message back  .... ");
 
 164                     sendIntermediateMessage();
 
 166                 if (responseAction.getRetry() != null && Integer.parseInt(responseAction.getRetry()) > retry) {
 
 167                     log.debug(fn + "Ooppss!!! We will retry again ....... ");
 
 170                     log.debug(fn + "key =" + key + "retry =" + retry);
 
 172                 if (responseAction.isIgnore()) {
 
 173                     log.debug(fn + "Ignoring this Error and moving ahead  ....... ");
 
 176                 if (responseAction.isStop()) {
 
 177                     log.debug(fn + "Need to Stop  ....... ");
 
 180                 if (responseAction.getJump() != null && Integer.parseInt(responseAction.getJump()) > 0) {
 
 181                     key = Integer.parseInt(responseAction.getJump());
 
 184                 log.debug(fn + "key =" + key + "retry =" + retry);
 
 187                 throw new Exception("Don't know how to execute transaction ID " + transaction.getTransactionId());
 
 192     protected void sendIntermediateMessage() {
 
 193         // TODO Auto-generated method stub
 
 196     protected ResponseAction handleResponse(Transaction transaction, SvcLogicContext ctx) {
 
 197         log.info("Handling Response for transaction Id " + transaction.getTransactionId());
 
 198         DefaultResponseHandler defaultHandler = new DefaultResponseHandler();
 
 199         return defaultHandler.handlerResponse(transaction, ctx);
 
 202     private boolean preProcessor(HashMap<Integer, Transaction> transactionMap, Transaction transaction)
 
 205         log.debug("Starting Preprocessing Logic ");
 
 206         boolean runThisStep = false;
 
 208             if (transaction.getPrecheck() != null && transaction.getPrecheck().getPrecheckOptions() != null
 
 209                     && !transaction.getPrecheck().getPrecheckOptions().isEmpty()) {
 
 211                 List<PrecheckOption> precheckOptions = transaction.getPrecheck().getPrecheckOptions();
 
 212                 for (PrecheckOption precheck : precheckOptions) {
 
 213                     Transaction trans = transactionMap.get(precheck.getpTransactionID());
 
 214                     ObjectMapper mapper = new ObjectMapper();
 
 215                     log.info("Mapper= " + mapper.writeValueAsString(trans));
 
 216                     HashMap trmap = mapper.readValue(mapper.writeValueAsString(trans), HashMap.class);
 
 217                     runThisStep = trmap.get(precheck.getParamName()) != null
 
 218                             && ((String) trmap.get(precheck.getParamName())).equalsIgnoreCase(precheck.getParamValue());
 
 220                     if (("any").equalsIgnoreCase(transaction.getPrecheck().getPrecheckOperator()) && runThisStep) {
 
 225                 log.debug("No Pre check defined for transaction ID " + transaction.getTransactionId());
 
 228         } catch (Exception e) {
 
 229             log.error("Error occured when Preprocessing Logic ", e);
 
 232         log.debug("Returing process current Transaction = " + runThisStep);
 
 236     private HashMap<Integer, Transaction> createTransactionMap(String flowSequence, SvcLogicContext localContext)
 
 239         ObjectMapper mapper = new ObjectMapper();
 
 240         Transactions transactions = mapper.readValue(flowSequence, Transactions.class);
 
 241         HashMap<Integer, Transaction> transMap = new HashMap<>();
 
 242         for (Transaction transaction : transactions.getTransactions()) {
 
 243             compileFlowDependencies(transaction, localContext);
 
 244             // parse the Transactions Object and create records in process_flow_status table
 
 245             // loadTransactionIntoStatus(transactions, ctx);
 
 246             transMap.put(transaction.getTransactionId(), transaction);
 
 251     private void compileFlowDependencies(Transaction transaction, SvcLogicContext localContext) throws Exception {
 
 253         dbService.populateModuleAndRPC(transaction, localContext.getAttribute(VNF_TYPE));
 
 254         ObjectMapper mapper = new ObjectMapper();
 
 255         log.debug("Individual Transaction Details :" + transaction.toString());
 
 257         if ((localContext.getAttribute(SEQUENCE_TYPE) == null) || (localContext.getAttribute(SEQUENCE_TYPE) != null
 
 258                 && !localContext.getAttribute(SEQUENCE_TYPE).equalsIgnoreCase(DESINGTIME))) {
 
 260             localContext.setAttribute("artifact-content", mapper.writeValueAsString(transaction));
 
 261             dbService.loadSequenceIntoDB(localContext);
 
 263         // get a field in transction class as transactionhandle interface and register
 
 264         // the Handler here for each transactions