2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.vid.job.impl;
23 import static org.onap.vid.job.Job.JobStatus.CREATING;
24 import static org.onap.vid.job.Job.JobStatus.FINAL_STATUS;
25 import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
26 import static org.onap.vid.job.Job.JobStatus.PENDING_RESOURCE;
27 import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
29 import java.sql.Timestamp;
30 import java.time.LocalDateTime;
31 import java.util.Collection;
32 import java.util.Date;
33 import java.util.List;
34 import java.util.Optional;
35 import java.util.UUID;
36 import java.util.stream.Collectors;
37 import javax.annotation.PostConstruct;
38 import org.apache.commons.lang3.StringUtils;
39 import org.hibernate.SessionFactory;
40 import org.jetbrains.annotations.NotNull;
41 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
42 import org.onap.portalsdk.core.service.DataAccessService;
43 import org.onap.portalsdk.core.util.SystemProperties;
44 import org.onap.vid.exceptions.GenericUncheckedException;
45 import org.onap.vid.exceptions.OperationNotAllowedException;
46 import org.onap.vid.job.Job;
47 import org.onap.vid.job.JobsBrokerService;
48 import org.onap.vid.properties.Features;
49 import org.onap.vid.properties.VidProperties;
50 import org.onap.vid.services.VersionService;
51 import org.onap.vid.utils.DaoUtils;
52 import org.springframework.beans.factory.annotation.Autowired;
53 import org.springframework.beans.factory.annotation.Value;
54 import org.springframework.stereotype.Service;
55 import org.togglz.core.manager.FeatureManager;
58 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
// Class-wide logger. Declared without an access modifier, so it is visible to the whole package.
60 static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
// Persistence facade used in this class to save, load, list, query and delete JobDaoImpl objects.
62 private final DataAccessService dataAccessService;
// Session factory handed to DaoUtils.tryWithSessionAndTransaction for the conditional HQL updates.
64 private final SessionFactory sessionFactory;
// Upper bound compared (with "<") against the number of in-progress/taken jobs in the
// pending-job query. Seeded by the constructor and reassigned by configure().
65 private int maxOpenedInstantiationRequestsToMso;
// Number of seconds subtracted from "now" by nowMinusInterval(). Seeded by the constructor
// and reassigned by configure().
66 private int pollingIntervalSeconds;
// Source of the build number that is stamped on added jobs and used in the BUILD filter.
68 private final VersionService versionService;
// Queried for the FLAG_2008_PAUSE_VFMODULE_INSTANTIATION_FAILURE feature flag.
69 private final FeatureManager featureManager;
/**
 * Wires the broker's collaborators and seeds its two tunable limits.
 *
 * <p>The {@code @Value} literals only provide conservative starting values;
 * {@link #configure()} overwrites both limits from the system properties.
 *
 * @param dataAccessService                   persistence facade for job objects
 * @param sessionFactory                      session factory used for HQL updates
 * @param maxOpenedInstantiationRequestsToMso initial cap used by the pending-job query
 * @param pollingIntervalSeconds              initial interval, in seconds, used by {@code nowMinusInterval()}
 * @param versionService                      supplier of the current build number
 * @param featureManager                      feature-flag lookup
 */
