1 package org.onap.vid.job.impl;
3 import org.apache.commons.lang3.StringUtils;
4 import org.hibernate.SessionFactory;
5 import org.onap.vid.exceptions.GenericUncheckedException;
6 import org.onap.vid.exceptions.OperationNotAllowedException;
7 import org.onap.vid.job.Job;
8 import org.onap.vid.job.JobsBrokerService;
9 import org.onap.vid.properties.VidProperties;
10 import org.onap.vid.utils.DaoUtils;
11 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
12 import org.onap.portalsdk.core.service.DataAccessService;
13 import org.onap.portalsdk.core.util.SystemProperties;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.beans.factory.annotation.Value;
16 import org.springframework.stereotype.Service;
18 import javax.annotation.PostConstruct;
19 import java.nio.ByteBuffer;
20 import java.sql.Timestamp;
21 import java.time.LocalDateTime;
25 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
27 static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
29 private final DataAccessService dataAccessService;
31 private final SessionFactory sessionFactory;
32 private int maxOpenedInstantiationRequestsToMso;
33 private int pollingIntervalSeconds;
36 public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
37 @Value("0") int maxOpenedInstantiationRequestsToMso,
38 @Value("10") int pollingIntervalSeconds) {
39 // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
40 this.dataAccessService = dataAccessService;
41 this.sessionFactory = sessionFactory;
42 this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
43 this.pollingIntervalSeconds = pollingIntervalSeconds;
47 public void configure() {
48 maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
49 pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
52 public void deleteAll() {
53 dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
57 public UUID add(Job job) {
58 final JobDaoImpl jobDao = castToJobDaoImpl(job);
59 jobDao.setUuid(UUID.randomUUID());
60 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
65 public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
69 String query = sqlQueryForTopic(topic);
70 List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
72 return Optional.empty();
77 final UUID uuid = daoJob.getUuid();
78 final Integer age = daoJob.getAge();
80 daoJob.setTakenBy(ownerId);
82 // It might become that daoJob was taken and pushed-back already, before we
83 // arrived here, so we're verifying the age was not pushed forward.
84 // Age is actually forwarded upon pushBack().
85 String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
87 " and job.age = :age" +
88 " and takenBy is null";
89 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
90 session.createQuery(hqlUpdate)
91 .setText("id", uuid.toString())
92 .setInteger("age", age)
93 .setText("takenBy", ownerId)
96 } while (updatedEntities == 0);
98 return Optional.ofNullable(daoJob);
101 private java.sql.Timestamp nowMinusInterval() {
102 return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
105 private String sqlQueryForTopic(Job.JobStatus topic) {
109 "select * from VID_JOB" +
111 // select only non-deleted in-progress jobs
112 " JOB_STATUS = 'IN_PROGRESS'" +
113 " and TAKEN_BY is null" +
114 " and DELETED_AT is null" +
115 // give some breath, don't select jos that were recently reached
116 " and MODIFIED_DATE <= '" + nowMinusInterval() +
117 // take the oldest handled one
118 "' order by MODIFIED_DATE ASC" +
119 // select only one result
124 // select only pending jobs
125 "select vid_job.* from VID_JOB " +
126 // select users have in_progress jobs
128 " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
129 "group by user_id) users_have_any_in_progress_job_tbl\n" +
130 "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
131 "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
132 // job is not deleted
133 " AND DELETED_AT is null and (\n" +
134 // limit in-progress to some amount
135 "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
136 "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
137 // don't take jobs from templates that already in-progress/failed
138 "and TEMPLATE_Id not in \n" +
139 "(select TEMPLATE_Id from vid_job where" +
140 " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
141 " or JOB_STATUS='IN_PROGRESS'" +
142 " or TAKEN_BY IS NOT NULL)" + " \n " +
143 // prefer older jobs, but the earlier in each bulk
144 "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
145 // select only one result
148 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
153 private byte[] getUuidAsByteArray(UUID owner) {
154 ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
155 bb.putLong(owner.getMostSignificantBits());
156 bb.putLong(owner.getLeastSignificantBits());
161 public void pushBack(Job job) {
162 final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
164 if (remoteDaoJob == null) {
165 throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
168 if (remoteDaoJob.getTakenBy() == null) {
169 throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
172 final JobDaoImpl jobDao = castToJobDaoImpl(job);
174 jobDao.setTakenBy(null);
176 Integer age = jobDao.getAge();
177 jobDao.setAge(age + 1);
179 logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
181 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
184 private JobDaoImpl castToJobDaoImpl(Job job) {
185 if (!(job instanceof JobDaoImpl)) {
186 throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
188 return (JobDaoImpl) job;
192 public Collection<Job> peek() {
193 return dataAccessService.getList(JobDaoImpl.class, null);
197 public Job peek(UUID jobId) {
198 return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
202 public void delete(UUID jobId) {
204 Date now = new Date();
206 String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
208 " and job.status in(:pending, :stopped)" +
209 " and takenBy is null";
211 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
212 session.createQuery(hqlUpdate)
213 .setTimestamp("now", now)
214 .setText("id", jobId.toString())
215 .setText("pending", Job.JobStatus.PENDING.toString())
216 .setText("stopped", Job.JobStatus.STOPPED.toString())
219 if (updatedEntities == 0) {
220 final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
222 if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
223 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
224 throw new OperationNotAllowedException("Service does not exist");
227 if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
228 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
229 ", takenBy " + remoteDaoJob.getTakenBy());
230 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
233 throw new OperationNotAllowedException("Service deletion failed");