2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
20 package org.onap.so.cnfm.lcm.bpmn.flows.service;
22 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.AS_INSTANCE_ID_PARAM_NAME;
23 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.CREATE_AS_REQUEST_PARAM_NAME;
24 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.INSTANTIATE_AS_REQUEST_PARAM_NAME;
25 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.JOB_ID_PARAM_NAME;
26 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.OCC_ID_PARAM_NAME;
27 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.TERMINATE_AS_REQUEST_PARAM_NAME;
28 import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.CREATE_AS_WORKFLOW_NAME;
29 import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.DELETE_AS_WORKFLOW_NAME;
30 import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.INSTANTIATE_AS_WORKFLOW_NAME;
31 import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.TERMINATE_AS_WORKFLOW_NAME;
32 import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.ERROR;
33 import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.FINISHED;
34 import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.FINISHED_WITH_ERROR;
35 import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.IN_PROGRESS;
36 import static org.slf4j.LoggerFactory.getLogger;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
43 import org.apache.commons.lang3.tuple.ImmutablePair;
44 import org.onap.so.cnfm.lcm.bpmn.flows.GsonProvider;
45 import org.onap.so.cnfm.lcm.bpmn.flows.exceptions.AsRequestProcessingException;
46 import org.onap.so.cnfm.lcm.database.beans.AsInst;
47 import org.onap.so.cnfm.lcm.database.beans.AsLcmOpOcc;
48 import org.onap.so.cnfm.lcm.database.beans.AsLcmOpType;
49 import org.onap.so.cnfm.lcm.database.beans.Job;
50 import org.onap.so.cnfm.lcm.database.beans.JobAction;
51 import org.onap.so.cnfm.lcm.database.beans.JobStatusEnum;
52 import org.onap.so.cnfm.lcm.database.beans.OperationStateEnum;
53 import org.onap.so.cnfm.lcm.database.beans.State;
54 import org.onap.so.cnfm.lcm.database.service.DatabaseServiceProvider;
55 import org.onap.so.cnfm.lcm.model.AsInstance;
56 import org.onap.so.cnfm.lcm.model.CreateAsRequest;
57 import org.onap.so.cnfm.lcm.model.ErrorDetails;
58 import org.onap.so.cnfm.lcm.model.InstantiateAsRequest;
59 import org.onap.so.cnfm.lcm.model.TerminateAsRequest;
60 import org.slf4j.Logger;
61 import org.springframework.beans.factory.annotation.Autowired;
62 import org.springframework.beans.factory.annotation.Value;
63 import org.springframework.stereotype.Service;
64 import com.google.common.collect.ImmutableSet;
65 import com.google.gson.Gson;
68 * @author Waqas Ikram (waqas.ikram@est.tech)
72 public class JobExecutorService {
74 private static final Logger logger = getLogger(JobExecutorService.class);
76 private static final ImmutableSet<JobStatusEnum> JOB_FINISHED_STATES =
77 ImmutableSet.of(FINISHED, ERROR, FINISHED_WITH_ERROR);
79 private static final int SLEEP_TIME_IN_SECONDS = 5;
81 @Value("${so-cnfm-lcm.requesttimeout.timeoutInSeconds:300}")
82 private int timeOutInSeconds;
84 private final DatabaseServiceProvider databaseServiceProvider;
85 private final WorkflowExecutorService workflowExecutorService;
86 private final WorkflowQueryService workflowQueryService;
87 private final Gson gson;
/**
 * Builds the service from its collaborators.
 *
 * @param databaseServiceProvider persistence access for jobs, AS instances and operation occurrences
 * @param workflowExecutorService used to start workflows
 * @param workflowQueryService used to query error details and responses of a process instance
 * @param gsonProvider supplies the {@link Gson} instance kept by this service
 */
public JobExecutorService(final DatabaseServiceProvider databaseServiceProvider,
        final WorkflowExecutorService workflowExecutorService, final WorkflowQueryService workflowQueryService,
        final GsonProvider gsonProvider) {
    this.gson = gsonProvider.getGson();
    this.workflowQueryService = workflowQueryService;
    this.workflowExecutorService = workflowExecutorService;
    this.databaseServiceProvider = databaseServiceProvider;
}
99 public AsInstance runCreateAsJob(final CreateAsRequest createAsRequest) {
100 logger.info("Starting 'Create AS' workflow job for request:\n{}", createAsRequest);
101 final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.CREATE)
102 .resourceId(createAsRequest.getAsdId()).resourceName(createAsRequest.getAsInstanceName())
103 .status(JobStatusEnum.STARTING);
104 databaseServiceProvider.addJob(newJob);
106 logger.info("New job created in database :\n{}", newJob);
108 workflowExecutorService.executeWorkflow(newJob.getJobId(), CREATE_AS_WORKFLOW_NAME,
109 getVariables(newJob.getJobId(), createAsRequest));
111 final ImmutablePair<String, JobStatusEnum> immutablePair =
112 waitForJobToFinish(newJob.getJobId(), JOB_FINISHED_STATES);
114 if (immutablePair.getRight() == null) {
115 final String message = "Failed to create AS for request: \n" + createAsRequest;
116 logger.error(message);
117 throw new AsRequestProcessingException(message);
119 final JobStatusEnum finalJobStatus = immutablePair.getRight();
120 final String processInstanceId = immutablePair.getLeft();
122 if (!FINISHED.equals(finalJobStatus)) {
124 final Optional<ErrorDetails> optional = workflowQueryService.getErrorDetails(processInstanceId);
125 if (optional.isPresent()) {
126 final ErrorDetails errorDetails = optional.get();
127 final String message =
128 "Failed to create AS for request: \n" + createAsRequest + " due to \n" + errorDetails;
129 logger.error(message);
130 throw new AsRequestProcessingException(message, errorDetails);
133 final String message = "Received unexpected Job Status: " + finalJobStatus
134 + " Failed to Create AS for request: \n" + createAsRequest;
135 logger.error(message);
136 throw new AsRequestProcessingException(message);
139 logger.debug("Will query for CreateAsResponse using processInstanceId:{}", processInstanceId);
140 final Optional<AsInstance> optional = workflowQueryService.getCreateNsResponse(processInstanceId);
141 if (optional.isEmpty()) {
142 final String message =
143 "Unable to find CreateAsReponse in Camunda History for process instance: " + processInstanceId;
144 logger.error(message);
145 throw new AsRequestProcessingException(message);
147 return optional.get();
150 public String runInstantiateAsJob(final String asInstanceId, final InstantiateAsRequest instantiateAsRequest) {
151 final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.INSTANTIATE)
152 .resourceId(asInstanceId).status(JobStatusEnum.STARTING);
153 databaseServiceProvider.addJob(newJob);
154 logger.info("New job created in database :\n{}", newJob);
156 final LocalDateTime currentDateTime = LocalDateTime.now();
157 final AsLcmOpOcc newAsLcmOpOcc = new AsLcmOpOcc().id(asInstanceId).operation(AsLcmOpType.INSTANTIATE)
158 .operationState(OperationStateEnum.PROCESSING).stateEnteredTime(currentDateTime)
159 .startTime(currentDateTime).asInst(getAsInst(asInstanceId)).isAutoInvocation(false)
160 .isCancelPending(false).operationParams(gson.toJson(instantiateAsRequest));
161 databaseServiceProvider.addAsLcmOpOcc(newAsLcmOpOcc);
162 logger.info("New AsLcmOpOcc created in database :\n{}", newAsLcmOpOcc);
164 workflowExecutorService.executeWorkflow(newJob.getJobId(), INSTANTIATE_AS_WORKFLOW_NAME,
165 getVariables(asInstanceId, newJob.getJobId(), newAsLcmOpOcc.getId(), instantiateAsRequest));
167 final ImmutableSet<JobStatusEnum> jobFinishedStates =
168 ImmutableSet.of(FINISHED, ERROR, FINISHED_WITH_ERROR, IN_PROGRESS);
169 final ImmutablePair<String, JobStatusEnum> immutablePair =
170 waitForJobToFinish(newJob.getJobId(), jobFinishedStates);
172 if (immutablePair.getRight() == null) {
173 final String message = "Failed to Instantiate AS for request: \n" + instantiateAsRequest;
174 logger.error(message);
175 throw new AsRequestProcessingException(message);
178 final JobStatusEnum finalJobStatus = immutablePair.getRight();
179 if (IN_PROGRESS.equals(finalJobStatus) || FINISHED.equals(finalJobStatus)) {
180 logger.info("Instantiation Job status: {}", finalJobStatus);
181 return newAsLcmOpOcc.getId();
184 final String message = "Received unexpected Job Status: " + finalJobStatus
185 + " Failed to instantiate AS for request: \n" + instantiateAsRequest;
186 logger.error(message);
187 throw new AsRequestProcessingException(message);
190 public String runTerminateAsJob(final String asInstanceId, final TerminateAsRequest terminateAsRequest) {
191 doInitialTerminateChecks(asInstanceId, terminateAsRequest);
193 final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.TERMINATE)
194 .resourceId(asInstanceId).status(JobStatusEnum.STARTING);
195 databaseServiceProvider.addJob(newJob);
196 logger.info("New job created in database :\n{}", newJob);
198 final LocalDateTime currentDateTime = LocalDateTime.now();
199 final AsLcmOpOcc newAsLcmOpOcc = new AsLcmOpOcc().id(asInstanceId).operation(AsLcmOpType.TERMINATE)
200 .operationState(OperationStateEnum.PROCESSING).stateEnteredTime(currentDateTime)
201 .startTime(currentDateTime).asInst(getAsInst(asInstanceId)).isAutoInvocation(false)
202 .isCancelPending(false).operationParams(gson.toJson(terminateAsRequest));
203 databaseServiceProvider.addAsLcmOpOcc(newAsLcmOpOcc);
204 logger.info("New AsLcmOpOcc created in database :\n{}", newAsLcmOpOcc);
206 workflowExecutorService.executeWorkflow(newJob.getJobId(), TERMINATE_AS_WORKFLOW_NAME,
207 getVariables(asInstanceId, newJob.getJobId(), newAsLcmOpOcc.getId(), terminateAsRequest));
209 final ImmutableSet<JobStatusEnum> jobFinishedStates =
210 ImmutableSet.of(FINISHED, ERROR, FINISHED_WITH_ERROR, IN_PROGRESS);
211 final ImmutablePair<String, JobStatusEnum> immutablePair =
212 waitForJobToFinish(newJob.getJobId(), jobFinishedStates);
214 if (immutablePair.getRight() == null) {
215 final String message =
216 "Failed to Terminate AS with id: " + asInstanceId + " for request: \n" + terminateAsRequest;
217 logger.error(message);
218 throw new AsRequestProcessingException(message);
221 final JobStatusEnum finalJobStatus = immutablePair.getRight();
223 if (IN_PROGRESS.equals(finalJobStatus) || FINISHED.equals(finalJobStatus)) {
224 logger.info("Termination Job status: {}", finalJobStatus);
225 return newAsLcmOpOcc.getId();
228 final String message = "Received unexpected Job Status: " + finalJobStatus + " Failed to Terminate AS with id: "
229 + asInstanceId + " for request: \n" + terminateAsRequest;
230 logger.error(message);
231 throw new AsRequestProcessingException(message);
234 public void runDeleteAsJob(final String asInstanceId) {
235 final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.DELETE)
236 .resourceId(asInstanceId).status(JobStatusEnum.STARTING);
237 databaseServiceProvider.addJob(newJob);
238 logger.info("New job created in database :\n{}", newJob);
240 workflowExecutorService.executeWorkflow(newJob.getJobId(), DELETE_AS_WORKFLOW_NAME,
241 getVariables(asInstanceId, newJob.getJobId()));
243 final ImmutablePair<String, JobStatusEnum> immutablePair =
244 waitForJobToFinish(newJob.getJobId(), JOB_FINISHED_STATES);
246 if (immutablePair.getRight() == null) {
247 final String message = "Failed to Delete AS with id: " + asInstanceId;
248 logger.error(message);
249 throw new AsRequestProcessingException(message);
252 final JobStatusEnum finalJobStatus = immutablePair.getRight();
253 final String processInstanceId = immutablePair.getLeft();
255 logger.info("Delete Job status: {}", finalJobStatus);
257 if (!FINISHED.equals(finalJobStatus)) {
259 final Optional<ErrorDetails> optional = workflowQueryService.getErrorDetails(processInstanceId);
260 if (optional.isPresent()) {
261 final ErrorDetails errorDetails = optional.get();
262 final String message = "Failed to Delete AS with id: " + asInstanceId + " due to \n" + errorDetails;
263 logger.error(message);
264 throw new AsRequestProcessingException(message, errorDetails);
267 final String message = "Received unexpected Job Status: " + finalJobStatus
268 + " Failed to Delete AS with id: " + asInstanceId;
269 logger.error(message);
270 throw new AsRequestProcessingException(message);
273 logger.debug("Delete AS finished successfully ...");
276 private AsInst getAsInst(final String asInstId) {
277 logger.info("Getting AsInst with nsInstId: {}", asInstId);
278 final Optional<AsInst> optionalNfvoNsInst = databaseServiceProvider.getAsInst(asInstId);
280 if (optionalNfvoNsInst.isEmpty()) {
281 final String message = "No matching AS Instance for id: " + asInstId + " found in database.";
282 throw new AsRequestProcessingException(message);
285 return optionalNfvoNsInst.get();
288 private ImmutablePair<String, JobStatusEnum> waitForJobToFinish(final String jobId,
289 final ImmutableSet<JobStatusEnum> jobFinishedStates) {
291 final long startTimeInMillis = System.currentTimeMillis();
292 final long timeOutTime = startTimeInMillis + TimeUnit.SECONDS.toMillis(timeOutInSeconds);
294 logger.info("Will wait till {} for {} job to finish", Instant.ofEpochMilli(timeOutTime).toString(), jobId);
295 JobStatusEnum currentJobStatus = null;
296 while (timeOutTime > System.currentTimeMillis()) {
298 final Optional<Job> optional = databaseServiceProvider.getRefreshedJob(jobId);
300 if (optional.isEmpty()) {
301 logger.error("Unable to find Job using jobId: {}", jobId);
302 return ImmutablePair.nullPair();
305 final Job job = optional.get();
306 currentJobStatus = job.getStatus();
307 logger.debug("Received job status response: \n {}", job);
308 if (jobFinishedStates.contains(currentJobStatus)) {
309 logger.info("Job finished \n {}", currentJobStatus);
310 return ImmutablePair.of(job.getProcessInstanceId(), currentJobStatus);
313 logger.info("Haven't received one of finish state {} yet, will try again in {} seconds",
314 jobFinishedStates, SLEEP_TIME_IN_SECONDS);
315 TimeUnit.SECONDS.sleep(SLEEP_TIME_IN_SECONDS);
318 logger.warn("Timeout current job status: {}", currentJobStatus);
319 return ImmutablePair.nullPair();
320 } catch (final InterruptedException interruptedException) {
321 Thread.currentThread().interrupt();
322 logger.error("Sleep was interrupted", interruptedException);
323 return ImmutablePair.nullPair();
327 private void doInitialTerminateChecks(final String asInstanceId, final TerminateAsRequest terminateAsRequest) {
328 final AsInst asInst = getAsInst(asInstanceId);
329 if (isNotInstantiated(asInst)) {
330 final String message = "TerminateAsRequest received: " + terminateAsRequest + " for asInstanceId: "
331 + asInstanceId + "\nUnable to terminate. AS Instance is already in " + State.NOT_INSTANTIATED
332 + " state." + "\nThis method can only be used with an AS instance in the " + State.INSTANTIATED
334 logger.error(message);
335 throw new AsRequestProcessingException(message);
339 private boolean isNotInstantiated(final AsInst asInst) {
340 return State.NOT_INSTANTIATED.equals(asInst.getStatus());
343 private Map<String, Object> getVariables(final String jobId, final CreateAsRequest createAsRequest) {
344 final Map<String, Object> variables = new HashMap<>();
345 variables.put(JOB_ID_PARAM_NAME, jobId);
346 variables.put(CREATE_AS_REQUEST_PARAM_NAME, createAsRequest);
350 private Map<String, Object> getVariables(final String asInstanceId, final String jobId, final String occId,
351 final InstantiateAsRequest instantiateAsRequest) {
352 final Map<String, Object> variables = new HashMap<>();
353 variables.put(AS_INSTANCE_ID_PARAM_NAME, asInstanceId);
354 variables.put(JOB_ID_PARAM_NAME, jobId);
355 variables.put(OCC_ID_PARAM_NAME, occId);
356 variables.put(INSTANTIATE_AS_REQUEST_PARAM_NAME, instantiateAsRequest);
360 private Map<String, Object> getVariables(final String asInstanceId, final String jobId, final String occId,
361 final TerminateAsRequest terminateAsRequest) {
362 final Map<String, Object> variables = new HashMap<>();
363 variables.put(AS_INSTANCE_ID_PARAM_NAME, asInstanceId);
364 variables.put(JOB_ID_PARAM_NAME, jobId);
365 variables.put(OCC_ID_PARAM_NAME, occId);
366 variables.put(TERMINATE_AS_REQUEST_PARAM_NAME, terminateAsRequest);
370 private Map<String, Object> getVariables(final String asInstanceId, final String jobId) {
371 final Map<String, Object> variables = new HashMap<>();
372 variables.put(AS_INSTANCE_ID_PARAM_NAME, asInstanceId);
373 variables.put(JOB_ID_PARAM_NAME, jobId);