Implant vid-app-common org.onap.vid.job (main and test)
[vid.git] / vid-app-common / src / main / java / org / onap / vid / job / impl / JobsBrokerServiceInDatabaseImpl.java
index 59ca437..74a7294 100644 (file)
@@ -7,9 +7,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ package org.onap.vid.job.impl;
 
 import org.apache.commons.lang3.StringUtils;
 import org.hibernate.SessionFactory;
+import org.jetbrains.annotations.NotNull;
 import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
 import org.onap.portalsdk.core.service.DataAccessService;
 import org.onap.portalsdk.core.util.SystemProperties;
@@ -30,18 +31,19 @@ import org.onap.vid.exceptions.OperationNotAllowedException;
 import org.onap.vid.job.Job;
 import org.onap.vid.job.JobsBrokerService;
 import org.onap.vid.properties.VidProperties;
+import org.onap.vid.services.VersionService;
 import org.onap.vid.utils.DaoUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
-import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.time.LocalDateTime;
 import java.util.*;
+import java.util.stream.Collectors;
 
-import static org.onap.vid.job.Job.JobStatus.CREATING;
+import static org.onap.vid.job.Job.JobStatus.*;
 
 @Service
 public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
@@ -54,15 +56,20 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
     private int maxOpenedInstantiationRequestsToMso;
     private int pollingIntervalSeconds;
 
+    private final VersionService versionService;
+
     @Autowired
-    public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
+    public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService,
+                                           SessionFactory sessionFactory,
                                            @Value("0") int maxOpenedInstantiationRequestsToMso,
-                                           @Value("10") int pollingIntervalSeconds) {
+                                           @Value("10") int pollingIntervalSeconds,
+                                           VersionService versionService) {
         // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
         this.dataAccessService = dataAccessService;
         this.sessionFactory = sessionFactory;
         this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
         this.pollingIntervalSeconds = pollingIntervalSeconds;
+        this.versionService = versionService;
     }
 
     @PostConstruct
@@ -78,6 +85,7 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
     @Override
     public UUID add(Job job) {
         final JobDaoImpl jobDao = castToJobDaoImpl(job);
+        jobDao.setBuild(versionService.retrieveBuildNumber());
         dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
         return job.getUuid();
     }
@@ -120,7 +128,11 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
     }
 
     private java.sql.Timestamp nowMinusInterval() {
-        return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
+        return nowMinusInterval(pollingIntervalSeconds);
+    }
+
+    private java.sql.Timestamp nowMinusInterval(long seconds) {
+        return Timestamp.valueOf(LocalDateTime.now().minusSeconds(seconds));
     }
 
     private String selectQueryByJobStatus(Job.JobStatus topic){
@@ -130,17 +142,23 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
                 "select * from VID_JOB" +
                 " where" +
                 // select only non-deleted in-progress jobs
-                "    JOB_STATUS = '" + topic + "'" +
-                "    and TAKEN_BY is null" +
-                "    and DELETED_AT is null" +
+                filterByStatusNotTakenNotDeleted(topic) +
                 // give some breath, don't select jos that were recently reached
-                 intervalCondition +
+                intervalCondition +
                 // take the oldest handled one
                 " order by MODIFIED_DATE ASC" +
                 // select only one result
                 " limit 1";
     }
 
+    @NotNull
+    private String filterByStatusNotTakenNotDeleted(Job.JobStatus topic) {
+        return  "    JOB_STATUS = '" + topic + "'" +
+                "    and TAKEN_BY is null" +
+                "    and DELETED_AT is null "+
+                "    and BUILD = '"+ versionService.retrieveBuildNumber() +"'";
+    }
+
     private String sqlQueryForTopic(Job.JobStatus topic) {
         switch (topic) {
             case IN_PROGRESS:
@@ -148,44 +166,73 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
             case CREATING:
                 return selectQueryByJobStatus(topic);
             case PENDING:
-                return "" +
-                        // select only pending jobs
-                        "select vid_job.* from VID_JOB " +
-                        // select users have in_progress jobs
-                        "left join \n" +
-                        " (select user_Id, 1 as has_any_in_progress_job from VID_JOB  where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
-                        "group by user_id)  users_have_any_in_progress_job_tbl\n" +
-                        "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
-                        "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
-                        // job is not deleted
-                        "      AND DELETED_AT is null and (\n" +
-                        // limit in-progress to some amount
-                        "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
-                        "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
-                        // don't take jobs from templates that already in-progress/failed
-                        "and TEMPLATE_Id not in \n" +
-                        "(select TEMPLATE_Id from vid_job where" +
-                        "   TEMPLATE_Id IS NOT NULL and("+
-                        "   (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
-                        "   or JOB_STATUS='IN_PROGRESS'" +
-                        "   or TAKEN_BY IS NOT NULL))" + " \n " +
-                        // prefer older jobs, but the earlier in each bulk
-                        "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
-                        // select only one result
-                        "limit 1";
+                return selectQueryForPendingJob();
+            case PENDING_RESOURCE:
+                return selectQueryForPendingResource();
             default:
                 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
         }
     }
 
