Implant vid-app-common org.onap.vid.job (main and test)
[vid.git] / vid-app-common / src / main / java / org / onap / vid / job / impl / JobWorker.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.job.impl;
22
23 import org.apache.commons.lang3.StringUtils;
24 import org.apache.commons.lang3.exception.ExceptionUtils;
25 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
26 import org.onap.vid.job.*;
27 import org.onap.vid.job.command.JobCommandFactory;
28 import org.quartz.JobExecutionContext;
29 import org.springframework.scheduling.quartz.QuartzJobBean;
30 import org.springframework.stereotype.Component;
31
32 import java.util.Optional;
33 import java.util.UUID;
34
35 import static org.onap.vid.job.Job.JobStatus.FAILED;
36 import static org.onap.vid.job.Job.JobStatus.STOPPED;
37 import static org.onap.vid.job.command.ResourceCommandKt.ACTION_PHASE;
38 import static org.onap.vid.job.command.ResourceCommandKt.INTERNAL_STATE;
39
40 @Component
41 public class JobWorker extends QuartzJobBean {
42
43     private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(JobWorker.class);
44
45     private JobsBrokerService jobsBrokerService;
46     private JobCommandFactory jobCommandFactory;
47     private Job.JobStatus topic;
48
49     @Override
50     protected void executeInternal(JobExecutionContext context) {
51         Optional<Job> job;
52
53         job = pullJob();
54
55         while (job.isPresent()) {
56             Job nextJob = executeJobAndGetNext(job.get());
57             pushBack(nextJob);
58
59             job = pullJob();
60         }
61     }
62
63     private Optional<Job> pullJob() {
64         try {
65             return jobsBrokerService.pull(topic, UUID.randomUUID().toString());
66         } catch (Exception e) {
67             LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to pull job from queue, breaking: {}", e, e);
68             tryMutingJobFromException(e);
69
70             return Optional.empty();
71         }
72     }
73
74     private void pushBack(Job nextJob) {
75         try {
76             jobsBrokerService.pushBack(nextJob);
77         } catch (Exception e) {
78             LOGGER.error(EELFLoggerDelegate.errorLogger, "failed pushing back job to queue: {}", e, e);
79         }
80     }
81
82     protected Job executeJobAndGetNext(Job job) {
83         Object internalState = job.getData().get(INTERNAL_STATE);
84         Object actionPhase = job.getData().get(ACTION_PHASE);
85         LOGGER.debug(EELFLoggerDelegate.debugLogger, "going to execute job {} of {}: {}/{}  {}/{}",
86                 StringUtils.substring(String.valueOf(job.getUuid()), 0, 8),
87                 StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8),
88                 job.getStatus(),
89                 job.getType(),
90                 actionPhase,
91                 internalState
92                 );
93
94         NextCommand nextCommand = executeCommandAndGetNext(job);
95
96         return setNextCommandInJob(nextCommand, job);
97     }
98
99     private NextCommand executeCommandAndGetNext(Job job) {
100         NextCommand nextCommand;
101         try {
102             final JobCommand jobCommand = jobCommandFactory.toCommand(job);
103             nextCommand = jobCommand.call();
104         } catch (Exception e) {
105             LOGGER.error("error while executing job from queue: {}", e);
106             nextCommand = new NextCommand(FAILED);
107         }
108
109         if (nextCommand == null) {
110             nextCommand = new NextCommand(STOPPED);
111         }
112         return nextCommand;
113     }
114
115     private Job setNextCommandInJob(NextCommand nextCommand, Job job) {
116         LOGGER.debug(EELFLoggerDelegate.debugLogger, "transforming job {} of {}: {}/{} -> {}{}",
117                 StringUtils.substring(String.valueOf(job.getUuid()), 0, 8),
118                 StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8),
119                 job.getStatus(), job.getType(),
120                 nextCommand.getStatus(),
121                 nextCommand.getCommand() != null ? ("/" + nextCommand.getCommand().getType()) : "");
122
123         job.setStatus(nextCommand.getStatus());
124
125         if (nextCommand.getCommand() != null) {
126             job.setTypeAndData(nextCommand.getCommand().getType(), nextCommand.getCommand().getData());
127         }
128
129         return job;
130     }
131
132     private void tryMutingJobFromException(Exception e) {
133         // If there's JobException in the stack, read job uuid from
134         // the exception, and mute it in DB.
135         final int indexOfJobException =
136                 ExceptionUtils.indexOfThrowable(e, JobException.class);
137
138         if (indexOfJobException >= 0) {
139             try {
140                 final JobException jobException = (JobException) ExceptionUtils.getThrowableList(e).get(indexOfJobException);
141                 LOGGER.info(EELFLoggerDelegate.debugLogger, "muting job: {} ({})", jobException.getJobUuid(), jobException.toString());
142                 final boolean success = jobsBrokerService.mute(jobException.getJobUuid());
143                 if (!success) {
144                     LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to mute job {}", jobException.getJobUuid());
145                 }
146             } catch (Exception e1) {
147                 LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to mute job: {}", e1, e1);
148             }
149         }
150     }
151
152     //used by quartz to inject JobsBrokerService into the job
153     //see JobSchedulerInitializer
154     public void setJobsBrokerService(JobsBrokerService jobsBrokerService) {
155         this.jobsBrokerService = jobsBrokerService;
156     }
157
158     public void setJobCommandFactory(JobCommandFactory jobCommandFactory) {
159         this.jobCommandFactory = jobCommandFactory;
160     }
161
162     public void setTopic(Job.JobStatus topic) {
163         this.topic = topic;
164     }
165 }