/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.vid.job.impl;
23 import static org.onap.vid.job.Job.JobStatus.CREATING;
24 import static org.onap.vid.job.Job.JobStatus.FINAL_STATUS;
25 import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
26 import static org.onap.vid.job.Job.JobStatus.PENDING_RESOURCE;
27 import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
29 import java.sql.Timestamp;
30 import java.time.LocalDateTime;
31 import java.util.Collection;
32 import java.util.Date;
33 import java.util.List;
34 import java.util.Optional;
35 import java.util.UUID;
36 import java.util.stream.Collectors;
37 import javax.annotation.PostConstruct;
38 import org.apache.commons.lang3.StringUtils;
39 import org.hibernate.SessionFactory;
40 import org.jetbrains.annotations.NotNull;
41 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
42 import org.onap.portalsdk.core.service.DataAccessService;
43 import org.onap.portalsdk.core.util.SystemProperties;
44 import org.onap.vid.exceptions.GenericUncheckedException;
45 import org.onap.vid.exceptions.OperationNotAllowedException;
46 import org.onap.vid.job.Job;
47 import org.onap.vid.job.JobsBrokerService;
48 import org.onap.vid.properties.VidProperties;
49 import org.onap.vid.services.VersionService;
50 import org.onap.vid.utils.DaoUtils;
51 import org.springframework.beans.factory.annotation.Autowired;
52 import org.springframework.beans.factory.annotation.Value;
53 import org.springframework.stereotype.Service;
56 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
    // Shared EELF logger for this broker implementation.
    static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);

    // Portal SDK DAO facade used for CRUD on JobDaoImpl entities.
    private final DataAccessService dataAccessService;

    // Hibernate session factory, used for atomic HQL bulk updates inside transactions.
    private final SessionFactory sessionFactory;
    // Cap on concurrently-open instantiation requests towards MSO; overridden from
    // system properties in configure().
    private int maxOpenedInstantiationRequestsToMso;
    // Minimum age, in seconds, before a non-CREATING job is re-selected for pulling;
    // overridden from system properties in configure().
    private int pollingIntervalSeconds;

    // Supplies the build number that jobs are tagged with on add() and matched on pull().
    private final VersionService versionService;
