2  * ============LICENSE_START=======================================================
 
   3  *  Copyright (C) 2020 Nordix Foundation.
 
   4  * ================================================================================
 
   5  * Licensed under the Apache License, Version 2.0 (the "License");
 
   6  * you may not use this file except in compliance with the License.
 
   7  * You may obtain a copy of the License at
 
   9  *      http://www.apache.org/licenses/LICENSE-2.0
 
  11  * Unless required by applicable law or agreed to in writing, software
 
  12  * distributed under the License is distributed on an "AS IS" BASIS,
 
  13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  * See the License for the specific language governing permissions and
 
  15  * limitations under the License.
 
  17  * SPDX-License-Identifier: Apache-2.0
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.so.etsi.nfvo.ns.lcm.bpmn.flows.service;
 
  22 import static org.onap.so.etsi.nfvo.ns.lcm.bpmn.flows.CamundaVariableNameConstants.CREATE_NS_REQUEST_PARAM_NAME;
 
  23 import static org.onap.so.etsi.nfvo.ns.lcm.bpmn.flows.CamundaVariableNameConstants.GLOBAL_CUSTOMER_ID_PARAM_NAME;
 
  24 import static org.onap.so.etsi.nfvo.ns.lcm.bpmn.flows.CamundaVariableNameConstants.JOB_ID_PARAM_NAME;
 
  25 import static org.onap.so.etsi.nfvo.ns.lcm.bpmn.flows.CamundaVariableNameConstants.SERVICE_TYPE_PARAM_NAME;
 
  26 import static org.onap.so.etsi.nfvo.ns.lcm.bpmn.flows.Constants.CREATE_NS_WORKFLOW_NAME;
 
  27 import static org.onap.so.etsi.nfvo.ns.lcm.database.beans.JobStatusEnum.ERROR;
 
  28 import static org.onap.so.etsi.nfvo.ns.lcm.database.beans.JobStatusEnum.FINISHED;
 
  29 import static org.onap.so.etsi.nfvo.ns.lcm.database.beans.JobStatusEnum.FINISHED_WITH_ERROR;
 
  30 import static org.slf4j.LoggerFactory.getLogger;
 
  31 import java.time.Instant;
 
  32 import java.time.LocalDateTime;
 
  33 import java.util.HashMap;
 
  35 import java.util.Optional;
 
  36 import java.util.concurrent.TimeUnit;
 
  37 import org.apache.commons.lang3.tuple.ImmutablePair;
 
  38 import org.onap.so.etsi.nfvo.ns.lcm.bpmn.flows.exceptions.NsRequestProcessingException;
 
  39 import org.onap.so.etsi.nfvo.ns.lcm.database.beans.JobAction;
 
  40 import org.onap.so.etsi.nfvo.ns.lcm.database.beans.JobStatusEnum;
 
  41 import org.onap.so.etsi.nfvo.ns.lcm.database.beans.NfvoJob;
 
  42 import org.onap.so.etsi.nfvo.ns.lcm.database.service.DatabaseServiceProvider;
 
  43 import org.onap.so.etsi.nfvo.ns.lcm.model.CreateNsRequest;
 
  44 import org.onap.so.etsi.nfvo.ns.lcm.model.InlineResponse400;
 
  45 import org.onap.so.etsi.nfvo.ns.lcm.model.NsInstancesNsInstance;
 
  46 import org.slf4j.Logger;
 
  47 import org.springframework.beans.factory.annotation.Autowired;
 
  48 import org.springframework.beans.factory.annotation.Value;
 
  49 import org.springframework.stereotype.Service;
 
  50 import com.google.common.collect.ImmutableSet;
 
  53  * @author Waqas Ikram (waqas.ikram@est.tech)
 
  57 public class JobExecutorService {
 
  59     private static final Logger logger = getLogger(JobExecutorService.class);
 
  61     private static final ImmutableSet<JobStatusEnum> JOB_FINISHED_STATES =
 
  62             ImmutableSet.of(FINISHED, ERROR, FINISHED_WITH_ERROR);
 
  64     private static final int SLEEP_TIME_IN_SECONDS = 5;
 
  66     @Value("${so-etsi-ns-lcm-workflow-engine.requesttimeout.timeoutInSeconds:300}")
 
  67     private int timeOutInSeconds;
 
  69     private final DatabaseServiceProvider databaseServiceProvider;
 
  70     private final WorkflowExecutorService workflowExecutorService;
 
  71     private final WorkflowQueryService workflowQueryService;
 
  74     public JobExecutorService(final DatabaseServiceProvider databaseServiceProvider,
 
  75             final WorkflowExecutorService workflowExecutorService, final WorkflowQueryService workflowQueryService) {
 
  76         this.databaseServiceProvider = databaseServiceProvider;
 
  77         this.workflowExecutorService = workflowExecutorService;
 
  78         this.workflowQueryService = workflowQueryService;
 
  81     public NsInstancesNsInstance runCreateNsJob(final CreateNsRequest createNsRequest, final String globalCustomerId,
 
  82             final String serviceType) {
 
  83         logger.info("Starting 'Create NS' workflow job for request:\n{}", createNsRequest);
 
  84         final NfvoJob newJob = new NfvoJob().startTime(LocalDateTime.now()).jobType("NS").jobAction(JobAction.CREATE)
 
  85                 .resourceId(createNsRequest.getNsdId()).resourceName(createNsRequest.getNsName())
 
  86                 .status(JobStatusEnum.STARTING).progress(5);
 
  87         databaseServiceProvider.addJob(newJob);
 
  89         logger.info("New job created in database :\n{}", newJob);
 
  91         workflowExecutorService.executeWorkflow(newJob.getJobId(), CREATE_NS_WORKFLOW_NAME,
 
  92                 getVariables(newJob.getJobId(), createNsRequest, globalCustomerId, serviceType));
 
  94         final ImmutablePair<String, JobStatusEnum> immutablePair =
 
  95                 waitForJobToFinish(newJob.getJobId(), JOB_FINISHED_STATES);
 
  97         if (immutablePair.getRight() == null) {
 
  98             final String message = "Failed to create NS for request: \n" + createNsRequest;
 
  99             logger.error(message);
 
 100             throw new NsRequestProcessingException(message);
 
 102         final JobStatusEnum finalJobStatus = immutablePair.getRight();
 
 103         final String processInstanceId = immutablePair.getLeft();
 
 105         if (!FINISHED.equals(finalJobStatus)) {
 
 107             final Optional<InlineResponse400> optional = workflowQueryService.getProblemDetails(processInstanceId);
 
 108             if (optional.isPresent()) {
 
 109                 final InlineResponse400 problemDetails = optional.get();
 
 110                 final String message =
 
 111                         "Failed to create NS for request: \n" + createNsRequest + " due to \n" + problemDetails;
 
 112                 logger.error(message);
 
 113                 throw new NsRequestProcessingException(message, problemDetails);
 
 116             final String message = "Received unexpected Job Status: " + finalJobStatus
 
 117                     + " Failed to Create NS for request: \n" + createNsRequest;
 
 118             logger.error(message);
 
 119             throw new NsRequestProcessingException(message);
 
 122         logger.debug("Will query for CreateNsResponse using processInstanceId:{}", processInstanceId);
 
 123         final Optional<NsInstancesNsInstance> optional = workflowQueryService.getCreateNsResponse(processInstanceId);
 
 124         if (optional.isEmpty()) {
 
 125             final String message =
 
 126                     "Unable to find CreateNsReponse in Camunda History for process instance: " + processInstanceId;
 
 127             logger.error(message);
 
 128             throw new NsRequestProcessingException(message);
 
 130         return optional.get();
 
 133     private ImmutablePair<String, JobStatusEnum> waitForJobToFinish(final String jobId,
 
 134             final ImmutableSet<JobStatusEnum> jobFinishedStates) {
 
 136             final long startTimeInMillis = System.currentTimeMillis();
 
 137             final long timeOutTime = startTimeInMillis + TimeUnit.SECONDS.toMillis(timeOutInSeconds);
 
 139             logger.info("Will wait till {} for {} job to finish", Instant.ofEpochMilli(timeOutTime).toString(), jobId);
 
 140             JobStatusEnum currentJobStatus = null;
 
 141             while (timeOutTime > System.currentTimeMillis()) {
 
 143                 final Optional<NfvoJob> optional = databaseServiceProvider.getJob(jobId);
 
 145                 if (optional.isEmpty()) {
 
 146                     logger.error("Unable to find Job using jobId: {}", jobId);
 
 147                     return ImmutablePair.nullPair();
 
 150                 final NfvoJob nfvoJob = optional.get();
 
 151                 currentJobStatus = nfvoJob.getStatus();
 
 152                 logger.debug("Received job status response: \n ", nfvoJob);
 
 153                 if (jobFinishedStates.contains(nfvoJob.getStatus())) {
 
 154                     logger.info("Job finished \n {}", currentJobStatus);
 
 155                     return ImmutablePair.of(nfvoJob.getProcessInstanceId(), currentJobStatus);
 
 158                 logger.debug("Haven't received one of finish state {} yet, will try again in {} seconds",
 
 159                         jobFinishedStates, SLEEP_TIME_IN_SECONDS);
 
 160                 TimeUnit.SECONDS.sleep(SLEEP_TIME_IN_SECONDS);
 
 163             logger.warn("Timeout current job status: {}", currentJobStatus);
 
 164             return ImmutablePair.nullPair();
 
 165         } catch (final InterruptedException interruptedException) {
 
 166             Thread.currentThread().interrupt();
 
 167             logger.error("Sleep was interrupted", interruptedException);
 
 168             return ImmutablePair.nullPair();
 
 172     private Map<String, Object> getVariables(final String jobId, final CreateNsRequest createNsRequest,
 
 173             final String globalCustomerId, final String serviceType) {
 
 174         final Map<String, Object> variables = new HashMap<>();
 
 175         variables.put(JOB_ID_PARAM_NAME, jobId);
 
 176         variables.put(CREATE_NS_REQUEST_PARAM_NAME, createNsRequest);
 
 177         variables.put(GLOBAL_CUSTOMER_ID_PARAM_NAME, globalCustomerId);
 
 178         variables.put(SERVICE_TYPE_PARAM_NAME, serviceType);