Merge from ECOMP's repository
[vid.git] / vid-app-common / src / main / java / org / onap / vid / job / impl / JobsBrokerServiceInDatabaseImpl.java
1 package org.onap.vid.job.impl;
2
3 import org.apache.commons.lang3.StringUtils;
4 import org.hibernate.SessionFactory;
5 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
6 import org.onap.portalsdk.core.service.DataAccessService;
7 import org.onap.portalsdk.core.util.SystemProperties;
8 import org.onap.vid.exceptions.GenericUncheckedException;
9 import org.onap.vid.exceptions.OperationNotAllowedException;
10 import org.onap.vid.job.Job;
11 import org.onap.vid.job.JobsBrokerService;
12 import org.onap.vid.properties.VidProperties;
13 import org.onap.vid.utils.DaoUtils;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.beans.factory.annotation.Value;
16 import org.springframework.stereotype.Service;
17
18 import javax.annotation.PostConstruct;
19 import java.nio.ByteBuffer;
20 import java.sql.Timestamp;
21 import java.time.LocalDateTime;
22 import java.util.*;
23
24 import static org.onap.vid.job.Job.JobStatus.CREATING;
25
26 @Service
27 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
28
29     static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
30
31     private final DataAccessService dataAccessService;
32
33     private final SessionFactory sessionFactory;
34     private int maxOpenedInstantiationRequestsToMso;
35     private int pollingIntervalSeconds;
36
37     @Autowired
38     public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
39                                            @Value("0") int maxOpenedInstantiationRequestsToMso,
40                                            @Value("10") int pollingIntervalSeconds) {
41         // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
42         this.dataAccessService = dataAccessService;
43         this.sessionFactory = sessionFactory;
44         this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
45         this.pollingIntervalSeconds = pollingIntervalSeconds;
46     }
47
48     @PostConstruct
49     public void configure() {
50         maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
51         pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
52     }
53
54     public void deleteAll() {
55         dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
56     }
57
58     @Override
59     public UUID add(Job job) {
60         final JobDaoImpl jobDao = castToJobDaoImpl(job);
61         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
62         return job.getUuid();
63     }
64
65     @Override
66     public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
67         JobDaoImpl daoJob;
68         int updatedEntities;
69         do {
70             String query = sqlQueryForTopic(topic);
71             List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
72             if (jobs.isEmpty()) {
73                 return Optional.empty();
74             }
75
76             daoJob = jobs.get(0);
77
78             final UUID uuid = daoJob.getUuid();
79             final Integer age = daoJob.getAge();
80
81             daoJob.setTakenBy(ownerId);
82
83             // It might become that daoJob was taken and pushed-back already, before we
84             // arrived here, so we're verifying the age was not pushed forward.
85             // Age is actually forwarded upon pushBack().
86             String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
87                     " job.id = :id" +
88                     " and job.age = :age" +
89                     " and takenBy is null";
90             updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
91                     session.createQuery(hqlUpdate)
92                             .setText("id", uuid.toString())
93                             .setInteger("age", age)
94                             .setText("takenBy", ownerId)
95                             .executeUpdate());
96
97         } while (updatedEntities == 0);
98
99         return Optional.ofNullable(daoJob);
100     }
101
102     private java.sql.Timestamp nowMinusInterval() {
103         return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
104     }
105
106     private String selectQueryByJobStatus(Job.JobStatus topic){
107
108         String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
109         return "" +
110                 "select * from VID_JOB" +
111                 " where" +
112                 // select only non-deleted in-progress jobs
113                 "    JOB_STATUS = '" + topic + "'" +
114                 "    and TAKEN_BY is null" +
115                 "    and DELETED_AT is null" +
116                 // give some breath, don't select jos that were recently reached
117                  intervalCondition +
118                 // take the oldest handled one
119                 " order by MODIFIED_DATE ASC" +
120                 // select only one result
121                 " limit 1";
122     }
123
124     private String sqlQueryForTopic(Job.JobStatus topic) {
125         switch (topic) {
126             case IN_PROGRESS:
127             case RESOURCE_IN_PROGRESS:
128             case CREATING:
129                 return selectQueryByJobStatus(topic);
130             case PENDING:
131                 return "" +
132                         // select only pending jobs
133                         "select vid_job.* from VID_JOB " +
134                         // select users have in_progress jobs
135                         "left join \n" +
136                         " (select user_Id, 1 as has_any_in_progress_job from VID_JOB  where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
137                         "group by user_id)  users_have_any_in_progress_job_tbl\n" +
138                         "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
139                         "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
140                         // job is not deleted
141                         "      AND DELETED_AT is null and (\n" +
142                         // limit in-progress to some amount
143                         "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
144                         "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
145                         // don't take jobs from templates that already in-progress/failed
146                         "and TEMPLATE_Id not in \n" +
147                         "(select TEMPLATE_Id from vid_job where" +
148                         "   TEMPLATE_Id IS NOT NULL and("+
149                         "   (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
150                         "   or JOB_STATUS='IN_PROGRESS'" +
151                         "   or TAKEN_BY IS NOT NULL))" + " \n " +
152                         // prefer older jobs, but the earlier in each bulk
153                         "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
154                         // select only one result
155                         "limit 1";
156             default:
157                 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
158         }
159     }
160
161
162     private byte[] getUuidAsByteArray(UUID owner) {
163         ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
164         bb.putLong(owner.getMostSignificantBits());
165         bb.putLong(owner.getLeastSignificantBits());
166         return bb.array();
167     }
168
169     @Override
170     public void pushBack(Job job) {
171         final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
172
173         if (remoteDaoJob == null) {
174             throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
175         }
176
177         if (remoteDaoJob.getTakenBy() == null) {
178             throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
179         }
180
181         final JobDaoImpl jobDao = castToJobDaoImpl(job);
182
183         jobDao.setTakenBy(null);
184
185         Integer age = jobDao.getAge();
186         jobDao.setAge(age + 1);
187
188         logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
189
190         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
191     }
192
193     private JobDaoImpl castToJobDaoImpl(Job job) {
194         if (!(job instanceof JobDaoImpl)) {
195             throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
196         }
197         return (JobDaoImpl) job;
198     }
199
200     @Override
201     public Collection<Job> peek() {
202         return dataAccessService.getList(JobDaoImpl.class, null);
203     }
204
205     @Override
206     public Job peek(UUID jobId) {
207         return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
208     }
209
210     @Override
211     public void delete(UUID jobId) {
212         int updatedEntities;
213         Date now = new Date();
214
215         String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
216                 " job.id = :id" +
217                 " and job.status in(:pending, :stopped)" +
218                 " and takenBy is null";
219
220         updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
221                 session.createQuery(hqlUpdate)
222                         .setTimestamp("now", now)
223                         .setText("id", jobId.toString())
224                         .setText("pending", Job.JobStatus.PENDING.toString())
225                         .setText("stopped", Job.JobStatus.STOPPED.toString())
226                         .executeUpdate());
227
228         if (updatedEntities == 0) {
229             final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
230
231             if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
232                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
233                 throw new OperationNotAllowedException("Service does not exist");
234             }
235
236             if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
237                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
238                 ", takenBy " + remoteDaoJob.getTakenBy());
239                 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
240             }
241
242             throw new OperationNotAllowedException("Service deletion failed");
243         }
244     }
245
246     @Override
247     public boolean mute(UUID jobId) {
248         if (jobId == null) {
249             return false;
250         }
251
252         final String prefix = "DUMP";
253         int updatedEntities;
254
255         // Changing the topic (i.e. `job.status`) makes the job non-fetchable.
256         String hqlUpdate = "" +
257                 "update JobDaoImpl job set" +
258                 "   job.status = concat('" + prefix + "_', job.status)," +
259                 //  empty `takenBy`, because some logics treat taken as in-progress
260                 "   takenBy = null" +
261                 " where " +
262                 "   job.id = :id" +
263                 //  if prefix already on the topic -- no need to do it twice.
264                 "   and job.status NOT LIKE '" + prefix + "\\_%'";
265
266         updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
267                 session.createQuery(hqlUpdate)
268                         .setText("id", jobId.toString())
269                         .executeUpdate());
270
271         return updatedEntities != 0;
272     }
273 }