69 public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService,
70 SessionFactory sessionFactory,
71 @Value("0") int maxOpenedInstantiationRequestsToMso,
72 @Value("10") int pollingIntervalSeconds,
73 VersionService versionService) {
74 // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
75 this.dataAccessService = dataAccessService;
76 this.sessionFactory = sessionFactory;
77 this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
78 this.pollingIntervalSeconds = pollingIntervalSeconds;
79 this.versionService = versionService;
83 public void configure() {
84 maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
85 pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
88 public void deleteAll() {
89 dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
93 public UUID add(Job job) {
94 final JobDaoImpl jobDao = castToJobDaoImpl(job);
95 jobDao.setBuild(versionService.retrieveBuildNumber());
96 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
    /**
     * Pulls one job for the given topic and atomically marks it taken by ownerId.
     * Optimistic concurrency: the job's `age` is re-checked inside the UPDATE, so a
     * job that was pushed back (age incremented) in the meantime is not stolen.
     * NOTE(review): the retry-loop header and the daoJob/updatedEntities declarations
     * appear truncated in this view — confirm against the original file.
     *
     * @param topic   the job status acting as a queue/topic
     * @param ownerId identifier of the worker taking the job
     * @return the taken job, or empty when no eligible job exists
     */
    public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
        // build the topic-specific native SELECT and fetch candidates
        String query = sqlQueryForTopic(topic);
        List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
        if (jobs.isEmpty()) {
            return Optional.empty();
        daoJob = jobs.get(0);

        final UUID uuid = daoJob.getUuid();
        final Integer age = daoJob.getAge();

        // optimistic local mark; the authoritative mark is the guarded UPDATE below
        daoJob.setTakenBy(ownerId);

        // It might become that daoJob was taken and pushed-back already, before we
        // arrived here, so we're verifying the age was not pushed forward.
        // Age is actually forwarded upon pushBack().
        String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
                " and job.age = :age" +
                " and takenBy is null";
        updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
                session.createQuery(hqlUpdate)
                        .setText("id", uuid.toString())
                        .setInteger("age", age)
                        .setText("takenBy", ownerId)
        // zero updated rows means we lost the race — retry with a fresh candidate
        } while (updatedEntities == 0);

        return Optional.ofNullable(daoJob);
137 private java.sql.Timestamp nowMinusInterval() {
138 return nowMinusInterval(pollingIntervalSeconds);
141 private java.sql.Timestamp nowMinusInterval(long seconds) {
142 return Timestamp.valueOf(LocalDateTime.now().minusSeconds(seconds));
    /**
     * Native SELECT for a single-status topic. CREATING jobs are eligible
     * immediately; any other status only after pollingIntervalSeconds have passed
     * since the last modification (gives workers breathing room between retries).
     * NOTE(review): the "return"/"where"/"limit" fragments appear truncated in this
     * view; the timestamp is string-concatenated rather than bound as a parameter.
     */
    private String selectQueryByJobStatus(Job.JobStatus topic){
        String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
            "select * from VID_JOB" +
            // select only non-deleted in-progress jobs
            filterByStatusNotTakenNotDeleted(topic) +
            // give some breath, don't select jobs that were recently reached
            // take the oldest handled one
            " order by MODIFIED_DATE ASC" +
            // select only one result
162 private String filterByStatusNotTakenNotDeleted(Job.JobStatus topic) {
163 return " JOB_STATUS = '" + topic + "'" +
164 " and TAKEN_BY is null" +
165 " and DELETED_AT is null "+
166 " and BUILD = '"+ versionService.retrieveBuildNumber() +"'";
    /**
     * Dispatches to the topic-appropriate SELECT; any unsupported topic is a
     * programming error and raises GenericUncheckedException.
     * NOTE(review): the switch header and some case labels (e.g. IN_PROGRESS,
     * PENDING, default) appear truncated in this view.
     */
    private String sqlQueryForTopic(Job.JobStatus topic) {
            case RESOURCE_IN_PROGRESS:
                // single-status topics share one query builder
                return selectQueryByJobStatus(topic);
                return selectQueryForPendingJob();
            case PENDING_RESOURCE:
                return selectQueryForPendingResource();
            throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
    /**
     * Builds the native SELECT that picks the next PENDING job eligible to run:
     * global in-progress count must stay below maxOpenedInstantiationRequestsToMso,
     * and the job's template must have no failed/in-progress sibling.
     * NOTE(review): the "return" keyword, the join keyword(s) and a trailing
     * "limit 1" appear truncated in this view.
     */
    private String selectQueryForPendingJob() {
            // select only pending jobs
            "select vid_job.* from VID_JOB " +
            // flag users that currently have in-progress (or taken) jobs
            " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
            "group by user_id) users_have_any_in_progress_job_tbl\n" +
            "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
            "where "+filterByStatusNotTakenNotDeleted(Job.JobStatus.PENDING)+" and (\n" +
            // limit in-progress to some amount (taken PENDING jobs count as in-progress)
            "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
            "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
            // don't take jobs from templates that are already in-progress/failed
            "and TEMPLATE_Id not in \n" +
            "(select TEMPLATE_Id from vid_job where" +
            " TEMPLATE_Id IS NOT NULL and("+
            " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
            " or JOB_STATUS='IN_PROGRESS'" +
            " or TAKEN_BY IS NOT NULL))" + " \n " +
            // prefer users with nothing in progress, then older jobs, then the earlier in each bulk
            "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
            // select only one result
    /**
     * Builds the native SELECT that picks the next PENDING_RESOURCE job whose
     * template has no other job effectively in progress (left-join against a
     * per-template in-progress counter; eligible rows have a NULL count).
     * NOTE(review): the WHERE keyword and a trailing "limit 1" appear truncated in
     * this view.
     */
    private String selectQueryForPendingResource() {
        return "select * from vid_job as JOB left join \n" +
            // count, per template, the jobs that are effectively in progress:
            "(select template_id,count(*) as in_progress_count from vid_job \n" +
            String.format("where (\n"+
            //with job_status IN_PROGRESS or RESOURCE_IN_PROGRESS
            " (job_status in ('%s','%s') and DELETED_AT is NULL) \n",IN_PROGRESS, RESOURCE_IN_PROGRESS)+
            //or that with job_status PENDING_RESOURCE that are taken
            String.format(" or (JOB_STATUS='%s' and TAKEN_BY IS NOT NULL)\n )\n", PENDING_RESOURCE) +
            //with template ID and are not deleted
            " and TEMPLATE_ID IS NOT NULL and DELETED_AT is NULL\n)\n" +
            //join them to vid_job by template_id
            "group by template_id)\n"+
            "as COUNTER on COUNTER.template_id=JOB.template_id \n" +
            //select jobs with job_status PENDING_RESOURCE that are not taken and not deleted
            filterByStatusNotTakenNotDeleted(PENDING_RESOURCE) + "\n" +
            //that have no count in the counter (no other in progress job with same templateId)
            " and in_progress_count is NULL \n" +
            //and that have valid templateId
            " and JOB.template_id is not NULL \n"+
            //INDEX_IN_BULK orders jobs inside the same templateId,
            //template_id - for order between different templateId (just to be deterministic)
            "order by INDEX_IN_BULK,JOB.template_id \n" +
    /**
     * Returns a previously pulled job to its queue: clears takenBy and increments
     * `age` so that any concurrent pull() holding the old age loses its guarded
     * UPDATE (see pull()).
     * NOTE(review): the closing braces of the two guard ifs appear truncated in
     * this view.
     *
     * @param job the pulled job to return; must exist and currently be taken
     * @throws IllegalStateException if the job was never added or is not taken
     */
    public void pushBack(Job job) {
        // re-read the authoritative row to validate the push-back preconditions
        final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);

        if (remoteDaoJob == null) {
            throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");

        if (remoteDaoJob.getTakenBy() == null) {
            throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");

        final JobDaoImpl jobDao = castToJobDaoImpl(job);

        jobDao.setTakenBy(null);

        // forward the age so stale concurrent pulls are invalidated
        Integer age = jobDao.getAge();
        jobDao.setAge(age + 1);

        // log truncated uuid/templateId for readability
        logger.debug(EELFLoggerDelegate.debugLogger, "pushing back jobDao {} of {}: {}/{}",
                StringUtils.substring(String.valueOf(jobDao.getUuid()), 0, 8),
                StringUtils.substring(String.valueOf(jobDao.getTemplateId()), 0, 8),
                jobDao.getStatus(), jobDao.getType());

        dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
270 private JobDaoImpl castToJobDaoImpl(Job job) {
271 if (!(job instanceof JobDaoImpl)) {
272 throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
274 return (JobDaoImpl) job;
278 public Collection<Job> peek() {
279 return dataAccessService.getList(JobDaoImpl.class, null);
283 public Job peek(UUID jobId) {
284 return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
    /**
     * Soft-deletes a job (sets deletedAt) if and only if it is PENDING or STOPPED
     * and not taken by any worker. Uses a single guarded HQL UPDATE for atomicity;
     * when zero rows are updated, the job is re-read to raise a precise
     * OperationNotAllowedException.
     * NOTE(review): the updatedEntities declaration, part of the WHERE clause and
     * several closing braces appear truncated in this view.
     *
     * @param jobId id of the job to delete
     * @throws OperationNotAllowedException when the job is missing, in a
     *         non-deletable state, taken, or the update unexpectedly failed
     */
    public void delete(UUID jobId) {
        Date now = new Date();

        // atomic guarded update: only mark deleted while status/takenBy still allow it
        String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
                " and job.status in(:pending, :stopped)" +
                " and takenBy is null";
        updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
                session.createQuery(hqlUpdate)
                        .setTimestamp("now", now)
                        .setText("id", jobId.toString())
                        .setText("pending", Job.JobStatus.PENDING.toString())
                        .setText("stopped", Job.JobStatus.STOPPED.toString())

        if (updatedEntities == 0) {
            // nothing updated: re-read the row to explain why
            final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);

            if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
                logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
                throw new OperationNotAllowedException("Service does not exist");

            // deletable only when (PENDING or STOPPED) and not taken
            if ((remoteDaoJob.getStatus() != Job.JobStatus.PENDING) && (remoteDaoJob.getStatus() != Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
                logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
                        ", takenBy " + remoteDaoJob.getTakenBy());
                throw new OperationNotAllowedException("Service status does not allow deletion from the queue");

            // row looked deletable yet the guarded update missed it — report generically
            throw new OperationNotAllowedException("Service deletion failed");
    /**
     * "Mutes" a job by prefixing its status with DUMP_, which removes it from every
     * pullable topic, and clears takenBy. Idempotent: a status already starting
     * with the prefix (LIKE 'DUMP\_%') is not prefixed twice.
     * NOTE(review): part of the SET/WHERE clause and the updatedEntities
     * declaration appear truncated in this view.
     *
     * @param jobId id of the job to mute
     * @return true when a row was actually updated, false otherwise
     */
    public boolean mute(UUID jobId) {
        final String prefix = "DUMP";

        // Changing the topic (i.e. `job.status`) makes the job non-fetchable.
        String hqlUpdate = "" +
                "update JobDaoImpl job set" +
                " job.status = concat('" + prefix + "_', job.status)," +
                // empty `takenBy`, because some logics treat taken as in-progress
                // if prefix already on the topic -- no need to do it twice.
                " and job.status NOT LIKE '" + prefix + "\\_%'";
        updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
                session.createQuery(hqlUpdate)
                        .setText("id", jobId.toString())

        return updatedEntities != 0;
351 private static String sqlListOfFinalStatus =
352 String.format("(%s)",
353 FINAL_STATUS.stream().
354 map(x->String.format("'%s'",x)).
355 collect(Collectors.joining(","))
359 public void deleteOldFinalJobs(long secondsAgo) {
360 String select = String.format(" MODIFIED_DATE <= '%s' and JOB_STATUS in %s", nowMinusInterval(secondsAgo), sqlListOfFinalStatus);
361 dataAccessService.deleteDomainObjects(JobDaoImpl.class, select, null);