e286cc4aaab9841b362f9b165c9d96847cb5d257
[vid.git] / vid-app-common / src / main / java / org / onap / vid / job / impl / JobsBrokerServiceInDatabaseImpl.java
1 package org.onap.vid.job.impl;
2
3 import org.apache.commons.lang3.StringUtils;
4 import org.hibernate.SessionFactory;
5 import org.onap.vid.exceptions.GenericUncheckedException;
6 import org.onap.vid.exceptions.OperationNotAllowedException;
7 import org.onap.vid.job.Job;
8 import org.onap.vid.job.JobsBrokerService;
9 import org.onap.vid.properties.VidProperties;
10 import org.onap.vid.utils.DaoUtils;
11 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
12 import org.onap.portalsdk.core.service.DataAccessService;
13 import org.onap.portalsdk.core.util.SystemProperties;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.beans.factory.annotation.Value;
16 import org.springframework.stereotype.Service;
17
18 import javax.annotation.PostConstruct;
19 import java.nio.ByteBuffer;
20 import java.sql.Timestamp;
21 import java.time.LocalDateTime;
22 import java.util.*;
23
24 @Service
25 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
26
27     static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
28
29     private final DataAccessService dataAccessService;
30
31     private final SessionFactory sessionFactory;
32     private int maxOpenedInstantiationRequestsToMso;
33     private int pollingIntervalSeconds;
34
35     @Autowired
36     public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
37                                            @Value("0") int maxOpenedInstantiationRequestsToMso,
38                                            @Value("10") int pollingIntervalSeconds) {
39         // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
40         this.dataAccessService = dataAccessService;
41         this.sessionFactory = sessionFactory;
42         this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
43         this.pollingIntervalSeconds = pollingIntervalSeconds;
44     }
45
46     @PostConstruct
47     public void configure() {
48         maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
49         pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
50     }
51
52     public void deleteAll() {
53         dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
54     }
55
56     @Override
57     public UUID add(Job job) {
58         final JobDaoImpl jobDao = castToJobDaoImpl(job);
59         jobDao.setUuid(UUID.randomUUID());
60         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
61         return job.getUuid();
62     }
63
64     @Override
65     public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
66         JobDaoImpl daoJob;
67         int updatedEntities;
68         do {
69             String query = sqlQueryForTopic(topic);
70             List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
71             if (jobs.isEmpty()) {
72                 return Optional.empty();
73             }
74
75             daoJob = jobs.get(0);
76
77             final UUID uuid = daoJob.getUuid();
78             final Integer age = daoJob.getAge();
79
80             daoJob.setTakenBy(ownerId);
81
82             // It might become that daoJob was taken and pushed-back already, before we
83             // arrived here, so we're verifying the age was not pushed forward.
84             // Age is actually forwarded upon pushBack().
85             String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
86                     " job.id = :id" +
87                     " and job.age = :age" +
88                     " and takenBy is null";
89             updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
90                     session.createQuery(hqlUpdate)
91                             .setText("id", uuid.toString())
92                             .setInteger("age", age)
93                             .setText("takenBy", ownerId)
94                             .executeUpdate());
95
96         } while (updatedEntities == 0);
97
98         return Optional.ofNullable(daoJob);
99     }
100
101     private java.sql.Timestamp nowMinusInterval() {
102         return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
103     }
104
105     private String sqlQueryForTopic(Job.JobStatus topic) {
106         switch (topic) {
107             case IN_PROGRESS:
108                 return "" +
109                         "select * from VID_JOB" +
110                         " where" +
111                         // select only non-deleted in-progress jobs
112                         "    JOB_STATUS = 'IN_PROGRESS'" +
113                         "    and TAKEN_BY is null" +
114                         "    and DELETED_AT is null" +
115                         // give some breath, don't select jos that were recently reached
116                         "    and MODIFIED_DATE <= '" + nowMinusInterval() +
117                         // take the oldest handled one
118                         "' order by MODIFIED_DATE ASC" +
119                         // select only one result
120                         " limit 1";
121
122             case PENDING:
123                 return "" +
124                         // select only pending jobs
125                         "select vid_job.* from VID_JOB " +
126                         // select users have in_progress jobs
127                         "left join \n" +
128                         " (select user_Id, 1 as has_any_in_progress_job from VID_JOB  where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
129                         "group by user_id)  users_have_any_in_progress_job_tbl\n" +
130                         "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
131                         "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
132                         // job is not deleted
133                         "      AND DELETED_AT is null and (\n" +
134                         // limit in-progress to some amount
135                         "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
136                         "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
137                         // don't take jobs from templates that already in-progress/failed
138                         "and TEMPLATE_Id not in \n" +
139                         "(select TEMPLATE_Id from vid_job where" +
140                         "   (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
141                         "   or JOB_STATUS='IN_PROGRESS'" +
142                         "   or TAKEN_BY IS NOT NULL)" + " \n " +
143                         // prefer older jobs, but the earlier in each bulk
144                         "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
145                         // select only one result
146                         "limit 1";
147             default:
148                 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
149         }
150     }
151
152
153     private byte[] getUuidAsByteArray(UUID owner) {
154         ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
155         bb.putLong(owner.getMostSignificantBits());
156         bb.putLong(owner.getLeastSignificantBits());
157         return bb.array();
158     }
159
160     @Override
161     public void pushBack(Job job) {
162         final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
163
164         if (remoteDaoJob == null) {
165             throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
166         }
167
168         if (remoteDaoJob.getTakenBy() == null) {
169             throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
170         }
171
172         final JobDaoImpl jobDao = castToJobDaoImpl(job);
173
174         jobDao.setTakenBy(null);
175
176         Integer age = jobDao.getAge();
177         jobDao.setAge(age + 1);
178
179         logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
180
181         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
182     }
183
184     private JobDaoImpl castToJobDaoImpl(Job job) {
185         if (!(job instanceof JobDaoImpl)) {
186             throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
187         }
188         return (JobDaoImpl) job;
189     }
190
191     @Override
192     public Collection<Job> peek() {
193         return dataAccessService.getList(JobDaoImpl.class, null);
194     }
195
196     @Override
197     public Job peek(UUID jobId) {
198         return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
199     }
200
201     @Override
202     public void delete(UUID jobId) {
203         int updatedEntities;
204         Date now = new Date();
205
206         String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
207                 " job.id = :id" +
208                 " and job.status in(:pending, :stopped)" +
209                 " and takenBy is null";
210
211         updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
212                 session.createQuery(hqlUpdate)
213                         .setTimestamp("now", now)
214                         .setText("id", jobId.toString())
215                         .setText("pending", Job.JobStatus.PENDING.toString())
216                         .setText("stopped", Job.JobStatus.STOPPED.toString())
217                         .executeUpdate());
218
219         if (updatedEntities == 0) {
220             final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
221
222             if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
223                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
224                 throw new OperationNotAllowedException("Service does not exist");
225             }
226
227             if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
228                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
229                 ", takenBy " + remoteDaoJob.getTakenBy());
230                 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
231             }
232
233             throw new OperationNotAllowedException("Service deletion failed");
234         }
235     }
236 }