+    @NotNull
+    private String selectQueryForPendingJob() {
+        return "" +
+                // select only pending jobs
+                "select vid_job.* from VID_JOB " +
+                // select users have in_progress jobs
+                "left join \n" +
+                " (select user_Id, 1 as has_any_in_progress_job from VID_JOB  where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
+                "group by user_id)  users_have_any_in_progress_job_tbl\n" +
+                "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
+                "where "+filterByStatusNotTakenNotDeleted(Job.JobStatus.PENDING)+" and (\n" +
+                // limit in-progress to some amount
+                "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
+                "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
+                // don't take jobs from templates that already in-progress/failed
+                "and TEMPLATE_Id not in \n" +
+                "(select TEMPLATE_Id from vid_job where" +
+                "   TEMPLATE_Id IS NOT NULL and("+
+                "   (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
+                "   or JOB_STATUS='IN_PROGRESS'" +
+                "   or TAKEN_BY IS NOT NULL))" + " \n " +
+                // prefer older jobs, but the earlier in each bulk
+                "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
+                // select only one result
+                "limit 1";
+    }
 
-    private byte[] getUuidAsByteArray(UUID owner) {
-        ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
-        bb.putLong(owner.getMostSignificantBits());
-        bb.putLong(owner.getLeastSignificantBits());
-        return bb.array();
+    @NotNull
+    private String selectQueryForPendingResource() {
+        return "select * from vid_job as JOB left join \n" +
+                //count jobs
+                "(select template_id,count(*) as in_progress_count from vid_job \n" +
+                String.format("where (\n"+
+                "    (\n"+
+                //with job_status IN_PROGRESS or RESOURCE_IN_PROGRESS
+                "        (job_status in ('%s','%s') and DELETED_AT is NULL) \n",IN_PROGRESS, RESOURCE_IN_PROGRESS)+
+                //or that with job_status PENDING_RESOURCE that are taken
+                String.format("        or (JOB_STATUS='%s' and TAKEN_BY IS NOT NULL)\n    )\n", PENDING_RESOURCE) +
+                //with template ID and are not deleted
+                "    and TEMPLATE_ID IS NOT NULL and DELETED_AT is NULL\n)\n" +
+                //join them to vid_job by template_id
+                "group by template_id)\n"+
+                "as COUNTER on COUNTER.template_id=JOB.template_id \n" +
+
+                "where (\n"+
+                //select jobs with job_status PENDING_RESOURCE that are nit taken and not deleted
+                filterByStatusNotTakenNotDeleted(PENDING_RESOURCE) + "\n" +
+                //that have no count in the counter (no other in progress job with same templateId)
+                "    and in_progress_count is NULL \n" +
+                //and that have valid templateId
+                "    and JOB.template_id is not NULL \n"+
+                ")\n" +
+                //INDEX_IN_BULK is for order them inside same templateId,
+                //template_id - for order between different templateId (just to be deterministic)
+                "order by INDEX_IN_BULK,JOB.template_id \n" +
+                "limit 1;";
     }
 
+
     @Override
     public void pushBack(Job job) {
         final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
@@ -253,7 +300,7 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
                 throw new OperationNotAllowedException("Service does not exist");
             }
 
-            if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
+            if ((remoteDaoJob.getStatus() != Job.JobStatus.PENDING) && (remoteDaoJob.getStatus() != Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
                 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
                 ", takenBy " + remoteDaoJob.getTakenBy());
                 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
@@ -290,4 +337,17 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
 
         return updatedEntities != 0;
     }
+
+    private static String sqlListOfFinalStatus =
+            String.format("(%s)",
+                FINAL_STATUS.stream().
+                map(x->String.format("'%s'",x)).
+                collect(Collectors.joining(","))
+            );
+
+    @Override
+    public void deleteOldFinalJobs(long secondsAgo) {
+        String select = String.format(" MODIFIED_DATE <= '%s' and JOB_STATUS in %s", nowMinusInterval(secondsAgo), sqlListOfFinalStatus);
+        dataAccessService.deleteDomainObjects(JobDaoImpl.class, select, null);
+    }
 }