Merge from ECOMP's repository
[vid.git] / vid-app-common / src / main / java / org / onap / vid / job / impl / JobWorker.java
1 package org.onap.vid.job.impl;
2
3 import org.apache.commons.lang3.StringUtils;
4 import org.apache.commons.lang3.exception.ExceptionUtils;
5 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
6 import org.onap.vid.job.*;
7 import org.onap.vid.job.command.JobCommandFactory;
8 import org.onap.vid.properties.Features;
9 import org.quartz.JobExecutionContext;
10 import org.springframework.scheduling.quartz.QuartzJobBean;
11 import org.springframework.stereotype.Component;
12 import org.togglz.core.manager.FeatureManager;
13
14 import java.util.Optional;
15 import java.util.UUID;
16
17 import static org.onap.vid.job.Job.JobStatus.FAILED;
18 import static org.onap.vid.job.Job.JobStatus.STOPPED;
19
20 @Component
21 public class JobWorker extends QuartzJobBean {
22
23     private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(JobWorker.class);
24
25     private JobsBrokerService jobsBrokerService;
26     private FeatureManager featureManager;
27     private JobCommandFactory jobCommandFactory;
28     private Job.JobStatus topic;
29
30     @Override
31     protected void executeInternal(JobExecutionContext context) {
32         Optional<Job> job;
33
34         if (!isMsoNewApiActive()) {
35             return;
36         }
37
38         job = pullJob();
39
40         while (job.isPresent()) {
41             Job nextJob = executeJobAndGetNext(job.get());
42             pushBack(nextJob);
43
44             job = pullJob();
45         }
46     }
47
48     private Optional<Job> pullJob() {
49         try {
50             return jobsBrokerService.pull(topic, UUID.randomUUID().toString());
51         } catch (Exception e) {
52             LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to pull job from queue, breaking: {}", e, e);
53             tryMutingJobFromException(e);
54
55             return Optional.empty();
56         }
57     }
58
59     private void pushBack(Job nextJob) {
60         try {
61             jobsBrokerService.pushBack(nextJob);
62         } catch (Exception e) {
63             LOGGER.error(EELFLoggerDelegate.errorLogger, "failed pushing back job to queue: {}", e, e);
64         }
65     }
66
67     protected Job executeJobAndGetNext(Job job) {
68         LOGGER.debug(EELFLoggerDelegate.debugLogger, "going to execute job {} of {}: {}/{}",
69                 StringUtils.substring(String.valueOf(job.getUuid()), 0, 8),
70                 StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8),
71                 job.getStatus(), job.getType());
72
73         NextCommand nextCommand = executeCommandAndGetNext(job);
74
75         return setNextCommandInJob(nextCommand, job);
76     }
77
78     private NextCommand executeCommandAndGetNext(Job job) {
79         NextCommand nextCommand;
80         try {
81             final JobCommand jobCommand = jobCommandFactory.toCommand(job);
82             nextCommand = jobCommand.call();
83         } catch (Exception e) {
84             LOGGER.error("error while executing job from queue: {}", e);
85             nextCommand = new NextCommand(FAILED);
86         }
87
88         if (nextCommand == null) {
89             nextCommand = new NextCommand(STOPPED);
90         }
91         return nextCommand;
92     }
93
94     private Job setNextCommandInJob(NextCommand nextCommand, Job job) {
95         LOGGER.debug(EELFLoggerDelegate.debugLogger, "transforming job {} of {}: {}/{} -> {}{}",
96                 StringUtils.substring(String.valueOf(job.getUuid()), 0, 8),
97                 StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8),
98                 job.getStatus(), job.getType(),
99                 nextCommand.getStatus(),
100                 nextCommand.getCommand() != null ? ("/" + nextCommand.getCommand().getType()) : "");
101
102         job.setStatus(nextCommand.getStatus());
103
104         if (nextCommand.getCommand() != null) {
105             job.setTypeAndData(nextCommand.getCommand().getType(), nextCommand.getCommand().getData());
106         }
107
108         return job;
109     }
110
111     private boolean isMsoNewApiActive() {
112         return featureManager.isActive(Features.FLAG_ASYNC_INSTANTIATION);
113     }
114
115     private void tryMutingJobFromException(Exception e) {
116         // If there's JobException in the stack, read job uuid from
117         // the exception, and mute it in DB.
118         final int indexOfJobException =
119                 ExceptionUtils.indexOfThrowable(e, JobException.class);
120
121         if (indexOfJobException >= 0) {
122             try {
123                 final JobException jobException = (JobException) ExceptionUtils.getThrowableList(e).get(indexOfJobException);
124                 LOGGER.info(EELFLoggerDelegate.debugLogger, "muting job: {} ({})", jobException.getJobUuid(), jobException.toString());
125                 final boolean success = jobsBrokerService.mute(jobException.getJobUuid());
126                 if (!success) {
127                     LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to mute job {}", jobException.getJobUuid());
128                 }
129             } catch (Exception e1) {
130                 LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to mute job: {}", e1, e1);
131             }
132         }
133     }
134
135     //used by quartz to inject JobsBrokerService into the job
136     //see JobSchedulerInitializer
137     public void setJobsBrokerService(JobsBrokerService jobsBrokerService) {
138         this.jobsBrokerService = jobsBrokerService;
139     }
140
141     public void setFeatureManager(FeatureManager featureManager) {
142         this.featureManager = featureManager;
143     }
144
145     public void setJobCommandFactory(JobCommandFactory jobCommandFactory) {
146         this.jobCommandFactory = jobCommandFactory;
147     }
148
149     public void setTopic(Job.JobStatus topic) {
150         this.topic = topic;
151     }
152 }