59ca43743cf45a2a16cd3842944aa852d3ad1df0
[vid.git] / vid-app-common / src / main / java / org / onap / vid / job / impl / JobsBrokerServiceInDatabaseImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.job.impl;
22
23 import org.apache.commons.lang3.StringUtils;
24 import org.hibernate.SessionFactory;
25 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
26 import org.onap.portalsdk.core.service.DataAccessService;
27 import org.onap.portalsdk.core.util.SystemProperties;
28 import org.onap.vid.exceptions.GenericUncheckedException;
29 import org.onap.vid.exceptions.OperationNotAllowedException;
30 import org.onap.vid.job.Job;
31 import org.onap.vid.job.JobsBrokerService;
32 import org.onap.vid.properties.VidProperties;
33 import org.onap.vid.utils.DaoUtils;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.beans.factory.annotation.Value;
36 import org.springframework.stereotype.Service;
37
38 import javax.annotation.PostConstruct;
39 import java.nio.ByteBuffer;
40 import java.sql.Timestamp;
41 import java.time.LocalDateTime;
42 import java.util.*;
43
44 import static org.onap.vid.job.Job.JobStatus.CREATING;
45
46 @Service
47 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
48
49     static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
50
51     private final DataAccessService dataAccessService;
52
53     private final SessionFactory sessionFactory;
54     private int maxOpenedInstantiationRequestsToMso;
55     private int pollingIntervalSeconds;
56
57     @Autowired
58     public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
59                                            @Value("0") int maxOpenedInstantiationRequestsToMso,
60                                            @Value("10") int pollingIntervalSeconds) {
61         // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
62         this.dataAccessService = dataAccessService;
63         this.sessionFactory = sessionFactory;
64         this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
65         this.pollingIntervalSeconds = pollingIntervalSeconds;
66     }
67
68     @PostConstruct
69     public void configure() {
70         maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
71         pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
72     }
73
74     public void deleteAll() {
75         dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
76     }
77
78     @Override
79     public UUID add(Job job) {
80         final JobDaoImpl jobDao = castToJobDaoImpl(job);
81         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
82         return job.getUuid();
83     }
84
85     @Override
86     public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
87         JobDaoImpl daoJob;
88         int updatedEntities;
89         do {
90             String query = sqlQueryForTopic(topic);
91             List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
92             if (jobs.isEmpty()) {
93                 return Optional.empty();
94             }
95
96             daoJob = jobs.get(0);
97
98             final UUID uuid = daoJob.getUuid();
99             final Integer age = daoJob.getAge();
100
101             daoJob.setTakenBy(ownerId);
102
103             // It might become that daoJob was taken and pushed-back already, before we
104             // arrived here, so we're verifying the age was not pushed forward.
105             // Age is actually forwarded upon pushBack().
106             String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
107                     " job.id = :id" +
108                     " and job.age = :age" +
109                     " and takenBy is null";
110             updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
111                     session.createQuery(hqlUpdate)
112                             .setText("id", uuid.toString())
113                             .setInteger("age", age)
114                             .setText("takenBy", ownerId)
115                             .executeUpdate());
116
117         } while (updatedEntities == 0);
118
119         return Optional.ofNullable(daoJob);
120     }
121
122     private java.sql.Timestamp nowMinusInterval() {
123         return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
124     }
125
126     private String selectQueryByJobStatus(Job.JobStatus topic){
127
128         String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
129         return "" +
130                 "select * from VID_JOB" +
131                 " where" +
132                 // select only non-deleted in-progress jobs
133                 "    JOB_STATUS = '" + topic + "'" +
134                 "    and TAKEN_BY is null" +
135                 "    and DELETED_AT is null" +
136                 // give some breath, don't select jos that were recently reached
137                  intervalCondition +
138                 // take the oldest handled one
139                 " order by MODIFIED_DATE ASC" +
140                 // select only one result
141                 " limit 1";
142     }
143
144     private String sqlQueryForTopic(Job.JobStatus topic) {
145         switch (topic) {
146             case IN_PROGRESS:
147             case RESOURCE_IN_PROGRESS:
148             case CREATING:
149                 return selectQueryByJobStatus(topic);
150             case PENDING:
151                 return "" +
152                         // select only pending jobs
153                         "select vid_job.* from VID_JOB " +
154                         // select users have in_progress jobs
155                         "left join \n" +
156                         " (select user_Id, 1 as has_any_in_progress_job from VID_JOB  where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
157                         "group by user_id)  users_have_any_in_progress_job_tbl\n" +
158                         "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
159                         "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
160                         // job is not deleted
161                         "      AND DELETED_AT is null and (\n" +
162                         // limit in-progress to some amount
163                         "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
164                         "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
165                         // don't take jobs from templates that already in-progress/failed
166                         "and TEMPLATE_Id not in \n" +
167                         "(select TEMPLATE_Id from vid_job where" +
168                         "   TEMPLATE_Id IS NOT NULL and("+
169                         "   (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
170                         "   or JOB_STATUS='IN_PROGRESS'" +
171                         "   or TAKEN_BY IS NOT NULL))" + " \n " +
172                         // prefer older jobs, but the earlier in each bulk
173                         "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
174                         // select only one result
175                         "limit 1";
176             default:
177                 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
178         }
179     }
180
181
182     private byte[] getUuidAsByteArray(UUID owner) {
183         ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
184         bb.putLong(owner.getMostSignificantBits());
185         bb.putLong(owner.getLeastSignificantBits());
186         return bb.array();
187     }
188
189     @Override
190     public void pushBack(Job job) {
191         final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
192
193         if (remoteDaoJob == null) {
194             throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
195         }
196
197         if (remoteDaoJob.getTakenBy() == null) {
198             throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
199         }
200
201         final JobDaoImpl jobDao = castToJobDaoImpl(job);
202
203         jobDao.setTakenBy(null);
204
205         Integer age = jobDao.getAge();
206         jobDao.setAge(age + 1);
207
208         logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
209
210         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
211     }
212
213     private JobDaoImpl castToJobDaoImpl(Job job) {
214         if (!(job instanceof JobDaoImpl)) {
215             throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
216         }
217         return (JobDaoImpl) job;
218     }
219
220     @Override
221     public Collection<Job> peek() {
222         return dataAccessService.getList(JobDaoImpl.class, null);
223     }
224
225     @Override
226     public Job peek(UUID jobId) {
227         return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
228     }
229
230     @Override
231     public void delete(UUID jobId) {
232         int updatedEntities;
233         Date now = new Date();
234
235         String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
236                 " job.id = :id" +
237                 " and job.status in(:pending, :stopped)" +
238                 " and takenBy is null";
239
240         updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
241                 session.createQuery(hqlUpdate)
242                         .setTimestamp("now", now)
243                         .setText("id", jobId.toString())
244                         .setText("pending", Job.JobStatus.PENDING.toString())
245                         .setText("stopped", Job.JobStatus.STOPPED.toString())
246                         .executeUpdate());
247
248         if (updatedEntities == 0) {
249             final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
250
251             if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
252                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
253                 throw new OperationNotAllowedException("Service does not exist");
254             }
255
256             if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
257                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
258                 ", takenBy " + remoteDaoJob.getTakenBy());
259                 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
260             }
261
262             throw new OperationNotAllowedException("Service deletion failed");
263         }
264     }
265
266     @Override
267     public boolean mute(UUID jobId) {
268         if (jobId == null) {
269             return false;
270         }
271
272         final String prefix = "DUMP";
273         int updatedEntities;
274
275         // Changing the topic (i.e. `job.status`) makes the job non-fetchable.
276         String hqlUpdate = "" +
277                 "update JobDaoImpl job set" +
278                 "   job.status = concat('" + prefix + "_', job.status)," +
279                 //  empty `takenBy`, because some logics treat taken as in-progress
280                 "   takenBy = null" +
281                 " where " +
282                 "   job.id = :id" +
283                 //  if prefix already on the topic -- no need to do it twice.
284                 "   and job.status NOT LIKE '" + prefix + "\\_%'";
285
286         updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
287                 session.createQuery(hqlUpdate)
288                         .setText("id", jobId.toString())
289                         .executeUpdate());
290
291         return updatedEntities != 0;
292     }
293 }