public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService,
                                       SessionFactory sessionFactory,
                                       @Value("0") int maxOpenedInstantiationRequestsToMso,
                                       @Value("10") int pollingIntervalSeconds,
                                       VersionService versionService,
                                       FeatureManager featureManager) {
    // Collaborators.
    this.featureManager = featureManager;
    this.versionService = versionService;
    this.sessionFactory = sessionFactory;
    this.dataAccessService = dataAccessService;

    // Tunables: these are placeholders until configure() reads the real configuration.
    this.pollingIntervalSeconds = pollingIntervalSeconds;
    this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
}
87 public void configure() {
88 maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
89 pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
92 public void deleteAll() {
93 dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
97 public UUID add(Job job) {
98 final JobDaoImpl jobDao = castToJobDaoImpl(job);
99 jobDao.setBuild(versionService.retrieveBuildNumber());
100 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
101 return job.getUuid();
/**
 * Tries to take one job of the given topic on behalf of {@code ownerId}.
 *
 * <p>A candidate is selected with the SQL produced by {@code sqlQueryForTopic(topic)} and is
 * then claimed through a conditional HQL update that only matches while the job still has the
 * age that was just read and no {@code takenBy}. The trailing
 * {@code while (updatedEntities == 0)} repeats the attempt when the update matched nothing.
 *
 * <p>NOTE(review): the declarations of {@code daoJob} / {@code updatedEntities}, the loop
 * opening, part of the where-clause and the end of the update call are not visible in this
 * listing -- confirm against the complete source.
 *
 * @param topic   job status to pull from
 * @param ownerId value stored as the job's {@code takenBy}
 * @return the claimed job, or an empty {@code Optional} when the query returned no candidate
 */
105 public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
109 String query = sqlQueryForTopic(topic);
110 List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
// Nothing to take for this topic.
111 if (jobs.isEmpty()) {
112 return Optional.empty();
// Only the first row of the result is considered.
115 daoJob = jobs.get(0);
117 final UUID uuid = daoJob.getUuid();
118 final Integer age = daoJob.getAge();
// Mark the in-memory copy as taken; the database row is claimed by the update below.
120 daoJob.setTakenBy(ownerId);
122 // It may happen that daoJob was taken and pushed back already, before we
123 // arrived here, so we verify that the age was not pushed forward.
124 // The age is advanced in pushBack().
125 String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
127 " and job.age = :age" +
128 " and takenBy is null";
129 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
130 session.createQuery(hqlUpdate)
131 .setText("id", uuid.toString())
132 .setInteger("age", age)
133 .setText("takenBy", ownerId)
// Zero updated rows means the claim did not succeed; try again.
136 } while (updatedEntities == 0);
138 return Optional.ofNullable(daoJob);
141 private java.sql.Timestamp nowMinusInterval() {
142 return nowMinusInterval(pollingIntervalSeconds);
145 private java.sql.Timestamp nowMinusInterval(long seconds) {
146 return Timestamp.valueOf(LocalDateTime.now().minusSeconds(seconds));
/**
 * Builds the SQL text that selects candidate jobs of the given status from VID_JOB.
 *
 * <p>The visible fragments filter through {@code filterByStatusNotTakenNotDeleted(topic)} and
 * order by MODIFIED_DATE ascending.
 *
 * <p>NOTE(review): several lines of the concatenation (the {@code return}, the where keyword,
 * the use of {@code intervalCondition} and the row limit) are not visible in this listing --
 * confirm against the complete source.
 *
 * @param topic job status to select
 * @return SQL query text
 */
149 private String selectQueryByJobStatus(Job.JobStatus topic){
// For CREATING no age condition is prepared; for any other topic the condition requires
// MODIFIED_DATE to be at least one polling interval in the past.
151 String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
153 "select * from VID_JOB" +
155 // select only jobs of the requested status that are not taken and not deleted
156 filterByStatusNotTakenNotDeleted(topic) +
157 // give some breathing room: don't select jobs that were handled only recently
159 // take the one that was handled longest ago
160 " order by MODIFIED_DATE ASC" +
161 // select only one result
166 private String filterByStatusNotTakenNotDeleted(Job.JobStatus topic) {
167 return " JOB_STATUS = '" + topic + "'" +
168 " and TAKEN_BY is null" +
169 " and DELETED_AT is null "+
170 " and BUILD = '"+ versionService.retrieveBuildNumber() +"'";
/**
 * Chooses the SQL selection query that matches the topic being pulled.
 *
 * <p>NOTE(review): the switch header and some case labels are not visible in this listing, so
 * the exact topic-to-query mapping of the first two returns should be confirmed against the
 * complete source.
 *
 * @param topic job status to pull from
 * @return SQL query text for that topic
 * @throws GenericUncheckedException if no query is defined for the topic
 */
173 private String sqlQueryForTopic(Job.JobStatus topic) {
176 case RESOURCE_IN_PROGRESS:
// Generic by-status selection.
178 return selectQueryByJobStatus(topic);
// Selection that applies the pending-job limits.
180 return selectQueryForPendingJob();
181 case PENDING_RESOURCE:
182 return selectQueryForPendingResource();
// Reached for topics that have no selection query.
184 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
/**
 * Builds the SQL text that selects the next PENDING job to start.
 *
 * <p>The visible fragments show these rules:
 * <ul>
 *   <li>candidates pass {@code filterByStatusNotTakenNotDeleted(PENDING)};</li>
 *   <li>the count of jobs that are IN_PROGRESS, or PENDING and taken, must be below
 *       {@code maxOpenedInstantiationRequestsToMso};</li>
 *   <li>the candidate's TEMPLATE_Id must not belong to a job that is failed and not deleted,
 *       IN_PROGRESS, or taken;</li>
 *   <li>ordering is by has_any_in_progress_job, then CREATED_DATE, then INDEX_IN_BULK.</li>
 * </ul>
 *
 * <p>NOTE(review): the {@code return}, the join keyword and the row limit are not visible in
 * this listing -- confirm against the complete source.
 * <p>NOTE(review): under standard SQL three-valued logic a candidate whose TEMPLATE_Id is
 * NULL does not satisfy {@code NOT IN} against a non-empty list -- verify that pending jobs
 * always carry a template id.
 *
 * @return SQL query text
 */
189 private String selectQueryForPendingJob() {
191 // select only pending jobs
192 "select vid_job.* from VID_JOB " +
193 // join the users that have in-progress or taken jobs
195 " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
196 "group by user_id) users_have_any_in_progress_job_tbl\n" +
197 "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
198 "where "+filterByStatusNotTakenNotDeleted(Job.JobStatus.PENDING)+" and (\n" +
199 // limit the number of in-progress jobs
200 "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
201 "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
202 // don't take jobs from templates that are already in-progress/failed
203 "and TEMPLATE_Id not in \n" +
204 "(select TEMPLATE_Id from vid_job where" +
205 " TEMPLATE_Id IS NOT NULL and("+
206 " (JOB_STATUS IN('FAILED','FAILED_AND_PAUSED') and DELETED_AT is null)" + // failed but not deleted
207 " or JOB_STATUS='IN_PROGRESS'" +
208 " or TAKEN_BY IS NOT NULL))" + " \n " +
209 // prefer older jobs, and within each bulk the earlier index
210 "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
211 // select only one result
/**
 * Builds the SQL text that selects the next PENDING_RESOURCE job to start.
 *
 * <p>The visible fragments left-join vid_job with a per-template count of jobs that are
 * IN_PROGRESS / RESOURCE_IN_PROGRESS and not deleted, or PENDING_RESOURCE and taken. A
 * candidate must pass {@code filterByStatusNotTakenNotDeleted(PENDING_RESOURCE)}, have no such
 * count, have a non-null template id, and pass {@code filterFailedStatusForPendingResource()}.
 * Results are ordered by INDEX_IN_BULK, then template id.
 *
 * <p>NOTE(review): several lines of the concatenation (including the where keyword and the row
 * limit) are not visible in this listing -- confirm against the complete source.
 *
 * @return SQL query text
 */
216 private String selectQueryForPendingResource() {
217 return "select * from vid_job as JOB left join \n" +
219 "(select template_id,count(*) as in_progress_count from vid_job \n" +
220 String.format("where (\n"+
222 //with job_status IN_PROGRESS or RESOURCE_IN_PROGRESS
223 " (job_status in ('%s','%s') and DELETED_AT is NULL) \n",IN_PROGRESS, RESOURCE_IN_PROGRESS)+
224 //or with job_status PENDING_RESOURCE that are taken
225 String.format(" or (JOB_STATUS='%s' and TAKEN_BY IS NOT NULL)\n )\n", PENDING_RESOURCE) +
226 //with template ID and are not deleted
227 " and TEMPLATE_ID IS NOT NULL and DELETED_AT is NULL\n)\n" +
228 //join them to vid_job by template_id
229 "group by template_id)\n"+
230 "as COUNTER on COUNTER.template_id=JOB.template_id \n" +
233 //select jobs with job_status PENDING_RESOURCE that are not taken and not deleted
234 filterByStatusNotTakenNotDeleted(PENDING_RESOURCE) + "\n" +
235 //that have no count in the counter (no other in progress job with same templateId)
236 " and in_progress_count is NULL \n" +
237 //and that have valid templateId
238 " and JOB.template_id is not NULL \n"+
240 filterFailedStatusForPendingResource()
243 //INDEX_IN_BULK orders jobs inside the same templateId,
244 //template_id orders between different templateIds (just to be deterministic)
245 "order by INDEX_IN_BULK,JOB.template_id \n" +
/**
 * Prepares the extra SQL condition that excludes PENDING_RESOURCE candidates whose template
 * already has a failed (and not deleted) job of a type other than NetworkInstantiation,
 * InstanceGroup or InstanceGroupMember, or a taken job.
 *
 * <p>The result depends on the FLAG_2008_PAUSE_VFMODULE_INSTANTIATION_FAILURE feature flag.
 *
 * <p>NOTE(review): the two branches of the conditional return are not visible in this listing
 * -- confirm which value is returned when the flag is active and which when it is not.
 * <p>NOTE(review): {@code or TAKEN_BY IS NOT NULL} sits outside the parenthesised group, so by
 * SQL precedence (AND before OR) it is not restricted by {@code TEMPLATE_Id IS NOT NULL}. The
 * corresponding subquery in selectQueryForPendingJob keeps all alternatives inside the
 * parentheses. A NULL in the list of a {@code NOT IN} prevents the predicate from ever being
 * true -- confirm whether this difference is intended.
 *
 * @return SQL condition text
 */
248 private String filterFailedStatusForPendingResource() {
249 String sql = "and JOB.template_id not in \n" +
250 "(select TEMPLATE_Id from vid_job where" +
251 " TEMPLATE_Id IS NOT NULL and (JOB_STATUS IN('FAILED','FAILED_AND_PAUSED') "
252 + " AND JOB_TYPE NOT IN('NetworkInstantiation','InstanceGroup','InstanceGroupMember') and DELETED_AT is null)" + // failed but not deleted
253 " or TAKEN_BY IS NOT NULL)";
254 return featureManager.isActive(Features.FLAG_2008_PAUSE_VFMODULE_INSTANTIATION_FAILURE) ?
259 public void pushBack(Job job) {
260 final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
262 if (remoteDaoJob == null) {
263 throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
266 if (remoteDaoJob.getTakenBy() == null) {
267 throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
270 final JobDaoImpl jobDao = castToJobDaoImpl(job);
272 jobDao.setTakenBy(null);
274 Integer age = jobDao.getAge();
275 jobDao.setAge(age + 1);
277 logger.debug(EELFLoggerDelegate.debugLogger, "pushing back jobDao {} of {}: {}/{}",
278 StringUtils.substring(String.valueOf(jobDao.getUuid()), 0, 8),
279 StringUtils.substring(String.valueOf(jobDao.getTemplateId()), 0, 8),
280 jobDao.getStatus(), jobDao.getType());
282 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
285 private JobDaoImpl castToJobDaoImpl(Job job) {
286 if (!(job instanceof JobDaoImpl)) {
287 throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
289 return (JobDaoImpl) job;
293 public Collection<Job> peek() {
294 return dataAccessService.getList(JobDaoImpl.class, null);
298 public Job peek(UUID jobId) {
299 return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
/**
 * Marks a queued job as deleted by setting its {@code deletedAt} to the current time.
 *
 * <p>The update only matches a job whose status is PENDING or STOPPED and that is not taken.
 * When no row was updated, the stored job is loaded to choose a specific failure.
 *
 * <p>NOTE(review): the declaration of {@code updatedEntities}, part of the where-clause, the
 * end of the update call and the closing braces are not visible in this listing; the nesting
 * of the final throw should be confirmed against the complete source.
 *
 * @param jobId id of the job to delete
 * @throws OperationNotAllowedException if the job does not exist, its status or owner does not
 *                                      allow deletion, or the deletion otherwise failed
 */
303 public void delete(UUID jobId) {
305 Date now = new Date();
307 String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
309 " and job.status in(:pending, :stopped)" +
310 " and takenBy is null";
312 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
313 session.createQuery(hqlUpdate)
314 .setTimestamp("now", now)
315 .setText("id", jobId.toString())
316 .setText("pending", Job.JobStatus.PENDING.toString())
317 .setText("stopped", Job.JobStatus.STOPPED.toString())
// Nothing was updated: find out why, in order to report a precise error.
320 if (updatedEntities == 0) {
321 final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
// Case 1: there is no such job.
323 if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
324 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
325 throw new OperationNotAllowedException("Service does not exist");
// Case 2: the job is neither PENDING nor STOPPED, or it is currently taken.
328 if ((remoteDaoJob.getStatus() != Job.JobStatus.PENDING) && (remoteDaoJob.getStatus() != Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
329 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
330 ", takenBy " + remoteDaoJob.getTakenBy());
331 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
// Case 3: none of the above explains the failure.
334 throw new OperationNotAllowedException("Service deletion failed");
/**
 * Makes a job non-fetchable by prepending {@code DUMP_} to its stored status.
 *
 * <p>The update skips jobs whose status already starts with that prefix.
 *
 * <p>NOTE(review): several lines (including the declaration of {@code updatedEntities}, part of
 * the set/where clauses and the end of the update call) are not visible in this listing --
 * confirm against the complete source.
 *
 * @param jobId id of the job to mute
 * @return {@code true} when the update changed at least one row
 */
339 public boolean mute(UUID jobId) {
344 final String prefix = "DUMP";
347 // Changing the topic (i.e. `job.status`) makes the job non-fetchable.
348 String hqlUpdate = "" +
349 "update JobDaoImpl job set" +
350 " job.status = concat('" + prefix + "_', job.status)," +
351 // empty `takenBy`, because some logic treats a taken job as in-progress
355 // if the prefix is already on the topic -- no need to add it twice.
356 " and job.status NOT LIKE '" + prefix + "\\_%'";
358 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
359 session.createQuery(hqlUpdate)
360 .setText("id", jobId.toString())
363 return updatedEntities != 0;
366 private static String sqlListOfFinalStatus =
367 String.format("(%s)",
368 FINAL_STATUS.stream().
369 map(x->String.format("'%s'",x)).
370 collect(Collectors.joining(","))
374 public void deleteOldFinalJobs(long secondsAgo) {
375 String select = String.format(" MODIFIED_DATE <= '%s' and JOB_STATUS in %s", nowMinusInterval(secondsAgo), sqlListOfFinalStatus);
376 dataAccessService.deleteDomainObjects(JobDaoImpl.class, select, null);