51803891a8f2ce636ae8e7073cdbf5d06e2ff756
[vid.git] / vid-app-common / src / main / java / org / onap / vid / job / impl / JobsBrokerServiceInDatabaseImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.job.impl;
22
23 import static org.onap.vid.job.Job.JobStatus.CREATING;
24 import static org.onap.vid.job.Job.JobStatus.FINAL_STATUS;
25 import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
26 import static org.onap.vid.job.Job.JobStatus.PENDING_RESOURCE;
27 import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
28
29 import java.sql.Timestamp;
30 import java.time.LocalDateTime;
31 import java.util.Collection;
32 import java.util.Date;
33 import java.util.List;
34 import java.util.Optional;
35 import java.util.UUID;
36 import java.util.stream.Collectors;
37 import javax.annotation.PostConstruct;
38 import org.apache.commons.lang3.StringUtils;
39 import org.hibernate.SessionFactory;
40 import org.jetbrains.annotations.NotNull;
41 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
42 import org.onap.portalsdk.core.service.DataAccessService;
43 import org.onap.portalsdk.core.util.SystemProperties;
44 import org.onap.vid.exceptions.GenericUncheckedException;
45 import org.onap.vid.exceptions.OperationNotAllowedException;
46 import org.onap.vid.job.Job;
47 import org.onap.vid.job.JobsBrokerService;
48 import org.onap.vid.properties.VidProperties;
49 import org.onap.vid.services.VersionService;
50 import org.onap.vid.utils.DaoUtils;
51 import org.springframework.beans.factory.annotation.Autowired;
52 import org.springframework.beans.factory.annotation.Value;
53 import org.springframework.stereotype.Service;
54
55 @Service
56 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
57
58     static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
59
60     private final DataAccessService dataAccessService;
61
62     private final SessionFactory sessionFactory;
63     private int maxOpenedInstantiationRequestsToMso;
64     private int pollingIntervalSeconds;
65
66     private final VersionService versionService;
67
68     @Autowired
69     public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService,
70                                            SessionFactory sessionFactory,
71                                            @Value("0") int maxOpenedInstantiationRequestsToMso,
72                                            @Value("10") int pollingIntervalSeconds,
73                                            VersionService versionService) {
74         // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
75         this.dataAccessService = dataAccessService;
76         this.sessionFactory = sessionFactory;
77         this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
78         this.pollingIntervalSeconds = pollingIntervalSeconds;
79         this.versionService = versionService;
80     }
81
82     @PostConstruct
83     public void configure() {
84         maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
85         pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
86     }
87
88     public void deleteAll() {
89         dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
90     }
91
92     @Override
93     public UUID add(Job job) {
94         final JobDaoImpl jobDao = castToJobDaoImpl(job);
95         jobDao.setBuild(versionService.retrieveBuildNumber());
96         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
97         return job.getUuid();
98     }
99
100     @Override
101     public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
102         JobDaoImpl daoJob;
103         int updatedEntities;
104         do {
105             String query = sqlQueryForTopic(topic);
106             List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
107             if (jobs.isEmpty()) {
108                 return Optional.empty();
109             }
110
111             daoJob = jobs.get(0);
112
113             final UUID uuid = daoJob.getUuid();
114             final Integer age = daoJob.getAge();
115
116             daoJob.setTakenBy(ownerId);
117
118             // It might become that daoJob was taken and pushed-back already, before we
119             // arrived here, so we're verifying the age was not pushed forward.
120             // Age is actually forwarded upon pushBack().
121             String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
122                     " job.id = :id" +
123                     " and job.age = :age" +
124                     " and takenBy is null";
125             updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
126                     session.createQuery(hqlUpdate)
127                             .setText("id", uuid.toString())
128                             .setInteger("age", age)
129                             .setText("takenBy", ownerId)
130                             .executeUpdate());
131
132         } while (updatedEntities == 0);
133
134         return Optional.ofNullable(daoJob);
135     }
136
137     private java.sql.Timestamp nowMinusInterval() {
138         return nowMinusInterval(pollingIntervalSeconds);
139     }
140
141     private java.sql.Timestamp nowMinusInterval(long seconds) {
142         return Timestamp.valueOf(LocalDateTime.now().minusSeconds(seconds));
143     }
144
145     private String selectQueryByJobStatus(Job.JobStatus topic){
146
147         String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
148         return "" +
149                 "select * from VID_JOB" +
150                 " where" +
151                 // select only non-deleted in-progress jobs
152                 filterByStatusNotTakenNotDeleted(topic) +
153                 // give some breath, don't select jos that were recently reached
154                 intervalCondition +
155                 // take the oldest handled one
156                 " order by MODIFIED_DATE ASC" +
157                 // select only one result
158                 " limit 1";
159     }
160
161     @NotNull
162     private String filterByStatusNotTakenNotDeleted(Job.JobStatus topic) {
163         return  "    JOB_STATUS = '" + topic + "'" +
164                 "    and TAKEN_BY is null" +
165                 "    and DELETED_AT is null "+
166                 "    and BUILD = '"+ versionService.retrieveBuildNumber() +"'";
167     }
168
169     private String sqlQueryForTopic(Job.JobStatus topic) {
170         switch (topic) {
171             case IN_PROGRESS:
172             case RESOURCE_IN_PROGRESS:
173             case CREATING:
174                 return selectQueryByJobStatus(topic);
175             case PENDING:
176                 return selectQueryForPendingJob();
177             case PENDING_RESOURCE:
178                 return selectQueryForPendingResource();
179             default:
180                 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
181         }
182     }
183
184     @NotNull
185     private String selectQueryForPendingJob() {
186         return "" +
187                 // select only pending jobs
188                 "select vid_job.* from VID_JOB " +
189                 // select users have in_progress jobs
190                 "left join \n" +
191                 " (select user_Id, 1 as has_any_in_progress_job from VID_JOB  where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
192                 "group by user_id)  users_have_any_in_progress_job_tbl\n" +
193                 "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
194                 "where "+filterByStatusNotTakenNotDeleted(Job.JobStatus.PENDING)+" and (\n" +
195                 // limit in-progress to some amount
196                 "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
197                 "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
198                 // don't take jobs from templates that already in-progress/failed
199                 "and TEMPLATE_Id not in \n" +
200                 "(select TEMPLATE_Id from vid_job where" +
201                 "   TEMPLATE_Id IS NOT NULL and("+
202                 "   (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
203                 "   or JOB_STATUS='IN_PROGRESS'" +
204                 "   or TAKEN_BY IS NOT NULL))" + " \n " +
205                 // prefer older jobs, but the earlier in each bulk
206                 "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
207                 // select only one result
208                 "limit 1";
209     }
210
211     @NotNull
212     private String selectQueryForPendingResource() {
213         return "select * from vid_job as JOB left join \n" +
214                 //count jobs
215                 "(select template_id,count(*) as in_progress_count from vid_job \n" +
216                 String.format("where (\n"+
217                 "    (\n"+
218                 //with job_status IN_PROGRESS or RESOURCE_IN_PROGRESS
219                 "        (job_status in ('%s','%s') and DELETED_AT is NULL) \n",IN_PROGRESS, RESOURCE_IN_PROGRESS)+
220                 //or that with job_status PENDING_RESOURCE that are taken
221                 String.format("        or (JOB_STATUS='%s' and TAKEN_BY IS NOT NULL)\n    )\n", PENDING_RESOURCE) +
222                 //with template ID and are not deleted
223                 "    and TEMPLATE_ID IS NOT NULL and DELETED_AT is NULL\n)\n" +
224                 //join them to vid_job by template_id
225                 "group by template_id)\n"+
226                 "as COUNTER on COUNTER.template_id=JOB.template_id \n" +
227
228                 "where (\n"+
229                 //select jobs with job_status PENDING_RESOURCE that are nit taken and not deleted
230                 filterByStatusNotTakenNotDeleted(PENDING_RESOURCE) + "\n" +
231                 //that have no count in the counter (no other in progress job with same templateId)
232                 "    and in_progress_count is NULL \n" +
233                 //and that have valid templateId
234                 "    and JOB.template_id is not NULL \n"+
235                 ")\n" +
236                 //INDEX_IN_BULK is for order them inside same templateId,
237                 //template_id - for order between different templateId (just to be deterministic)
238                 "order by INDEX_IN_BULK,JOB.template_id \n" +
239                 "limit 1;";
240     }
241
242
243     @Override
244     public void pushBack(Job job) {
245         final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
246
247         if (remoteDaoJob == null) {
248             throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
249         }
250
251         if (remoteDaoJob.getTakenBy() == null) {
252             throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
253         }
254
255         final JobDaoImpl jobDao = castToJobDaoImpl(job);
256
257         jobDao.setTakenBy(null);
258
259         Integer age = jobDao.getAge();
260         jobDao.setAge(age + 1);
261
262         logger.debug(EELFLoggerDelegate.debugLogger, "pushing back jobDao {} of {}: {}/{}",
263             StringUtils.substring(String.valueOf(jobDao.getUuid()), 0, 8),
264             StringUtils.substring(String.valueOf(jobDao.getTemplateId()), 0, 8),
265             jobDao.getStatus(), jobDao.getType());
266
267         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
268     }
269
270     private JobDaoImpl castToJobDaoImpl(Job job) {
271         if (!(job instanceof JobDaoImpl)) {
272             throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
273         }
274         return (JobDaoImpl) job;
275     }
276
277     @Override
278     public Collection<Job> peek() {
279         return dataAccessService.getList(JobDaoImpl.class, null);
280     }
281
282     @Override
283     public Job peek(UUID jobId) {
284         return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
285     }
286
287     @Override
288     public void delete(UUID jobId) {
289         int updatedEntities;
290         Date now = new Date();
291
292         String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
293                 " job.id = :id" +
294                 " and job.status in(:pending, :stopped)" +
295                 " and takenBy is null";
296
297         updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
298                 session.createQuery(hqlUpdate)
299                         .setTimestamp("now", now)
300                         .setText("id", jobId.toString())
301                         .setText("pending", Job.JobStatus.PENDING.toString())
302                         .setText("stopped", Job.JobStatus.STOPPED.toString())
303                         .executeUpdate());
304
305         if (updatedEntities == 0) {
306             final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
307
308             if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
309                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
310                 throw new OperationNotAllowedException("Service does not exist");
311             }
312
313             if ((remoteDaoJob.getStatus() != Job.JobStatus.PENDING) && (remoteDaoJob.getStatus() != Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
314                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
315                 ", takenBy " + remoteDaoJob.getTakenBy());
316                 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
317             }
318
319             throw new OperationNotAllowedException("Service deletion failed");
320         }
321     }
322
323     @Override
324     public boolean mute(UUID jobId) {
325         if (jobId == null) {
326             return false;
327         }
328
329         final String prefix = "DUMP";
330         int updatedEntities;
331
332         // Changing the topic (i.e. `job.status`) makes the job non-fetchable.
333         String hqlUpdate = "" +
334                 "update JobDaoImpl job set" +
335                 "   job.status = concat('" + prefix + "_', job.status)," +
336                 //  empty `takenBy`, because some logics treat taken as in-progress
337                 "   takenBy = null" +
338                 " where " +
339                 "   job.id = :id" +
340                 //  if prefix already on the topic -- no need to do it twice.
341                 "   and job.status NOT LIKE '" + prefix + "\\_%'";
342
343         updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
344                 session.createQuery(hqlUpdate)
345                         .setText("id", jobId.toString())
346                         .executeUpdate());
347
348         return updatedEntities != 0;
349     }
350
351     private static String sqlListOfFinalStatus =
352             String.format("(%s)",
353                 FINAL_STATUS.stream().
354                 map(x->String.format("'%s'",x)).
355                 collect(Collectors.joining(","))
356             );
357
358     @Override
359     public void deleteOldFinalJobs(long secondsAgo) {
360         String select = String.format(" MODIFIED_DATE <= '%s' and JOB_STATUS in %s", nowMinusInterval(secondsAgo), sqlListOfFinalStatus);
361         dataAccessService.deleteDomainObjects(JobDaoImpl.class, select, null);
362     }
363 }