Fixed MSB Invocation Issues
[holmes/rule-management.git] / rulemgt / src / main / java / org / onap / holmes / rulemgt / RuleAllocator.java
@@ -1,5 +1,5 @@
 /**
- * Copyright 2017 ZTE Corporation.
+ * Copyright 2017-2020 ZTE Corporation.
  * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-package org.onap.holmes.rulemgt.send;
+package org.onap.holmes.rulemgt;
 
 import lombok.extern.slf4j.Slf4j;
-import org.glassfish.hk2.api.ServiceLocator;
+import org.jvnet.hk2.annotations.Service;
 import org.onap.holmes.common.api.entity.CorrelationRule;
-import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
 import org.onap.holmes.common.exception.CorrelationException;
 import org.onap.holmes.common.utils.DbDaoUtil;
 import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
 import org.onap.holmes.rulemgt.db.CorrelationRuleDao;
-import org.onap.holmes.rulemgt.msb.EngineInsQueryTool;
+import org.onap.holmes.rulemgt.tools.EngineTools;
 import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper;
 import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
 import java.util.*;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 @Slf4j
+@Service
 public class RuleAllocator {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RuleAllocator.class);
+
     public final static int ENABLE = 1;
     private RuleMgtWrapper ruleMgtWrapper;
     private RuleQueryWrapper ruleQueryWrapper;
     private EngineWrapper engineWrapper;
-    private EngineInsQueryTool engineInsQueryTool;
-    private DbDaoUtil daoUtil;
+    private EngineTools engineTools;
     private CorrelationRuleDao correlationRuleDao;
-    private int latestEngineInsNum = 0;
-    private List<String> existingEngineServiceIps = new ArrayList<>();
-    private List<String> latestEngineServiceIps = new ArrayList<>();
-
-    public RuleAllocator() {
-        ServiceLocator locator = ServiceLocatorHolder.getLocator();
-        ruleMgtWrapper = locator.getService(RuleMgtWrapper.class);
-        ruleQueryWrapper = locator.getService(RuleQueryWrapper.class);
-        engineWrapper = locator.getService(EngineWrapper.class);
-        engineInsQueryTool = locator.getService(EngineInsQueryTool.class);
-        daoUtil = locator.getService(DbDaoUtil.class);
-
-        initDaoUtilAndEngineIp();
-    }
 
-    private void initDaoUtilAndEngineIp() {
+    @Inject
+    public RuleAllocator(RuleMgtWrapper ruleMgtWrapper, RuleQueryWrapper ruleQueryWrapper,
+                         EngineWrapper engineWrapper, EngineTools engineTools, DbDaoUtil daoUtil) {
+        this.ruleMgtWrapper = ruleMgtWrapper;
+        this.ruleQueryWrapper = ruleQueryWrapper;
+        this.engineWrapper = engineWrapper;
+        this.engineTools = engineTools;
         correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class);
-        try {
-            existingEngineServiceIps = engineInsQueryTool.getInstanceList();
+    }
 
-        } catch (Exception e) {
-            log.warn("Failed to get the number of engine instances.", e);
-        }
+    @PostConstruct
+    private void initialize() {
+        new Timer("RuleAllocatorTimer").schedule(new TimerTask() {
+
+            public void run() {
+                try {
+                    allocateRules();
+                } catch (Exception e) {
+                    LOGGER.error("Failed to reallocate rules.", e);
+                }
+            }
+
+        }, SECONDS.toMillis(10), SECONDS.toMillis(30));
     }
 
-    public synchronized void allocateRules(List<String> latestEngineIps) throws Exception {
-        if (latestEngineIps == null) {
-            throw new NullPointerException("The parameter of " + this.getClass().getSimpleName()
-                    + ".allocateRules(List<String>) can not be null!");
+    public synchronized void allocateRules() throws Exception {
+        List<String> engines = engineTools.getInstanceList();
+
+        if (engines == null) {
+            return;
         }
 
-        latestEngineServiceIps = latestEngineIps;
-        latestEngineInsNum = latestEngineIps.size();
-        if (existingEngineServiceIps.size() < latestEngineInsNum) {
+        int numOfEngines = engines.size();
+        LOGGER.info(String.format("There are %d engine instance(s) running currently.", numOfEngines));
+
+        List<String> legacyEngineInstances = engineTools.getLegacyEngineInstances();
+        if (legacyEngineInstances == null) {
+            return;
+        }
+
+        if (legacyEngineInstances.size() < numOfEngines) {
             //extend
-            List<CorrelationRule> rules2Allocate = calculateRule(existingEngineServiceIps);
+            List<CorrelationRule> rules2Allocate = calculateRule(legacyEngineInstances, numOfEngines);
             List<CorrelationRule> rules2Delete = copyList(rules2Allocate);
-            List<String> newInstanceIds = sortOutNewEngineInstances(latestEngineServiceIps, existingEngineServiceIps);
+            List<String> newInstanceIds = sortOutNewEngineInstances(engines, legacyEngineInstances);
             distributeRules(newInstanceIds, rules2Allocate);
-            cleanUpRulesFromEngines(rules2Delete, existingEngineServiceIps);
-        } else if (existingEngineServiceIps.size() > latestEngineInsNum) {
+            cleanUpRulesFromEngines(rules2Delete, legacyEngineInstances);
+        } else {
             //destroy
-            List<String> destroyed = getDestroyedEngines(latestEngineServiceIps, existingEngineServiceIps);
-            distributeRules(getRemainingEngines(destroyed), reallocateRules(destroyed));
+            List<String> destroyed = getDestroyedEngines(engines, legacyEngineInstances);
+            distributeRules(getRemainingEngines(engines, destroyed), getRules(destroyed));
         }
-
-        existingEngineServiceIps = latestEngineServiceIps;
     }
 
     private List<CorrelationRule> copyList(List<CorrelationRule> rules) {
@@ -98,13 +110,16 @@ public class RuleAllocator {
     }
 
     // When the engine is expanding, the rules that need to be allocated are calculated.
-    private List<CorrelationRule> calculateRule(List<String> existingEngineIps) throws CorrelationException {
+    private List<CorrelationRule> calculateRule(List<String> existingEngineIps,
+                                                int latestEngineInsNum) throws CorrelationException {
         List<CorrelationRule> enabledRules = ruleQueryWrapper.queryRuleByEnable(ENABLE);
         int ruleCount = 0;
         if (enabledRules != null) {
             ruleCount = enabledRules.size();
         }
+        // Average number of rule that's to be allocate into each instance
         int count = ruleCount / latestEngineInsNum;
+        // The number of remaining rules (to be allocated) after each instance has been allocated with the average number of rules.
         int remainder = ruleCount % latestEngineInsNum;
 
         List<CorrelationRule> ret = new ArrayList<>();
@@ -117,7 +132,7 @@ public class RuleAllocator {
     }
 
     // Rules that need to be allocated after the engine is destroyed
-    private List<CorrelationRule> reallocateRules(List<String> destroyIpList) throws CorrelationException {
+    private List<CorrelationRule> getRules(List<String> destroyIpList) throws CorrelationException {
         List<CorrelationRule> rules = new ArrayList<>();
         try {
             if (destroyIpList != null) {
@@ -126,7 +141,7 @@ public class RuleAllocator {
                 }
             }
         } catch (CorrelationException e) {
-            log.error("method reallocateRules get data from DB failed !", e);
+            LOGGER.error("method getRules get data from DB failed !", e);
         }
         return rules;
     }
@@ -155,9 +170,9 @@ public class RuleAllocator {
     }
 
     // Residual IP after destruction
-    private List<String> getRemainingEngines(List<String> destroyed) {
+    private List<String> getRemainingEngines(List<String> all, List<String> destroyed) {
         List<String> ret = new ArrayList<>();
-        for (String ip : latestEngineServiceIps) {
+        for (String ip : all) {
             if (!destroyed.contains(ip)) {
                 ret.add(ip);
             }
@@ -190,7 +205,7 @@ public class RuleAllocator {
                 }
             }
         } catch (Exception e) {
-            log.error("getEngineIp4AddRule failed !", e);
+            LOGGER.error("getEngineWithLeastRules failed !", e);
         }
 
         List<Map.Entry<String, Integer>> sortedEntries = new ArrayList<>(ruleNumOfEngines.entrySet());
@@ -208,7 +223,8 @@ public class RuleAllocator {
             ruleMgtWrapper.deployRule2Engine(rule, ip);
             correlationRuleDao.updateRule(rule);
         } catch (CorrelationException e) {
-            throw new CorrelationException("allocate Deploy Rule failed", e);
+            throw new CorrelationException(String.format("Failed to allocate rule <%s> to <%s>",
+                    rule.getName(), ip), e);
         }
     }
 
@@ -222,7 +238,7 @@ public class RuleAllocator {
                 }
             }
         } catch (CorrelationException e) {
-            log.error("When the engine is extended, deleting rule failed", e);
+            LOGGER.error("When the engine is extended, deleting rule failed", e);
         }
     }
 }