Pause Upon VF Module Failure
[vid.git] / vid-app-common / src / main / java / org / onap / vid / job / impl / JobsBrokerServiceInDatabaseImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.job.impl;
22
23 import static org.onap.vid.job.Job.JobStatus.CREATING;
24 import static org.onap.vid.job.Job.JobStatus.FINAL_STATUS;
25 import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
26 import static org.onap.vid.job.Job.JobStatus.PENDING_RESOURCE;
27 import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
28
29 import java.sql.Timestamp;
30 import java.time.LocalDateTime;
31 import java.util.Collection;
32 import java.util.Date;
33 import java.util.List;
34 import java.util.Optional;
35 import java.util.UUID;
36 import java.util.stream.Collectors;
37 import javax.annotation.PostConstruct;
38 import org.apache.commons.lang3.StringUtils;
39 import org.hibernate.SessionFactory;
40 import org.jetbrains.annotations.NotNull;
41 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
42 import org.onap.portalsdk.core.service.DataAccessService;
43 import org.onap.portalsdk.core.util.SystemProperties;
44 import org.onap.vid.exceptions.GenericUncheckedException;
45 import org.onap.vid.exceptions.OperationNotAllowedException;
46 import org.onap.vid.job.Job;
47 import org.onap.vid.job.JobsBrokerService;
48 import org.onap.vid.properties.Features;
49 import org.onap.vid.properties.VidProperties;
50 import org.onap.vid.services.VersionService;
51 import org.onap.vid.utils.DaoUtils;
52 import org.springframework.beans.factory.annotation.Autowired;
53 import org.springframework.beans.factory.annotation.Value;
54 import org.springframework.stereotype.Service;
55 import org.togglz.core.manager.FeatureManager;
56
57 @Service
58 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
59
60     static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
61
62     private final DataAccessService dataAccessService;
63
64     private final SessionFactory sessionFactory;
65     private int maxOpenedInstantiationRequestsToMso;
66     private int pollingIntervalSeconds;
67
68     private final VersionService versionService;
69     private final FeatureManager featureManager;
70     @Autowired
71     public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService,
72                                            SessionFactory sessionFactory,
73                                            @Value("0") int maxOpenedInstantiationRequestsToMso,
74                                            @Value("10") int pollingIntervalSeconds,
75                                            VersionService versionService,
76                                            FeatureManager featureManager) {
77         // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
78         this.dataAccessService = dataAccessService;
79         this.sessionFactory = sessionFactory;
80         this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
81         this.pollingIntervalSeconds = pollingIntervalSeconds;
82         this.versionService = versionService;
83         this.featureManager = featureManager;
84     }
85
86     @PostConstruct
87     public void configure() {
88         maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
89         pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
90     }
91
92     public void deleteAll() {
93         dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
94     }
95
96     @Override
97     public UUID add(Job job) {
98         final JobDaoImpl jobDao = castToJobDaoImpl(job);
99         jobDao.setBuild(versionService.retrieveBuildNumber());
100         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
101         return job.getUuid();
102     }
103
104     @Override
105     public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
106         JobDaoImpl daoJob;
107         int updatedEntities;
108         do {
109             String query = sqlQueryForTopic(topic);
110             List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
111             if (jobs.isEmpty()) {
112                 return Optional.empty();
113             }
114
115             daoJob = jobs.get(0);
116
117             final UUID uuid = daoJob.getUuid();
118             final Integer age = daoJob.getAge();
119
120             daoJob.setTakenBy(ownerId);
121
122             // It might become that daoJob was taken and pushed-back already, before we
123             // arrived here, so we're verifying the age was not pushed forward.
124             // Age is actually forwarded upon pushBack().
125             String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
126                     " job.id = :id" +
127                     " and job.age = :age" +
128                     " and takenBy is null";
129             updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
130                     session.createQuery(hqlUpdate)
131                             .setText("id", uuid.toString())
132                             .setInteger("age", age)
133                             .setText("takenBy", ownerId)
134                             .executeUpdate());
135
136         } while (updatedEntities == 0);
137
138         return Optional.ofNullable(daoJob);
139     }
140
141     private java.sql.Timestamp nowMinusInterval() {
142         return nowMinusInterval(pollingIntervalSeconds);
143     }
144
145     private java.sql.Timestamp nowMinusInterval(long seconds) {
146         return Timestamp.valueOf(LocalDateTime.now().minusSeconds(seconds));
147     }
148
149     private String selectQueryByJobStatus(Job.JobStatus topic){
150
151         String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
152         return "" +
153                 "select * from VID_JOB" +
154                 " where" +
155                 // select only non-deleted in-progress jobs
156                 filterByStatusNotTakenNotDeleted(topic) +
157                 // give some breath, don't select jos that were recently reached
158                 intervalCondition +
159                 // take the oldest handled one
160                 " order by MODIFIED_DATE ASC" +
161                 // select only one result
162                 " limit 1";
163     }
164
165     @NotNull
166     private String filterByStatusNotTakenNotDeleted(Job.JobStatus topic) {
167         return  "    JOB_STATUS = '" + topic + "'" +
168                 "    and TAKEN_BY is null" +
169                 "    and DELETED_AT is null "+
170                 "    and BUILD = '"+ versionService.retrieveBuildNumber() +"'";
171     }
172
173     private String sqlQueryForTopic(Job.JobStatus topic) {
174         switch (topic) {
175             case IN_PROGRESS:
176             case RESOURCE_IN_PROGRESS:
177             case CREATING:
178                 return selectQueryByJobStatus(topic);
179             case PENDING:
180                 return selectQueryForPendingJob();
181             case PENDING_RESOURCE:
182                 return selectQueryForPendingResource();
183             default:
184                 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
185         }
186     }
187
188     @NotNull
189     private String selectQueryForPendingJob() {
190         return "" +
191                 // select only pending jobs
192                 "select vid_job.* from VID_JOB " +
193                 // select users have in_progress jobs
194                 "left join \n" +
195                 " (select user_Id, 1 as has_any_in_progress_job from VID_JOB  where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
196                 "group by user_id)  users_have_any_in_progress_job_tbl\n" +
197                 "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
198                 "where "+filterByStatusNotTakenNotDeleted(Job.JobStatus.PENDING)+" and (\n" +
199                 // limit in-progress to some amount
200                 "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
201                 "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
202                 // don't take jobs from templates that already in-progress/failed
203                 "and TEMPLATE_Id not in \n" +
204                 "(select TEMPLATE_Id from vid_job where" +
205                 "   TEMPLATE_Id IS NOT NULL and("+
206                 "   (JOB_STATUS IN('FAILED','FAILED_AND_PAUSED') and DELETED_AT is null)" + // failed but not deleted
207                 "   or JOB_STATUS='IN_PROGRESS'" +
208                 "   or TAKEN_BY IS NOT NULL))" + " \n " +
209                 // prefer older jobs, but the earlier in each bulk
210                 "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
211                 // select only one result
212                 "limit 1";
213     }
214
215     @NotNull
216     private String selectQueryForPendingResource() {
217         return "select * from vid_job as JOB left join \n" +
218                 //count jobs
219                 "(select template_id,count(*) as in_progress_count from vid_job \n" +
220                 String.format("where (\n"+
221                 "    (\n"+
222                 //with job_status IN_PROGRESS or RESOURCE_IN_PROGRESS
223                 "        (job_status in ('%s','%s') and DELETED_AT is NULL) \n",IN_PROGRESS, RESOURCE_IN_PROGRESS)+
224                 //or that with job_status PENDING_RESOURCE that are taken
225                 String.format("        or (JOB_STATUS='%s' and TAKEN_BY IS NOT NULL)\n    )\n", PENDING_RESOURCE) +
226                 //with template ID and are not deleted
227                 "    and TEMPLATE_ID IS NOT NULL and DELETED_AT is NULL\n)\n" +
228                 //join them to vid_job by template_id
229                 "group by template_id)\n"+
230                 "as COUNTER on COUNTER.template_id=JOB.template_id \n" +
231
232                 "where (\n"+
233                 //select jobs with job_status PENDING_RESOURCE that are nit taken and not deleted
234                 filterByStatusNotTakenNotDeleted(PENDING_RESOURCE) + "\n" +
235                 //that have no count in the counter (no other in progress job with same templateId)
236                 "    and in_progress_count is NULL \n" +
237                 //and that have valid templateId
238                 "    and JOB.template_id is not NULL \n"+
239
240                 filterFailedStatusForPendingResource()
241
242                 + ")" +
243                 //INDEX_IN_BULK is for order them inside same templateId,
244                 //template_id - for order between different templateId (just to be deterministic)
245                 "order by INDEX_IN_BULK,JOB.template_id \n" +
246                 "limit 1;";
247     }
248     private String filterFailedStatusForPendingResource() {
249         String sql = "and JOB.template_id not in \n" +
250                 "(select TEMPLATE_Id from vid_job where" +
251                 "   TEMPLATE_Id IS NOT NULL and (JOB_STATUS IN('FAILED','FAILED_AND_PAUSED') "
252                 + " AND JOB_TYPE NOT IN('NetworkInstantiation','InstanceGroup','InstanceGroupMember') and DELETED_AT is null)" + // failed but not deleted
253                 "   or TAKEN_BY IS NOT NULL)";
254         return featureManager.isActive(Features.FLAG_2008_PAUSE_VFMODULE_INSTANTIATION_FAILURE) ?
255                 sql : "";
256     }
257
258     @Override
259     public void pushBack(Job job) {
260         final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
261
262         if (remoteDaoJob == null) {
263             throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
264         }
265
266         if (remoteDaoJob.getTakenBy() == null) {
267             throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
268         }
269
270         final JobDaoImpl jobDao = castToJobDaoImpl(job);
271
272         jobDao.setTakenBy(null);
273
274         Integer age = jobDao.getAge();
275         jobDao.setAge(age + 1);
276
277         logger.debug(EELFLoggerDelegate.debugLogger, "pushing back jobDao {} of {}: {}/{}",
278             StringUtils.substring(String.valueOf(jobDao.getUuid()), 0, 8),
279             StringUtils.substring(String.valueOf(jobDao.getTemplateId()), 0, 8),
280             jobDao.getStatus(), jobDao.getType());
281
282         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
283     }
284
285     private JobDaoImpl castToJobDaoImpl(Job job) {
286         if (!(job instanceof JobDaoImpl)) {
287             throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
288         }
289         return (JobDaoImpl) job;
290     }
291
292     @Override
293     public Collection<Job> peek() {
294         return dataAccessService.getList(JobDaoImpl.class, null);
295     }
296
297     @Override
298     public Job peek(UUID jobId) {
299         return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
300     }
301
302     @Override
303     public void delete(UUID jobId) {
304         int updatedEntities;
305         Date now = new Date();
306
307         String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
308                 " job.id = :id" +
309                 " and job.status in(:pending, :stopped)" +
310                 " and takenBy is null";
311
312         updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
313                 session.createQuery(hqlUpdate)
314                         .setTimestamp("now", now)
315                         .setText("id", jobId.toString())
316                         .setText("pending", Job.JobStatus.PENDING.toString())
317                         .setText("stopped", Job.JobStatus.STOPPED.toString())
318                         .executeUpdate());
319
320         if (updatedEntities == 0) {
321             final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
322
323             if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
324                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
325                 throw new OperationNotAllowedException("Service does not exist");
326             }
327
328             if ((remoteDaoJob.getStatus() != Job.JobStatus.PENDING) && (remoteDaoJob.getStatus() != Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
329                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
330                 ", takenBy " + remoteDaoJob.getTakenBy());
331                 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
332             }
333
334             throw new OperationNotAllowedException("Service deletion failed");
335         }
336     }
337
338     @Override
339     public boolean mute(UUID jobId) {
340         if (jobId == null) {
341             return false;
342         }
343
344         final String prefix = "DUMP";
345         int updatedEntities;
346
347         // Changing the topic (i.e. `job.status`) makes the job non-fetchable.
348         String hqlUpdate = "" +
349                 "update JobDaoImpl job set" +
350                 "   job.status = concat('" + prefix + "_', job.status)," +
351                 //  empty `takenBy`, because some logics treat taken as in-progress
352                 "   takenBy = null" +
353                 " where " +
354                 "   job.id = :id" +
355                 //  if prefix already on the topic -- no need to do it twice.
356                 "   and job.status NOT LIKE '" + prefix + "\\_%'";
357
358         updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
359                 session.createQuery(hqlUpdate)
360                         .setText("id", jobId.toString())
361                         .executeUpdate());
362
363         return updatedEntities != 0;
364     }
365
366     private static String sqlListOfFinalStatus =
367             String.format("(%s)",
368                 FINAL_STATUS.stream().
369                 map(x->String.format("'%s'",x)).
370                 collect(Collectors.joining(","))
371             );
372
373     @Override
374     public void deleteOldFinalJobs(long secondsAgo) {
375         String select = String.format(" MODIFIED_DATE <= '%s' and JOB_STATUS in %s", nowMinusInterval(secondsAgo), sqlListOfFinalStatus);
376         dataAccessService.deleteDomainObjects(JobDaoImpl.class, select, null);
377     }
378 }