5b5d1e13d7cfd5c41277259e5acc81749e8c1f8b
[vid.git] / vid-app-common / src / main / java / org / onap / vid / job / impl / JobWorker.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.job.impl;
22
23 import org.apache.commons.lang3.StringUtils;
24 import org.apache.commons.lang3.exception.ExceptionUtils;
25 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
26 import org.onap.vid.job.*;
27 import org.onap.vid.job.command.JobCommandFactory;
28 import org.onap.vid.properties.Features;
29 import org.quartz.JobExecutionContext;
30 import org.springframework.scheduling.quartz.QuartzJobBean;
31 import org.springframework.stereotype.Component;
32 import org.togglz.core.manager.FeatureManager;
33
34 import java.util.Optional;
35 import java.util.UUID;
36
37 import static org.onap.vid.job.Job.JobStatus.FAILED;
38 import static org.onap.vid.job.Job.JobStatus.STOPPED;
39
40 @Component
41 public class JobWorker extends QuartzJobBean {
42
43     private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(JobWorker.class);
44
45     private JobsBrokerService jobsBrokerService;
46     private FeatureManager featureManager;
47     private JobCommandFactory jobCommandFactory;
48     private Job.JobStatus topic;
49
50     @Override
51     protected void executeInternal(JobExecutionContext context) {
52         Optional<Job> job;
53
54         if (!isMsoNewApiActive()) {
55             return;
56         }
57
58         job = pullJob();
59
60         while (job.isPresent()) {
61             Job nextJob = executeJobAndGetNext(job.get());
62             pushBack(nextJob);
63
64             job = pullJob();
65         }
66     }
67
68     private Optional<Job> pullJob() {
69         try {
70             return jobsBrokerService.pull(topic, UUID.randomUUID().toString());
71         } catch (Exception e) {
72             LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to pull job from queue, breaking: {}", e, e);
73             tryMutingJobFromException(e);
74
75             return Optional.empty();
76         }
77     }
78
79     private void pushBack(Job nextJob) {
80         try {
81             jobsBrokerService.pushBack(nextJob);
82         } catch (Exception e) {
83             LOGGER.error(EELFLoggerDelegate.errorLogger, "failed pushing back job to queue: {}", e, e);
84         }
85     }
86
87     protected Job executeJobAndGetNext(Job job) {
88         LOGGER.debug(EELFLoggerDelegate.debugLogger, "going to execute job {} of {}: {}/{}",
89                 StringUtils.substring(String.valueOf(job.getUuid()), 0, 8),
90                 StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8),
91                 job.getStatus(), job.getType());
92
93         NextCommand nextCommand = executeCommandAndGetNext(job);
94
95         return setNextCommandInJob(nextCommand, job);
96     }
97
98     private NextCommand executeCommandAndGetNext(Job job) {
99         NextCommand nextCommand;
100         try {
101             final JobCommand jobCommand = jobCommandFactory.toCommand(job);
102             nextCommand = jobCommand.call();
103         } catch (Exception e) {
104             LOGGER.error("error while executing job from queue: {}", e);
105             nextCommand = new NextCommand(FAILED);
106         }
107
108         if (nextCommand == null) {
109             nextCommand = new NextCommand(STOPPED);
110         }
111         return nextCommand;
112     }
113
114     private Job setNextCommandInJob(NextCommand nextCommand, Job job) {
115         LOGGER.debug(EELFLoggerDelegate.debugLogger, "transforming job {} of {}: {}/{} -> {}{}",
116                 StringUtils.substring(String.valueOf(job.getUuid()), 0, 8),
117                 StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8),
118                 job.getStatus(), job.getType(),
119                 nextCommand.getStatus(),
120                 nextCommand.getCommand() != null ? ("/" + nextCommand.getCommand().getType()) : "");
121
122         job.setStatus(nextCommand.getStatus());
123
124         if (nextCommand.getCommand() != null) {
125             job.setTypeAndData(nextCommand.getCommand().getType(), nextCommand.getCommand().getData());
126         }
127
128         return job;
129     }
130
131     private boolean isMsoNewApiActive() {
132         return featureManager.isActive(Features.FLAG_ASYNC_INSTANTIATION);
133     }
134
135     private void tryMutingJobFromException(Exception e) {
136         // If there's JobException in the stack, read job uuid from
137         // the exception, and mute it in DB.
138         final int indexOfJobException =
139                 ExceptionUtils.indexOfThrowable(e, JobException.class);
140
141         if (indexOfJobException >= 0) {
142             try {
143                 final JobException jobException = (JobException) ExceptionUtils.getThrowableList(e).get(indexOfJobException);
144                 LOGGER.info(EELFLoggerDelegate.debugLogger, "muting job: {} ({})", jobException.getJobUuid(), jobException.toString());
145                 final boolean success = jobsBrokerService.mute(jobException.getJobUuid());
146                 if (!success) {
147                     LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to mute job {}", jobException.getJobUuid());
148                 }
149             } catch (Exception e1) {
150                 LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to mute job: {}", e1, e1);
151             }
152         }
153     }
154
155     //used by quartz to inject JobsBrokerService into the job
156     //see JobSchedulerInitializer
157     public void setJobsBrokerService(JobsBrokerService jobsBrokerService) {
158         this.jobsBrokerService = jobsBrokerService;
159     }
160
161     public void setFeatureManager(FeatureManager featureManager) {
162         this.featureManager = featureManager;
163     }
164
165     public void setJobCommandFactory(JobCommandFactory jobCommandFactory) {
166         this.jobCommandFactory = jobCommandFactory;
167     }
168
169     public void setTopic(Job.JobStatus topic) {
170         this.topic = topic;
171     }
172 }