2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.vid.job.impl;
23 import org.apache.commons.lang3.StringUtils;
24 import org.hibernate.SessionFactory;
25 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
26 import org.onap.portalsdk.core.service.DataAccessService;
27 import org.onap.portalsdk.core.util.SystemProperties;
28 import org.onap.vid.exceptions.GenericUncheckedException;
29 import org.onap.vid.exceptions.OperationNotAllowedException;
30 import org.onap.vid.job.Job;
31 import org.onap.vid.job.JobsBrokerService;
32 import org.onap.vid.properties.VidProperties;
33 import org.onap.vid.utils.DaoUtils;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.beans.factory.annotation.Value;
36 import org.springframework.stereotype.Service;
38 import javax.annotation.PostConstruct;
39 import java.nio.ByteBuffer;
40 import java.sql.Timestamp;
41 import java.time.LocalDateTime;
44 import static org.onap.vid.job.Job.JobStatus.CREATING;
47 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
// Class logger. NOTE(review): non-private and non-final -- presumably for test injection; confirm.
49 static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
// Portal-SDK DAO facade used for simple save/get/list/delete operations.
51 private final DataAccessService dataAccessService;
// Hibernate session factory used for the optimistic-concurrency HQL updates (pull/delete/mute).
53 private final SessionFactory sessionFactory;
// Cap on concurrently open MSO instantiation requests; real value loaded from configuration in configure().
54 private int maxOpenedInstantiationRequestsToMso;
// Minimum age (seconds) of MODIFIED_DATE before a job is re-polled; loaded in configure().
55 private int pollingIntervalSeconds;
/**
 * Creates the broker.
 *
 * @param dataAccessService                   portal-SDK DAO facade
 * @param sessionFactory                      Hibernate session factory for HQL updates
 * @param maxOpenedInstantiationRequestsToMso conservative default (0); overridden in configure()
 * @param pollingIntervalSeconds              conservative default (10); overridden in configure()
 */
