c4bd210a7e381525aeaab7e15eea554a973c9de5
[so/adapters/so-cnf-adapter.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2023 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.so.cnfm.lcm.bpmn.flows.service;
21
22 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.AS_INSTANCE_ID_PARAM_NAME;
23 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.CREATE_AS_REQUEST_PARAM_NAME;
24 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.INSTANTIATE_AS_REQUEST_PARAM_NAME;
25 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.JOB_ID_PARAM_NAME;
26 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.OCC_ID_PARAM_NAME;
27 import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.TERMINATE_AS_REQUEST_PARAM_NAME;
28 import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.CREATE_AS_WORKFLOW_NAME;
29 import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.DELETE_AS_WORKFLOW_NAME;
30 import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.INSTANTIATE_AS_WORKFLOW_NAME;
31 import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.TERMINATE_AS_WORKFLOW_NAME;
32 import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.ERROR;
33 import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.FINISHED;
34 import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.FINISHED_WITH_ERROR;
35 import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.IN_PROGRESS;
36 import static org.slf4j.LoggerFactory.getLogger;
37 import java.time.Instant;
38 import java.time.LocalDateTime;
39 import java.util.HashMap;
40 import java.util.Map;
41 import java.util.Optional;
42 import java.util.concurrent.TimeUnit;
43 import org.apache.commons.lang3.tuple.ImmutablePair;
44 import org.onap.so.cnfm.lcm.bpmn.flows.GsonProvider;
45 import org.onap.so.cnfm.lcm.bpmn.flows.exceptions.AsRequestProcessingException;
46 import org.onap.so.cnfm.lcm.database.beans.AsInst;
47 import org.onap.so.cnfm.lcm.database.beans.AsLcmOpOcc;
48 import org.onap.so.cnfm.lcm.database.beans.AsLcmOpType;
49 import org.onap.so.cnfm.lcm.database.beans.Job;
50 import org.onap.so.cnfm.lcm.database.beans.JobAction;
51 import org.onap.so.cnfm.lcm.database.beans.JobStatusEnum;
52 import org.onap.so.cnfm.lcm.database.beans.OperationStateEnum;
53 import org.onap.so.cnfm.lcm.database.beans.State;
54 import org.onap.so.cnfm.lcm.database.service.DatabaseServiceProvider;
55 import org.onap.so.cnfm.lcm.model.AsInstance;
56 import org.onap.so.cnfm.lcm.model.CreateAsRequest;
57 import org.onap.so.cnfm.lcm.model.ErrorDetails;
58 import org.onap.so.cnfm.lcm.model.InstantiateAsRequest;
59 import org.onap.so.cnfm.lcm.model.TerminateAsRequest;
60 import org.slf4j.Logger;
61 import org.springframework.beans.factory.annotation.Autowired;
62 import org.springframework.beans.factory.annotation.Value;
63 import org.springframework.stereotype.Service;
64 import com.google.common.collect.ImmutableSet;
65 import com.google.gson.Gson;
66
67 /**
68  * @author Waqas Ikram (waqas.ikram@est.tech)
69  *
70  */
71 @Service
72 public class JobExecutorService {
73
74     private static final Logger logger = getLogger(JobExecutorService.class);
75
76     private static final ImmutableSet<JobStatusEnum> JOB_FINISHED_STATES =
77             ImmutableSet.of(FINISHED, ERROR, FINISHED_WITH_ERROR);
78
79     private static final int SLEEP_TIME_IN_SECONDS = 5;
80
81     @Value("${so-cnfm-lcm.requesttimeout.timeoutInSeconds:300}")
82     private int timeOutInSeconds;
83
84     private final DatabaseServiceProvider databaseServiceProvider;
85     private final WorkflowExecutorService workflowExecutorService;
86     private final WorkflowQueryService workflowQueryService;
87     private final Gson gson;
88
89     @Autowired
90     public JobExecutorService(final DatabaseServiceProvider databaseServiceProvider,
91             final WorkflowExecutorService workflowExecutorService, final WorkflowQueryService workflowQueryService,
92             final GsonProvider gsonProvider) {
93         this.databaseServiceProvider = databaseServiceProvider;
94         this.workflowExecutorService = workflowExecutorService;
95         this.workflowQueryService = workflowQueryService;
96         this.gson = gsonProvider.getGson();
97     }
98
99     public AsInstance runCreateAsJob(final CreateAsRequest createAsRequest) {
100         logger.info("Starting 'Create AS' workflow job for request:\n{}", createAsRequest);
101         final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.CREATE)
102                 .resourceId(createAsRequest.getAsdId()).resourceName(createAsRequest.getAsInstanceName())
103                 .status(JobStatusEnum.STARTING);
104         databaseServiceProvider.addJob(newJob);
105
106         logger.info("New job created in database :\n{}", newJob);
107
108         workflowExecutorService.executeWorkflow(newJob.getJobId(), CREATE_AS_WORKFLOW_NAME,
109                 getVariables(newJob.getJobId(), createAsRequest));
110
111         final ImmutablePair<String, JobStatusEnum> immutablePair =
112                 waitForJobToFinish(newJob.getJobId(), JOB_FINISHED_STATES);
113
114         if (immutablePair.getRight() == null) {
115             final String message = "Failed to create AS for request: \n" + createAsRequest;
116             logger.error(message);
117             throw new AsRequestProcessingException(message);
118         }
119         final JobStatusEnum finalJobStatus = immutablePair.getRight();
120         final String processInstanceId = immutablePair.getLeft();
121
122         if (!FINISHED.equals(finalJobStatus)) {
123
124             final Optional<ErrorDetails> optional = workflowQueryService.getErrorDetails(processInstanceId);
125             if (optional.isPresent()) {
126                 final ErrorDetails errorDetails = optional.get();
127                 final String message =
128                         "Failed to create AS for request: \n" + createAsRequest + " due to \n" + errorDetails;
129                 logger.error(message);
130                 throw new AsRequestProcessingException(message, errorDetails);
131             }
132
133             final String message = "Received unexpected Job Status: " + finalJobStatus
134                     + " Failed to Create AS for request: \n" + createAsRequest;
135             logger.error(message);
136             throw new AsRequestProcessingException(message);
137         }
138
139         logger.debug("Will query for CreateAsResponse using processInstanceId:{}", processInstanceId);
140         final Optional<AsInstance> optional = workflowQueryService.getCreateNsResponse(processInstanceId);
141         if (optional.isEmpty()) {
142             final String message =
143                     "Unable to find CreateAsReponse in Camunda History for process instance: " + processInstanceId;
144             logger.error(message);
145             throw new AsRequestProcessingException(message);
146         }
147         return optional.get();
148     }
149
150     public String runInstantiateAsJob(final String asInstanceId, final InstantiateAsRequest instantiateAsRequest) {
151         final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.INSTANTIATE)
152                 .resourceId(asInstanceId).status(JobStatusEnum.STARTING);
153         databaseServiceProvider.addJob(newJob);
154         logger.info("New job created in database :\n{}", newJob);
155
156         final LocalDateTime currentDateTime = LocalDateTime.now();
157         final AsLcmOpOcc newAsLcmOpOcc = new AsLcmOpOcc().id(asInstanceId).operation(AsLcmOpType.INSTANTIATE)
158                 .operationState(OperationStateEnum.PROCESSING).stateEnteredTime(currentDateTime)
159                 .startTime(currentDateTime).asInst(getAsInst(asInstanceId)).isAutoInvocation(false)
160                 .isCancelPending(false).operationParams(gson.toJson(instantiateAsRequest));
161         databaseServiceProvider.addAsLcmOpOcc(newAsLcmOpOcc);
162         logger.info("New AsLcmOpOcc created in database :\n{}", newAsLcmOpOcc);
163
164         workflowExecutorService.executeWorkflow(newJob.getJobId(), INSTANTIATE_AS_WORKFLOW_NAME,
165                 getVariables(asInstanceId, newJob.getJobId(), newAsLcmOpOcc.getId(), instantiateAsRequest));
166
167         final ImmutableSet<JobStatusEnum> jobFinishedStates =
168                 ImmutableSet.of(FINISHED, ERROR, FINISHED_WITH_ERROR, IN_PROGRESS);
169         final ImmutablePair<String, JobStatusEnum> immutablePair =
170                 waitForJobToFinish(newJob.getJobId(), jobFinishedStates);
171
172         if (immutablePair.getRight() == null) {
173             final String message = "Failed to Instantiate AS for request: \n" + instantiateAsRequest;
174             logger.error(message);
175             throw new AsRequestProcessingException(message);
176         }
177
178         final JobStatusEnum finalJobStatus = immutablePair.getRight();
179         if (IN_PROGRESS.equals(finalJobStatus) || FINISHED.equals(finalJobStatus)) {
180             logger.info("Instantiation Job status: {}", finalJobStatus);
181             return newAsLcmOpOcc.getId();
182         }
183
184         final String message = "Received unexpected Job Status: " + finalJobStatus
185                 + " Failed to instantiate AS for request: \n" + instantiateAsRequest;
186         logger.error(message);
187         throw new AsRequestProcessingException(message);
188     }
189
190     public String runTerminateAsJob(final String asInstanceId, final TerminateAsRequest terminateAsRequest) {
191         doInitialTerminateChecks(asInstanceId, terminateAsRequest);
192
193         final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.TERMINATE)
194                 .resourceId(asInstanceId).status(JobStatusEnum.STARTING);
195         databaseServiceProvider.addJob(newJob);
196         logger.info("New job created in database :\n{}", newJob);
197
198         final LocalDateTime currentDateTime = LocalDateTime.now();
199         final AsLcmOpOcc newAsLcmOpOcc = new AsLcmOpOcc().id(asInstanceId).operation(AsLcmOpType.TERMINATE)
200                 .operationState(OperationStateEnum.PROCESSING).stateEnteredTime(currentDateTime)
201                 .startTime(currentDateTime).asInst(getAsInst(asInstanceId)).isAutoInvocation(false)
202                 .isCancelPending(false).operationParams(gson.toJson(terminateAsRequest));
203         databaseServiceProvider.addAsLcmOpOcc(newAsLcmOpOcc);
204         logger.info("New AsLcmOpOcc created in database :\n{}", newAsLcmOpOcc);
205
206         workflowExecutorService.executeWorkflow(newJob.getJobId(), TERMINATE_AS_WORKFLOW_NAME,
207                 getVariables(asInstanceId, newJob.getJobId(), newAsLcmOpOcc.getId(), terminateAsRequest));
208
209         final ImmutableSet<JobStatusEnum> jobFinishedStates =
210                 ImmutableSet.of(FINISHED, ERROR, FINISHED_WITH_ERROR, IN_PROGRESS);
211         final ImmutablePair<String, JobStatusEnum> immutablePair =
212                 waitForJobToFinish(newJob.getJobId(), jobFinishedStates);
213
214         if (immutablePair.getRight() == null) {
215             final String message =
216                     "Failed to Terminate AS with id: " + asInstanceId + " for request: \n" + terminateAsRequest;
217             logger.error(message);
218             throw new AsRequestProcessingException(message);
219         }
220
221         final JobStatusEnum finalJobStatus = immutablePair.getRight();
222
223         if (IN_PROGRESS.equals(finalJobStatus) || FINISHED.equals(finalJobStatus)) {
224             logger.info("Termination Job status: {}", finalJobStatus);
225             return newAsLcmOpOcc.getId();
226         }
227
228         final String message = "Received unexpected Job Status: " + finalJobStatus + " Failed to Terminate AS with id: "
229                 + asInstanceId + " for request: \n" + terminateAsRequest;
230         logger.error(message);
231         throw new AsRequestProcessingException(message);
232     }
233
234     public void runDeleteAsJob(final String asInstanceId) {
235         final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.DELETE)
236                 .resourceId(asInstanceId).status(JobStatusEnum.STARTING);
237         databaseServiceProvider.addJob(newJob);
238         logger.info("New job created in database :\n{}", newJob);
239
240         workflowExecutorService.executeWorkflow(newJob.getJobId(), DELETE_AS_WORKFLOW_NAME,
241                 getVariables(asInstanceId, newJob.getJobId()));
242
243         final ImmutablePair<String, JobStatusEnum> immutablePair =
244                 waitForJobToFinish(newJob.getJobId(), JOB_FINISHED_STATES);
245
246         if (immutablePair.getRight() == null) {
247             final String message = "Failed to Delete AS with id: " + asInstanceId;
248             logger.error(message);
249             throw new AsRequestProcessingException(message);
250         }
251
252         final JobStatusEnum finalJobStatus = immutablePair.getRight();
253         final String processInstanceId = immutablePair.getLeft();
254
255         logger.info("Delete Job status: {}", finalJobStatus);
256
257         if (!FINISHED.equals(finalJobStatus)) {
258
259             final Optional<ErrorDetails> optional = workflowQueryService.getErrorDetails(processInstanceId);
260             if (optional.isPresent()) {
261                 final ErrorDetails errorDetails = optional.get();
262                 final String message = "Failed to Delete AS with id: " + asInstanceId + " due to \n" + errorDetails;
263                 logger.error(message);
264                 throw new AsRequestProcessingException(message, errorDetails);
265             }
266
267             final String message = "Received unexpected Job Status: " + finalJobStatus
268                     + " Failed to Delete AS with id: " + asInstanceId;
269             logger.error(message);
270             throw new AsRequestProcessingException(message);
271         }
272
273         logger.debug("Delete AS finished successfully ...");
274     }
275
276     private AsInst getAsInst(final String asInstId) {
277         logger.info("Getting AsInst with nsInstId: {}", asInstId);
278         final Optional<AsInst> optionalNfvoNsInst = databaseServiceProvider.getAsInst(asInstId);
279
280         if (optionalNfvoNsInst.isEmpty()) {
281             final String message = "No matching AS Instance for id: " + asInstId + " found in database.";
282             throw new AsRequestProcessingException(message);
283         }
284
285         return optionalNfvoNsInst.get();
286     }
287
288     private ImmutablePair<String, JobStatusEnum> waitForJobToFinish(final String jobId,
289             final ImmutableSet<JobStatusEnum> jobFinishedStates) {
290         try {
291             final long startTimeInMillis = System.currentTimeMillis();
292             final long timeOutTime = startTimeInMillis + TimeUnit.SECONDS.toMillis(timeOutInSeconds);
293
294             logger.info("Will wait till {} for {} job to finish", Instant.ofEpochMilli(timeOutTime).toString(), jobId);
295             JobStatusEnum currentJobStatus = null;
296             while (timeOutTime > System.currentTimeMillis()) {
297
298                 final Optional<Job> optional = databaseServiceProvider.getRefreshedJob(jobId);
299
300                 if (optional.isEmpty()) {
301                     logger.error("Unable to find Job using jobId: {}", jobId);
302                     return ImmutablePair.nullPair();
303                 }
304
305                 final Job job = optional.get();
306                 currentJobStatus = job.getStatus();
307                 logger.debug("Received job status response: \n {}", job);
308                 if (jobFinishedStates.contains(currentJobStatus)) {
309                     logger.info("Job finished \n {}", currentJobStatus);
310                     return ImmutablePair.of(job.getProcessInstanceId(), currentJobStatus);
311                 }
312
313                 logger.info("Haven't received one of finish state {} yet, will try again in {} seconds",
314                         jobFinishedStates, SLEEP_TIME_IN_SECONDS);
315                 TimeUnit.SECONDS.sleep(SLEEP_TIME_IN_SECONDS);
316
317             }
318             logger.warn("Timeout current job status: {}", currentJobStatus);
319             return ImmutablePair.nullPair();
320         } catch (final InterruptedException interruptedException) {
321             Thread.currentThread().interrupt();
322             logger.error("Sleep was interrupted", interruptedException);
323             return ImmutablePair.nullPair();
324         }
325     }
326
327     private void doInitialTerminateChecks(final String asInstanceId, final TerminateAsRequest terminateAsRequest) {
328         final AsInst asInst = getAsInst(asInstanceId);
329         if (isNotInstantiated(asInst)) {
330             final String message = "TerminateAsRequest received: " + terminateAsRequest + " for asInstanceId: "
331                     + asInstanceId + "\nUnable to terminate.  AS Instance is already in " + State.NOT_INSTANTIATED
332                     + " state." + "\nThis method can only be used with an AS instance in the " + State.INSTANTIATED
333                     + " state.";
334             logger.error(message);
335             throw new AsRequestProcessingException(message);
336         }
337     }
338
339     private boolean isNotInstantiated(final AsInst asInst) {
340         return State.NOT_INSTANTIATED.equals(asInst.getStatus());
341     }
342
343     private Map<String, Object> getVariables(final String jobId, final CreateAsRequest createAsRequest) {
344         final Map<String, Object> variables = new HashMap<>();
345         variables.put(JOB_ID_PARAM_NAME, jobId);
346         variables.put(CREATE_AS_REQUEST_PARAM_NAME, createAsRequest);
347         return variables;
348     }
349
350     private Map<String, Object> getVariables(final String asInstanceId, final String jobId, final String occId,
351             final InstantiateAsRequest instantiateAsRequest) {
352         final Map<String, Object> variables = new HashMap<>();
353         variables.put(AS_INSTANCE_ID_PARAM_NAME, asInstanceId);
354         variables.put(JOB_ID_PARAM_NAME, jobId);
355         variables.put(OCC_ID_PARAM_NAME, occId);
356         variables.put(INSTANTIATE_AS_REQUEST_PARAM_NAME, instantiateAsRequest);
357         return variables;
358     }
359
360     private Map<String, Object> getVariables(final String asInstanceId, final String jobId, final String occId,
361             final TerminateAsRequest terminateAsRequest) {
362         final Map<String, Object> variables = new HashMap<>();
363         variables.put(AS_INSTANCE_ID_PARAM_NAME, asInstanceId);
364         variables.put(JOB_ID_PARAM_NAME, jobId);
365         variables.put(OCC_ID_PARAM_NAME, occId);
366         variables.put(TERMINATE_AS_REQUEST_PARAM_NAME, terminateAsRequest);
367         return variables;
368     }
369
370     private Map<String, Object> getVariables(final String asInstanceId, final String jobId) {
371         final Map<String, Object> variables = new HashMap<>();
372         variables.put(AS_INSTANCE_ID_PARAM_NAME, asInstanceId);
373         variables.put(JOB_ID_PARAM_NAME, jobId);
374         return variables;
375     }
376 }