X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=vid-app-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fvid%2Fjob%2Fimpl%2FJobsBrokerServiceInDatabaseImpl.java;h=74a7294940619e138219d0ae5c5bf7690c08066c;hb=e601bbdc43bae9a08e2e10c5139a6f76b47860d7;hp=59ca43743cf45a2a16cd3842944aa852d3ad1df0;hpb=76c6ee4a697617ec4cdee2f3b48bc83136c858c5;p=vid.git diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java index 59ca43743..74a729494 100644 --- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,7 @@ package org.onap.vid.job.impl; import org.apache.commons.lang3.StringUtils; import org.hibernate.SessionFactory; +import org.jetbrains.annotations.NotNull; import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; import org.onap.portalsdk.core.service.DataAccessService; import org.onap.portalsdk.core.util.SystemProperties; @@ -30,18 +31,19 @@ import org.onap.vid.exceptions.OperationNotAllowedException; import org.onap.vid.job.Job; import org.onap.vid.job.JobsBrokerService; import org.onap.vid.properties.VidProperties; +import org.onap.vid.services.VersionService; import org.onap.vid.utils.DaoUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; -import java.nio.ByteBuffer; import java.sql.Timestamp; import java.time.LocalDateTime; import java.util.*; +import java.util.stream.Collectors; -import static org.onap.vid.job.Job.JobStatus.CREATING; +import static org.onap.vid.job.Job.JobStatus.*; @Service public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { @@ -54,15 +56,20 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { private int maxOpenedInstantiationRequestsToMso; private int pollingIntervalSeconds; + private final VersionService versionService; + @Autowired - public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory, + public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, + SessionFactory sessionFactory, @Value("0") int maxOpenedInstantiationRequestsToMso, - @Value("10") int pollingIntervalSeconds) { + @Value("10") int pollingIntervalSeconds, + VersionService versionService) { // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration this.dataAccessService = dataAccessService; this.sessionFactory = sessionFactory; this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso; this.pollingIntervalSeconds = pollingIntervalSeconds; + this.versionService = versionService; } @PostConstruct @@ -78,6 +85,7 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { @Override public UUID add(Job job) { final JobDaoImpl jobDao = castToJobDaoImpl(job); + jobDao.setBuild(versionService.retrieveBuildNumber()); dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap()); return job.getUuid(); } @@ -120,7 +128,11 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { } private java.sql.Timestamp nowMinusInterval() { - return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds)); + return nowMinusInterval(pollingIntervalSeconds); + } + + private java.sql.Timestamp nowMinusInterval(long seconds) { + return Timestamp.valueOf(LocalDateTime.now().minusSeconds(seconds)); } private String selectQueryByJobStatus(Job.JobStatus topic){ @@ -130,17 +142,23 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { "select * from VID_JOB" + " where" + // select only non-deleted in-progress jobs - " JOB_STATUS = '" + topic + "'" + - " and TAKEN_BY is null" + - " and DELETED_AT is null" + + filterByStatusNotTakenNotDeleted(topic) + // give some breath, don't select jos that were recently reached - intervalCondition + + intervalCondition + // take the oldest handled one " order by MODIFIED_DATE ASC" + // select only one result " limit 1"; } + @NotNull + private String filterByStatusNotTakenNotDeleted(Job.JobStatus topic) { + return " JOB_STATUS = '" + topic + "'" + + " and TAKEN_BY is null" + + " and DELETED_AT is null "+ + " and BUILD = '"+ versionService.retrieveBuildNumber() +"'"; + } + private String sqlQueryForTopic(Job.JobStatus topic) { switch (topic) { case IN_PROGRESS: @@ -148,44 +166,73 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { case CREATING: return selectQueryByJobStatus(topic); case PENDING: - return "" + - // select only pending jobs - "select vid_job.* from VID_JOB " + - // select users have in_progress jobs - "left join \n" + - " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" + - "group by user_id) users_have_any_in_progress_job_tbl\n" + - "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " + - "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" + - // job is not deleted - " AND DELETED_AT is null and (\n" + - // limit in-progress to some amount - "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" + - "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " + - // don't take jobs from templates that already in-progress/failed - "and TEMPLATE_Id not in \n" + - "(select TEMPLATE_Id from vid_job where" + - " TEMPLATE_Id IS NOT NULL and("+ - " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted - " or JOB_STATUS='IN_PROGRESS'" + - " or TAKEN_BY IS NOT NULL))" + " \n " + - // prefer older jobs, but the earlier in each bulk - "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " + - // select only one result - "limit 1"; + return selectQueryForPendingJob(); + case PENDING_RESOURCE: + return selectQueryForPendingResource(); default: throw new GenericUncheckedException("Unsupported topic to pull from: " + topic); } } + @NotNull + private String selectQueryForPendingJob() { + return "" + + // select only pending jobs + "select vid_job.* from VID_JOB " + + // select users have in_progress jobs + "left join \n" + + " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" + + "group by user_id) users_have_any_in_progress_job_tbl\n" + + "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " + + "where "+filterByStatusNotTakenNotDeleted(Job.JobStatus.PENDING)+" and (\n" + + // limit in-progress to some amount + "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" + + "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " + + // don't take jobs from templates that already in-progress/failed + "and TEMPLATE_Id not in \n" + + "(select TEMPLATE_Id from vid_job where" + + " TEMPLATE_Id IS NOT NULL and("+ + " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted + " or JOB_STATUS='IN_PROGRESS'" + + " or TAKEN_BY IS NOT NULL))" + " \n " + + // prefer older jobs, but the earlier in each bulk + "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " + + // select only one result + "limit 1"; + } - private byte[] getUuidAsByteArray(UUID owner) { - ByteBuffer bb = ByteBuffer.wrap(new byte[16]); - bb.putLong(owner.getMostSignificantBits()); - bb.putLong(owner.getLeastSignificantBits()); - return bb.array(); + @NotNull + private String selectQueryForPendingResource() { + return "select * from vid_job as JOB left join \n" + + //count jobs + "(select template_id,count(*) as in_progress_count from vid_job \n" + + String.format("where (\n"+ + " (\n"+ + //with job_status IN_PROGRESS or RESOURCE_IN_PROGRESS + " (job_status in ('%s','%s') and DELETED_AT is NULL) \n",IN_PROGRESS, RESOURCE_IN_PROGRESS)+ + //or that with job_status PENDING_RESOURCE that are taken + String.format(" or (JOB_STATUS='%s' and TAKEN_BY IS NOT NULL)\n )\n", PENDING_RESOURCE) + + //with template ID and are not deleted + " and TEMPLATE_ID IS NOT NULL and DELETED_AT is NULL\n)\n" + + //join them to vid_job by template_id + "group by template_id)\n"+ + "as COUNTER on COUNTER.template_id=JOB.template_id \n" + + + "where (\n"+ + //select jobs with job_status PENDING_RESOURCE that are nit taken and not deleted + filterByStatusNotTakenNotDeleted(PENDING_RESOURCE) + "\n" + + //that have no count in the counter (no other in progress job with same templateId) + " and in_progress_count is NULL \n" + + //and that have valid templateId + " and JOB.template_id is not NULL \n"+ + ")\n" + + //INDEX_IN_BULK is for order them inside same templateId, + //template_id - for order between different templateId (just to be deterministic) + "order by INDEX_IN_BULK,JOB.template_id \n" + + "limit 1;"; } + @Override public void pushBack(Job job) { final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null); @@ -253,7 +300,7 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { throw new OperationNotAllowedException("Service does not exist"); } - if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) { + if ((remoteDaoJob.getStatus() != Job.JobStatus.PENDING) && (remoteDaoJob.getStatus() != Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) { logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() + ", takenBy " + remoteDaoJob.getTakenBy()); throw new OperationNotAllowedException("Service status does not allow deletion from the queue"); @@ -290,4 +337,17 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { return updatedEntities != 0; } + + private static String sqlListOfFinalStatus = + String.format("(%s)", + FINAL_STATUS.stream(). + map(x->String.format("'%s'",x)). + collect(Collectors.joining(",")) + ); + + @Override + public void deleteOldFinalJobs(long secondsAgo) { + String select = String.format(" MODIFIED_DATE <= '%s' and JOB_STATUS in %s", nowMinusInterval(secondsAgo), sqlListOfFinalStatus); + dataAccessService.deleteDomainObjects(JobDaoImpl.class, select, null); + } }