2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.vid.job.impl;
23 import org.apache.commons.lang3.StringUtils;
24 import org.hibernate.SessionFactory;
25 import org.jetbrains.annotations.NotNull;
26 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
27 import org.onap.portalsdk.core.service.DataAccessService;
28 import org.onap.portalsdk.core.util.SystemProperties;
29 import org.onap.vid.exceptions.GenericUncheckedException;
30 import org.onap.vid.exceptions.OperationNotAllowedException;
31 import org.onap.vid.job.Job;
32 import org.onap.vid.job.JobsBrokerService;
33 import org.onap.vid.properties.VidProperties;
34 import org.onap.vid.services.VersionService;
35 import org.onap.vid.utils.DaoUtils;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.beans.factory.annotation.Value;
38 import org.springframework.stereotype.Service;
40 import javax.annotation.PostConstruct;
41 import java.sql.Timestamp;
42 import java.time.LocalDateTime;
44 import java.util.stream.Collectors;
46 import static org.onap.vid.job.Job.JobStatus.*;
49 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
// Logger is package-private and non-final -- presumably to allow replacement in tests; TODO confirm.
51 static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
// Portal-SDK DAO facade used for CRUD on JobDaoImpl rows (VID_JOB table).
53 private final DataAccessService dataAccessService;
// Hibernate session factory used for the conditional HQL updates in pull()/delete()/mute().
55 private final SessionFactory sessionFactory;
// Upper bound on concurrently open instantiation requests toward MSO; set in configure().
56 private int maxOpenedInstantiationRequestsToMso;
// Minimum age (seconds) a job's MODIFIED_DATE must have before it is re-selected; set in configure().
57 private int pollingIntervalSeconds;
// Supplies the current build number; jobs are stamped with it on add() and filtered by it on pull().
59 private final VersionService versionService;
// Constructor. The @Value literals ("0" and "10") are conservative defaults injected by
// Spring; both are overwritten from system properties in configure() (annotated
// @PostConstruct in the full file -- the annotation line is outside this excerpt).
62 public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService,
63 SessionFactory sessionFactory,
64 @Value("0") int maxOpenedInstantiationRequestsToMso,
65 @Value("10") int pollingIntervalSeconds,
66 VersionService versionService) {
67 // the @Value will inject conservative defaults; overridden in @PostConstruct from configuration
68 this.dataAccessService = dataAccessService;
69 this.sessionFactory = sessionFactory;
70 this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
71 this.pollingIntervalSeconds = pollingIntervalSeconds;
72 this.versionService = versionService;
// Loads the real operational limits from system properties, replacing the constructor
// defaults. Invoked after construction (the @PostConstruct annotation line is not
// visible in this excerpt).
// NOTE(review): Integer.parseInt throws NumberFormatException if either property is
// absent or malformed -- confirm both properties are guaranteed at startup.
76 public void configure() {
77 maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
78 pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
// Hard-deletes every row in VID_JOB ("1=1" matches all). Destructive -- presumably
// intended for tests/maintenance only; confirm callers before reuse.
81 public void deleteAll() {
82 dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
// Persists a new job, stamping it with the current build number so pull() only serves
// jobs created by this build (see filterByStatusNotTakenNotDeleted). Throws
// UnsupportedOperationException if the Job is not a JobDaoImpl (castToJobDaoImpl).
// The `return` of the job's UUID falls in lines omitted from this excerpt.
86 public UUID add(Job job) {
87 final JobDaoImpl jobDao = castToJobDaoImpl(job);
88 jobDao.setBuild(versionService.retrieveBuildNumber());
89 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
// Atomically claims the next eligible job for the given topic on behalf of `ownerId`.
// Strategy: (1) SELECT a candidate via a topic-specific SQL query; (2) attempt a
// conditional HQL UPDATE that sets takenBy only if the row's `age` is unchanged and
// takenBy is still null (optimistic locking); (3) on a lost race (0 rows updated),
// loop and re-select. The surrounding `do {` and the local declarations of `daoJob`
// and `updatedEntities` fall in lines omitted from this excerpt.
94 public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
98 String query = sqlQueryForTopic(topic);
99 List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
100 if (jobs.isEmpty()) {
101 return Optional.empty();
104 daoJob = jobs.get(0);
106 final UUID uuid = daoJob.getUuid();
107 final Integer age = daoJob.getAge();
// Mark the in-memory copy as taken so the returned Job reflects ownership.
109 daoJob.setTakenBy(ownerId);
111 // It might become that daoJob was taken and pushed-back already, before we
112 // arrived here, so we're verifying the age was not pushed forward.
113 // Age is actually forwarded upon pushBack().
114 String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
116 " and job.age = :age" +
117 " and takenBy is null";
118 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
119 session.createQuery(hqlUpdate)
120 .setText("id", uuid.toString())
121 .setInteger("age", age)
122 .setText("takenBy", ownerId)
// Zero updated rows means another worker won the race -- retry with a fresh SELECT.
125 } while (updatedEntities == 0);
127 return Optional.ofNullable(daoJob);
// Convenience overload: "now" minus the configured polling interval, used as the
// MODIFIED_DATE cut-off when selecting jobs.
130 private java.sql.Timestamp nowMinusInterval() {
131 return nowMinusInterval(pollingIntervalSeconds);
// Returns the current wall-clock time minus `seconds`, as a java.sql.Timestamp.
// NOTE(review): uses LocalDateTime.now() (default zone, not injectable Clock), so the
// resulting SQL comparison is zone- and test-sensitive -- confirm DB and JVM agree.
134 private java.sql.Timestamp nowMinusInterval(long seconds) {
135 return Timestamp.valueOf(LocalDateTime.now().minusSeconds(seconds));
// Builds the SELECT used by pull() for simple topics (e.g. IN_PROGRESS, CREATING,
// RESOURCE_IN_PROGRESS): oldest eligible non-taken, non-deleted job of this status.
// CREATING skips the MODIFIED_DATE back-off so freshly created jobs are picked up
// immediately. The `return`/`where` glue and the LIMIT clause fall in lines omitted
// from this excerpt.
// NOTE(review): the timestamp is concatenated into the SQL as a string literal rather
// than bound as a parameter; `topic` is an enum so injection risk is limited, but
// parameterizing would be safer -- flagged for follow-up.
138 private String selectQueryByJobStatus(Job.JobStatus topic){
140 String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
142 "select * from VID_JOB" +
144 // select only non-deleted in-progress jobs
145 filterByStatusNotTakenNotDeleted(topic) +
146 // give some breath, don't select jos that were recently reached
148 // take the oldest handled one
149 " order by MODIFIED_DATE ASC" +
150 // select only one result
// Shared WHERE fragment: job has the requested status, is unclaimed (TAKEN_BY null),
// not soft-deleted (DELETED_AT null), and was created by the currently running build
// (prevents one build from processing jobs serialized by another).
// NOTE(review): the build number is concatenated into SQL -- assumed to be a trusted,
// well-formed value from VersionService; confirm it can never contain a quote.
155 private String filterByStatusNotTakenNotDeleted(Job.JobStatus topic) {
156 return " JOB_STATUS = '" + topic + "'" +
157 " and TAKEN_BY is null" +
158 " and DELETED_AT is null "+
159 " and BUILD = '"+ versionService.retrieveBuildNumber() +"'";
// Routes each pullable topic to its SELECT builder: PENDING and PENDING_RESOURCE get
// dedicated throttling queries; other listed statuses use the generic per-status query.
// Any unlisted topic is rejected. Additional `case` labels (and the surrounding
// `switch`) fall in lines omitted from this excerpt.
162 private String sqlQueryForTopic(Job.JobStatus topic) {
165 case RESOURCE_IN_PROGRESS:
167 return selectQueryByJobStatus(topic);
169 return selectQueryForPendingJob();
170 case PENDING_RESOURCE:
171 return selectQueryForPendingResource();
173 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
// Builds the PENDING-topic query. Eligibility rules encoded in SQL:
//  1. job is PENDING, unclaimed, not deleted, current build (shared filter);
//  2. the global count of in-progress work (IN_PROGRESS, or PENDING-but-taken) is
//     below maxOpenedInstantiationRequestsToMso (throttles open MSO requests);
//  3. the job's template has no sibling that is failed-undeleted, in-progress, or taken
//     (bulks are serialized and halted on failure);
//  4. ordering prefers users with no in-progress job, then oldest bulk, then position
//     within the bulk. The `return`, join keyword(s), and LIMIT fall in omitted lines.
178 private String selectQueryForPendingJob() {
180 // select only pending jobs
181 "select vid_job.* from VID_JOB " +
182 // select users have in_progress jobs
184 " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
185 "group by user_id) users_have_any_in_progress_job_tbl\n" +
186 "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
187 "where "+filterByStatusNotTakenNotDeleted(Job.JobStatus.PENDING)+" and (\n" +
188 // limit in-progress to some amount
189 "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
190 "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
191 // don't take jobs from templates that already in-progress/failed
192 "and TEMPLATE_Id not in \n" +
193 "(select TEMPLATE_Id from vid_job where" +
194 " TEMPLATE_Id IS NOT NULL and("+
195 " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
196 " or JOB_STATUS='IN_PROGRESS'" +
197 " or TAKEN_BY IS NOT NULL))" + " \n " +
198 // prefer older jobs, but the earlier in each bulk
199 "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
200 // select only one result
// Builds the PENDING_RESOURCE-topic query: pick the next unclaimed PENDING_RESOURCE
// job whose template currently has NO active sibling. "Active" = IN_PROGRESS /
// RESOURCE_IN_PROGRESS (undeleted), or PENDING_RESOURCE already taken. The LEFT JOIN
// against the per-template COUNTER subquery yields NULL in_progress_count exactly when
// the template is idle, which is the admission condition. Ordering by INDEX_IN_BULK
// serializes resources within a template; template_id is a deterministic tie-break.
// The `where` keyword and LIMIT clause fall in lines omitted from this excerpt.
205 private String selectQueryForPendingResource() {
206 return "select * from vid_job as JOB left join \n" +
208 "(select template_id,count(*) as in_progress_count from vid_job \n" +
209 String.format("where (\n"+
211 //with job_status IN_PROGRESS or RESOURCE_IN_PROGRESS
212 " (job_status in ('%s','%s') and DELETED_AT is NULL) \n",IN_PROGRESS, RESOURCE_IN_PROGRESS)+
213 //or that with job_status PENDING_RESOURCE that are taken
214 String.format(" or (JOB_STATUS='%s' and TAKEN_BY IS NOT NULL)\n )\n", PENDING_RESOURCE) +
215 //with template ID and are not deleted
216 " and TEMPLATE_ID IS NOT NULL and DELETED_AT is NULL\n)\n" +
217 //join them to vid_job by template_id
218 "group by template_id)\n"+
219 "as COUNTER on COUNTER.template_id=JOB.template_id \n" +
222 //select jobs with job_status PENDING_RESOURCE that are not taken and not deleted
223 filterByStatusNotTakenNotDeleted(PENDING_RESOURCE) + "\n" +
224 //that have no count in the counter (no other in progress job with same templateId)
225 " and in_progress_count is NULL \n" +
226 //and that have valid templateId
227 " and JOB.template_id is not NULL \n"+
229 //INDEX_IN_BULK is for order them inside same templateId,
230 //template_id - for order between different templateId (just to be deterministic)
231 "order by INDEX_IN_BULK,JOB.template_id \n" +
// Returns a previously pulled job to the queue: releases ownership (takenBy=null) and
// bumps `age`, which invalidates any concurrent pull() still holding the old age (see
// the optimistic-lock guard in pull()). Validates against the DB copy that the job
// exists and is actually taken; throws IllegalStateException otherwise.
237 public void pushBack(Job job) {
238 final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
240 if (remoteDaoJob == null) {
241 throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
244 if (remoteDaoJob.getTakenBy() == null) {
245 throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
248 final JobDaoImpl jobDao = castToJobDaoImpl(job);
250 jobDao.setTakenBy(null);
// Incrementing age is the push-back signal other racers check.
// NOTE(review): getAge() returning null would NPE here -- presumably age is always set; confirm.
252 Integer age = jobDao.getAge();
253 jobDao.setAge(age + 1);
255 logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
257 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
// Narrows a Job to the persistence type this broker requires; fails fast with
// UnsupportedOperationException for any other Job implementation.
260 private JobDaoImpl castToJobDaoImpl(Job job) {
261 if (!(job instanceof JobDaoImpl)) {
262 throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
264 return (JobDaoImpl) job;
// Returns all persisted jobs without claiming any of them (read-only snapshot).
268 public Collection<Job> peek() {
269 return dataAccessService.getList(JobDaoImpl.class, null);
// Returns the job with the given id without claiming it; null-behavior follows
// DataAccessService.getDomainObject (presumably null when absent -- confirm).
273 public Job peek(UUID jobId) {
274 return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
// Soft-deletes a queued job (sets deletedAt) -- allowed only while the job is PENDING
// or STOPPED and not taken by a worker. Done as a single conditional HQL UPDATE so the
// check-and-delete is atomic; if 0 rows were affected, the job is re-read purely to
// produce a precise OperationNotAllowedException. The `do`/declaration lines and part
// of the WHERE clause (the id predicate) fall in lines omitted from this excerpt.
278 public void delete(UUID jobId) {
280 Date now = new Date();
282 String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
284 " and job.status in(:pending, :stopped)" +
285 " and takenBy is null";
287 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
288 session.createQuery(hqlUpdate)
289 .setTimestamp("now", now)
290 .setText("id", jobId.toString())
291 .setText("pending", Job.JobStatus.PENDING.toString())
292 .setText("stopped", Job.JobStatus.STOPPED.toString())
// Diagnose why the conditional update matched nothing: missing job vs. disallowed state.
295 if (updatedEntities == 0) {
296 final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
298 if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
299 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
300 throw new OperationNotAllowedException("Service does not exist");
// Precedence note: (not PENDING && not STOPPED) || taken -- i.e. wrong status OR claimed.
303 if ((remoteDaoJob.getStatus() != Job.JobStatus.PENDING) && (remoteDaoJob.getStatus() != Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
304 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
305 ", takenBy " + remoteDaoJob.getTakenBy());
306 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
// Fallback: state looked deletable on re-read, yet the update still matched nothing
// (e.g. lost race) -- report a generic failure.
309 throw new OperationNotAllowedException("Service deletion failed");
// Takes a job permanently out of circulation by prefixing its status with "DUMP_",
// making it match no pull() topic. Idempotent: the NOT LIKE 'DUMP\_%' guard (with an
// escaped underscore) skips already-muted jobs. Returns true iff a row was changed.
// The id predicate of the WHERE clause, the takenBy-clearing assignment, and the
// surrounding do-loop/declarations fall in lines omitted from this excerpt.
// NOTE(review): the status value stored after this is a string outside the JobStatus
// enum -- confirm all readers tolerate non-enum JOB_STATUS values.
314 public boolean mute(UUID jobId) {
319 final String prefix = "DUMP";
322 // Changing the topic (i.e. `job.status`) makes the job non-fetchable.
323 String hqlUpdate = "" +
324 "update JobDaoImpl job set" +
325 " job.status = concat('" + prefix + "_', job.status)," +
326 // empty `takenBy`, because some logics treat taken as in-progress
330 // if prefix already on the topic -- no need to do it twice.
331 " and job.status NOT LIKE '" + prefix + "\\_%'";
333 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
334 session.createQuery(hqlUpdate)
335 .setText("id", jobId.toString())
338 return updatedEntities != 0;
// Precomputed SQL IN-list of final statuses, e.g. "('COMPLETED','FAILED',...)" --
// exact members come from Job.JobStatus.FINAL_STATUS (static import). Built once at
// class load; used by deleteOldFinalJobs().
341 private static String sqlListOfFinalStatus =
342 String.format("(%s)",
343 FINAL_STATUS.stream().
344 map(x->String.format("'%s'",x)).
345 collect(Collectors.joining(","))
// Purges (hard-deletes) jobs that reached a final status and were last modified at
// least `secondsAgo` seconds ago. The timestamp is formatted into the criteria string
// rather than bound as a parameter -- inputs are internal (a long and an enum-derived
// list), so injection is not a concern here.
349 public void deleteOldFinalJobs(long secondsAgo) {
350 String select = String.format(" MODIFIED_DATE <= '%s' and JOB_STATUS in %s", nowMinusInterval(secondsAgo), sqlListOfFinalStatus);
351 dataAccessService.deleteDomainObjects(JobDaoImpl.class, select, null);