@Autowired
public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
                                       @Value("0") int maxOpenedInstantiationRequestsToMso,
                                       @Value("10") int pollingIntervalSeconds) {
    // the @Value will inject conservative defaults; overridden in @PostConstruct from configuration
    this.dataAccessService = dataAccessService;
    this.sessionFactory = sessionFactory;
    this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
    this.pollingIntervalSeconds = pollingIntervalSeconds;
}
69 public void configure() {
70 maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
71 pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
74 public void deleteAll() {
75 dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
79 public UUID add(Job job) {
80 final JobDaoImpl jobDao = castToJobDaoImpl(job);
81 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
// Claims ("pulls") a single job for the given topic on behalf of ownerId, using an
// optimistic-concurrency loop: the taker is only recorded if the row's age and
// takenBy are still what was read. NOTE(review): several lines are missing from
// this view (do-loop header, empty-result guard, daoJob assignment, the
// .executeUpdate() terminator) -- comments below describe only the surviving lines.
86 public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
// Build the topic-specific candidate-selection SQL and fetch candidates.
90 String query = sqlQueryForTopic(topic);
91 List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
// No candidate available for this topic.
93 return Optional.empty();
// Snapshot the candidate's identity and age for the conditional update below.
98 final UUID uuid = daoJob.getUuid();
99 final Integer age = daoJob.getAge();
// Mark the in-memory copy as taken by this owner.
101 daoJob.setTakenBy(ownerId);
103 // It might become that daoJob was taken and pushed-back already, before we
104 // arrived here, so we're verifying the age was not pushed forward.
105 // Age is actually forwarded upon pushBack().
// Conditional claim: only succeeds while age is unchanged and nobody else took it.
106 String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
108 " and job.age = :age" +
109 " and takenBy is null";
110 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
111 session.createQuery(hqlUpdate)
112 .setText("id", uuid.toString())
113 .setInteger("age", age)
114 .setText("takenBy", ownerId)
// Zero updated rows means we lost the race -- loop and pick another candidate.
117 } while (updatedEntities == 0);
// ofNullable: presumably daoJob can still be null on some path -- confirm against missing lines.
119 return Optional.ofNullable(daoJob);
122 private java.sql.Timestamp nowMinusInterval() {
123 return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
// Builds the candidate-selection SQL for simple (non-PENDING) topics.
// NOTE(review): the "return", "where" and trailing "limit" lines are missing from
// this view; the fragments below are the surviving pieces of one concatenated query.
// The concatenated values are an enum name and a generated timestamp (not user
// input), so the string-built SQL is presumably injection-safe -- confirm.
126 private String selectQueryByJobStatus(Job.JobStatus topic){
// CREATING jobs are picked up immediately; other topics respect the polling interval.
128 String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
130 "select * from VID_JOB" +
132 // select only non-deleted in-progress jobs
133 " JOB_STATUS = '" + topic + "'" +
134 " and TAKEN_BY is null" +
135 " and DELETED_AT is null" +
136 // give some breath, don't select jobs that were recently reached
138 // take the oldest handled one
139 " order by MODIFIED_DATE ASC" +
140 // select only one result
// Routes a topic to its candidate-selection SQL. Simple in-progress-style topics
// share selectQueryByJobStatus(); the PENDING topic gets the throttled query below.
// NOTE(review): the switch header and several case labels are missing from this
// view -- the surviving lines are annotated as-is.
144 private String sqlQueryForTopic(Job.JobStatus topic) {
147 case RESOURCE_IN_PROGRESS:
149 return selectQueryByJobStatus(topic);
152 // select only pending jobs
153 "select vid_job.* from VID_JOB " +
154 // left-join a per-user flag: does this user have any in-progress (or taken) job?
156 " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
157 "group by user_id) users_have_any_in_progress_job_tbl\n" +
158 "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
159 "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
160 // job is not deleted
161 " AND DELETED_AT is null and (\n" +
162 // throttle: total in-progress (or taken-pending) jobs must stay under the MSO cap
163 "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
164 "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
165 // don't take jobs from templates that are already in-progress or have failed
166 "and TEMPLATE_Id not in \n" +
167 "(select TEMPLATE_Id from vid_job where" +
168 " TEMPLATE_Id IS NOT NULL and("+
169 " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
170 " or JOB_STATUS='IN_PROGRESS'" +
171 " or TAKEN_BY IS NOT NULL))" + " \n " +
172 // prefer users with nothing in progress, then older jobs, then earlier within each bulk
173 "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
174 // select only one result
// Any other topic is not pullable.
177 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
182 private byte[] getUuidAsByteArray(UUID owner) {
183 ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
184 bb.putLong(owner.getMostSignificantBits());
185 bb.putLong(owner.getLeastSignificantBits());
190 public void pushBack(Job job) {
191 final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
193 if (remoteDaoJob == null) {
194 throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
197 if (remoteDaoJob.getTakenBy() == null) {
198 throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
201 final JobDaoImpl jobDao = castToJobDaoImpl(job);
203 jobDao.setTakenBy(null);
205 Integer age = jobDao.getAge();
206 jobDao.setAge(age + 1);
208 logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
210 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
213 private JobDaoImpl castToJobDaoImpl(Job job) {
214 if (!(job instanceof JobDaoImpl)) {
215 throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
217 return (JobDaoImpl) job;
221 public Collection<Job> peek() {
222 return dataAccessService.getList(JobDaoImpl.class, null);
226 public Job peek(UUID jobId) {
227 return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
// Soft-deletes a job (sets deletedAt) but only while it is PENDING or STOPPED and
// not taken by any worker; otherwise diagnoses why and throws.
// NOTE(review): lines are missing from this view (loop/retry structure around the
// update, the "job.uuid = :id" clause, the .executeUpdate() terminator, closing
// braces) -- comments describe only the surviving lines.
231 public void delete(UUID jobId) {
// Timestamp recorded into deletedAt.
233 Date now = new Date();
// Conditional soft-delete: guarded by status and takenBy so an active job can't be removed.
235 String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
237 " and job.status in(:pending, :stopped)" +
238 " and takenBy is null";
240 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
241 session.createQuery(hqlUpdate)
242 .setTimestamp("now", now)
243 .setText("id", jobId.toString())
244 .setText("pending", Job.JobStatus.PENDING.toString())
245 .setText("stopped", Job.JobStatus.STOPPED.toString())
// Nothing updated: figure out whether the job is absent, in a non-deletable state, or taken.
248 if (updatedEntities == 0) {
249 final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
251 if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
252 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
253 throw new OperationNotAllowedException("Service does not exist");
// Deletable only when (PENDING or STOPPED) and not taken; anything else is rejected.
256 if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
257 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
258 ", takenBy " + remoteDaoJob.getTakenBy());
259 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
// Fallback: update failed for a reason we couldn't classify above.
262 throw new OperationNotAllowedException("Service deletion failed");
// "Mutes" a job by prefixing its status with DUMP_, which removes it from every
// pullable topic. Returns true when a row was actually updated.
// NOTE(review): lines are missing from this view (the updatedEntities declaration,
// the takenBy-clearing set clause, the "job.uuid = :id" clause, the
// .executeUpdate() terminator, closing braces) -- comments describe only the
// surviving lines.
267 public boolean mute(UUID jobId) {
272 final String prefix = "DUMP";
275 // Changing the topic (i.e. `job.status`) makes the job non-fetchable.
276 String hqlUpdate = "" +
277 "update JobDaoImpl job set" +
278 " job.status = concat('" + prefix + "_', job.status)," +
279 // empty `takenBy`, because some logics treat taken as in-progress
283 // if prefix already on the topic -- no need to do it twice.
// Escaped underscore in the LIKE pattern so '_' is literal, not a single-char wildcard.
284 " and job.status NOT LIKE '" + prefix + "\\_%'";
286 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
287 session.createQuery(hqlUpdate)
288 .setText("id", jobId.toString())
// True when exactly this invocation applied the prefix.
291 return updatedEntities != 0;