vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
/*-
 * ============LICENSE_START=======================================================
 * VID
 * ================================================================================
 * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.vid.job.impl;

import org.apache.commons.lang3.StringUtils;
import org.hibernate.SessionFactory;
import org.jetbrains.annotations.NotNull;
import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
import org.onap.portalsdk.core.service.DataAccessService;
import org.onap.portalsdk.core.util.SystemProperties;
import org.onap.vid.exceptions.GenericUncheckedException;
import org.onap.vid.exceptions.OperationNotAllowedException;
import org.onap.vid.job.Job;
import org.onap.vid.job.JobsBrokerService;
import org.onap.vid.properties.VidProperties;
import org.onap.vid.services.VersionService;
import org.onap.vid.utils.DaoUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;

import static org.onap.vid.job.Job.JobStatus.*;

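/**
 * A {@link JobsBrokerService} implementation backed by the VID_JOB database table.
 * Workers coordinate through the TAKEN_BY column: pull() atomically claims a job,
 * pushBack() releases it and advances its age, and mute() hides it from further
 * pulls by prefixing its status.
 */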
@Service
public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {

    static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);

    private final DataAccessService dataAccessService;

    private final SessionFactory sessionFactory;
    private int maxOpenedInstantiationRequestsToMso;
    private int pollingIntervalSeconds;

    private final VersionService versionService;

    @Autowired
    public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService,
                                           SessionFactory sessionFactory,
                                           @Value("0") int maxOpenedInstantiationRequestsToMso,
                                           @Value("10") int pollingIntervalSeconds,
                                           VersionService versionService) {
        // the @Value annotations inject conservative defaults; they are overridden in @PostConstruct from configuration
        this.dataAccessService = dataAccessService;
        this.sessionFactory = sessionFactory;
        this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
        this.pollingIntervalSeconds = pollingIntervalSeconds;
        this.versionService = versionService;
    }

    @PostConstruct
    public void configure() {
        maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
        pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
    }

    public void deleteAll() {
        dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
    }

    @Override
    public UUID add(Job job) {
        final JobDaoImpl jobDao = castToJobDaoImpl(job);
        jobDao.setBuild(versionService.retrieveBuildNumber());
        dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
        return job.getUuid();
    }

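    /**
     * Pulls a single job for the given topic and claims it for {@code ownerId}.
     * The claim is optimistic: the candidate row is selected without a lock, then an
     * HQL update sets {@code takenBy} only if the job's age is unchanged and it is
     * still untaken; if another worker won the race (zero rows updated), we retry.
     */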
    @Override
    public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
        JobDaoImpl daoJob;
        int updatedEntities;
        do {
            String query = sqlQueryForTopic(topic);
            List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
            if (jobs.isEmpty()) {
                return Optional.empty();
            }

            daoJob = jobs.get(0);

            final UUID uuid = daoJob.getUuid();
            final Integer age = daoJob.getAge();

            daoJob.setTakenBy(ownerId);

            // It might happen that daoJob was already taken and pushed back before we
            // got here, so we verify that the age was not advanced in the meantime.
            // The age is advanced upon pushBack().
            String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
                    " job.id = :id" +
                    " and job.age = :age" +
                    " and takenBy is null";
            updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
                    session.createQuery(hqlUpdate)
                            .setText("id", uuid.toString())
                            .setInteger("age", age)
                            .setText("takenBy", ownerId)
                            .executeUpdate());

        } while (updatedEntities == 0);

        return Optional.ofNullable(daoJob);
    }

    private java.sql.Timestamp nowMinusInterval() {
        return nowMinusInterval(pollingIntervalSeconds);
    }

    private java.sql.Timestamp nowMinusInterval(long seconds) {
        return Timestamp.valueOf(LocalDateTime.now().minusSeconds(seconds));
    }

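    /**
     * Builds the SQL that selects a single pullable job with the given status.
     * For any topic other than CREATING, jobs modified within the last
     * {@code pollingIntervalSeconds} are skipped, so a job is not re-handled
     * before its polling interval has elapsed.
     */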
    private String selectQueryByJobStatus(Job.JobStatus topic){

        String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
        return "" +
                "select * from VID_JOB" +
                " where" +
                // select only non-deleted, non-taken jobs with the requested status
                filterByStatusNotTakenNotDeleted(topic) +
                // give some breathing room: don't select jobs that were handled recently
                intervalCondition +
                // take the least recently handled one
                " order by MODIFIED_DATE ASC" +
                // select only one result
                " limit 1";
    }

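    /**
     * Common WHERE fragment: matches jobs with the given status that are not taken,
     * not deleted, and that belong to the current build (jobs recorded under a
     * different VID build number are never pulled).
     */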
    @NotNull
    private String filterByStatusNotTakenNotDeleted(Job.JobStatus topic) {
        return  "    JOB_STATUS = '" + topic + "'" +
                "    and TAKEN_BY is null" +
                "    and DELETED_AT is null "+
                "    and BUILD = '"+ versionService.retrieveBuildNumber() +"'";
    }

    private String sqlQueryForTopic(Job.JobStatus topic) {
        switch (topic) {
            case IN_PROGRESS:
            case RESOURCE_IN_PROGRESS:
            case CREATING:
                return selectQueryByJobStatus(topic);
            case PENDING:
                return selectQueryForPendingJob();
            case PENDING_RESOURCE:
                return selectQueryForPendingResource();
            default:
                throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
        }
    }

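    /**
     * Builds the SQL that selects a single PENDING job to hand over to MSO, subject to:
     * the total number of in-progress (or taken) requests stays below
     * {@code maxOpenedInstantiationRequestsToMso}; jobs whose template already has an
     * in-progress, taken, or failed-and-not-deleted sibling are skipped; users that
     * currently have no in-progress job are served first, then older bulks, then the
     * job's index within its bulk.
     */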
    @NotNull
    private String selectQueryForPendingJob() {
        return "" +
                // select only pending jobs
                "select vid_job.* from VID_JOB " +
                // mark users that have in-progress jobs
                "left join \n" +
                " (select user_Id, 1 as has_any_in_progress_job from VID_JOB  where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
                "group by user_id)  users_have_any_in_progress_job_tbl\n" +
                "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
                "where "+filterByStatusNotTakenNotDeleted(Job.JobStatus.PENDING)+" and (\n" +
                // limit the number of in-progress requests to some amount
                "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
                "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
                // don't take jobs from templates that are already in-progress or failed
                "and TEMPLATE_Id not in \n" +
                "(select TEMPLATE_Id from vid_job where" +
                "   TEMPLATE_Id IS NOT NULL and("+
                "   (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
                "   or JOB_STATUS='IN_PROGRESS'" +
                "   or TAKEN_BY IS NOT NULL))" + " \n " +
                // prefer older jobs, but the earliest within each bulk
                "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
                // select only one result
                "limit 1";
    }

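    /**
     * Builds the SQL that selects a single PENDING_RESOURCE job whose template has no
     * other in-progress activity: the left-joined counter counts, per template, the
     * non-deleted jobs that are IN_PROGRESS, RESOURCE_IN_PROGRESS, or taken
     * PENDING_RESOURCE; only templates with no such count are eligible. Results are
     * ordered by INDEX_IN_BULK and then template_id to stay deterministic.
     */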
    @NotNull
    private String selectQueryForPendingResource() {
        return "select * from vid_job as JOB left join \n" +
                // count in-progress jobs per template
                "(select template_id,count(*) as in_progress_count from vid_job \n" +
                String.format("where (\n"+
                "    (\n"+
                // with job_status IN_PROGRESS or RESOURCE_IN_PROGRESS
                "        (job_status in ('%s','%s') and DELETED_AT is NULL) \n",IN_PROGRESS, RESOURCE_IN_PROGRESS)+
                // or with job_status PENDING_RESOURCE that are taken
                String.format("        or (JOB_STATUS='%s' and TAKEN_BY IS NOT NULL)\n    )\n", PENDING_RESOURCE) +
                // that have a template ID and are not deleted
                "    and TEMPLATE_ID IS NOT NULL and DELETED_AT is NULL\n)\n" +
                // join them to vid_job by template_id
                "group by template_id)\n"+
                "as COUNTER on COUNTER.template_id=JOB.template_id \n" +

                "where (\n"+
                // select jobs with job_status PENDING_RESOURCE that are not taken and not deleted
                filterByStatusNotTakenNotDeleted(PENDING_RESOURCE) + "\n" +
                // that have no count in the counter (no other in-progress job with the same templateId)
                "    and in_progress_count is NULL \n" +
                // and that have a valid templateId
                "    and JOB.template_id is not NULL \n"+
                ")\n" +
                // INDEX_IN_BULK orders jobs within the same templateId;
                // template_id orders across different templateIds (just to be deterministic)
                "order by INDEX_IN_BULK,JOB.template_id \n" +
                "limit 1;";
    }

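    /**
     * Returns a previously pulled job to the queue: clears {@code takenBy} and advances
     * the job's age, so a concurrent pull() that raced on the older age will not claim it.
     * Throws IllegalStateException if the job was never added or is not currently taken.
     */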
    @Override
    public void pushBack(Job job) {
        final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);

        if (remoteDaoJob == null) {
            throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
        }

        if (remoteDaoJob.getTakenBy() == null) {
            throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
        }

        final JobDaoImpl jobDao = castToJobDaoImpl(job);

        jobDao.setTakenBy(null);

        Integer age = jobDao.getAge();
        jobDao.setAge(age + 1);

        logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());

        dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
    }

    private JobDaoImpl castToJobDaoImpl(Job job) {
        if (!(job instanceof JobDaoImpl)) {
            throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
        }
        return (JobDaoImpl) job;
    }

    @Override
    public Collection<Job> peek() {
        return dataAccessService.getList(JobDaoImpl.class, null);
    }

    @Override
    public Job peek(UUID jobId) {
        return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
    }

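    /**
     * Soft-deletes a job by stamping {@code deletedAt}, but only while the job is still
     * PENDING or STOPPED and not taken by any worker; otherwise an
     * OperationNotAllowedException explains why the deletion was rejected.
     */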
    @Override
    public void delete(UUID jobId) {
        int updatedEntities;
        Date now = new Date();

        String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
                " job.id = :id" +
                " and job.status in(:pending, :stopped)" +
                " and takenBy is null";

        updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
                session.createQuery(hqlUpdate)
                        .setTimestamp("now", now)
                        .setText("id", jobId.toString())
                        .setText("pending", Job.JobStatus.PENDING.toString())
                        .setText("stopped", Job.JobStatus.STOPPED.toString())
                        .executeUpdate());

        if (updatedEntities == 0) {
            final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);

            if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
                logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
                throw new OperationNotAllowedException("Service does not exist");
            }

            if ((remoteDaoJob.getStatus() != Job.JobStatus.PENDING) && (remoteDaoJob.getStatus() != Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
                logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
                ", takenBy " + remoteDaoJob.getTakenBy());
                throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
            }

            throw new OperationNotAllowedException("Service deletion failed");
        }
    }

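    /**
     * Takes a job off every worker topic by prefixing its status with "DUMP_" and
     * clearing {@code takenBy}; a status that already carries the prefix is left
     * untouched. Returns true if a row was actually updated.
     */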
    @Override
    public boolean mute(UUID jobId) {
        if (jobId == null) {
            return false;
        }

        final String prefix = "DUMP";
        int updatedEntities;

        // Changing the topic (i.e. `job.status`) makes the job non-fetchable.
        String hqlUpdate = "" +
                "update JobDaoImpl job set" +
                "   job.status = concat('" + prefix + "_', job.status)," +
                //  clear `takenBy`, because some logic treats a taken job as in-progress
                "   takenBy = null" +
                " where " +
                "   job.id = :id" +
                //  if the prefix is already on the topic, no need to apply it twice
                "   and job.status NOT LIKE '" + prefix + "\\_%'";

        updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
                session.createQuery(hqlUpdate)
                        .setText("id", jobId.toString())
                        .executeUpdate());

        return updatedEntities != 0;
    }

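    // A SQL-ready, parenthesized list of the final job statuses (quoted and
    // comma-separated), used by deleteOldFinalJobs() to purge finished jobs
    // whose MODIFIED_DATE is old enough.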
    private static String sqlListOfFinalStatus =
            String.format("(%s)",
                FINAL_STATUS.stream().
                map(x->String.format("'%s'",x)).
                collect(Collectors.joining(","))
            );

    @Override
    public void deleteOldFinalJobs(long secondsAgo) {
        String select = String.format(" MODIFIED_DATE <= '%s' and JOB_STATUS in %s", nowMinusInterval(secondsAgo), sqlListOfFinalStatus);
        dataAccessService.deleteDomainObjects(JobDaoImpl.class, select, null);
    }
}