/**
- * Copyright 2017 ZTE Corporation.
+ * Copyright 2017-2020 ZTE Corporation.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
*/
-package org.onap.holmes.rulemgt.send;
+package org.onap.holmes.rulemgt;
import lombok.extern.slf4j.Slf4j;
-import org.glassfish.hk2.api.ServiceLocator;
+import org.jvnet.hk2.annotations.Service;
import org.onap.holmes.common.api.entity.CorrelationRule;
-import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.common.utils.DbDaoUtil;
import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
import org.onap.holmes.rulemgt.db.CorrelationRuleDao;
-import org.onap.holmes.rulemgt.msb.EngineInsQueryTool;
+import org.onap.holmes.rulemgt.tools.EngineTools;
import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper;
import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
import java.util.*;
+import static java.util.concurrent.TimeUnit.SECONDS;
@Slf4j
+@Service
public class RuleAllocator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RuleAllocator.class);
+
public final static int ENABLE = 1;
private RuleMgtWrapper ruleMgtWrapper;
private RuleQueryWrapper ruleQueryWrapper;
private EngineWrapper engineWrapper;
- private EngineInsQueryTool engineInsQueryTool;
- private DbDaoUtil daoUtil;
+ private EngineTools engineTools;
private CorrelationRuleDao correlationRuleDao;
- private int latestEngineInsNum = 0;
- private List<String> existingEngineServiceIps = new ArrayList<>();
- private List<String> latestEngineServiceIps = new ArrayList<>();
-
- public RuleAllocator() {
- ServiceLocator locator = ServiceLocatorHolder.getLocator();
- ruleMgtWrapper = locator.getService(RuleMgtWrapper.class);
- ruleQueryWrapper = locator.getService(RuleQueryWrapper.class);
- engineWrapper = locator.getService(EngineWrapper.class);
- engineInsQueryTool = locator.getService(EngineInsQueryTool.class);
- daoUtil = locator.getService(DbDaoUtil.class);
-
- initDaoUtilAndEngineIp();
- }
- private void initDaoUtilAndEngineIp() {
+ @Inject
+ public RuleAllocator(RuleMgtWrapper ruleMgtWrapper, RuleQueryWrapper ruleQueryWrapper,
+ EngineWrapper engineWrapper, EngineTools engineTools, DbDaoUtil daoUtil) {
+ this.ruleMgtWrapper = ruleMgtWrapper;
+ this.ruleQueryWrapper = ruleQueryWrapper;
+ this.engineWrapper = engineWrapper;
+ this.engineTools = engineTools;
correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class);
- try {
- existingEngineServiceIps = engineInsQueryTool.getInstanceList();
+ }
- } catch (Exception e) {
- log.warn("Failed to get the number of engine instances.", e);
- }
+ @PostConstruct
+ private void initialize() {
+ new Timer("RuleAllocatorTimer").schedule(new TimerTask() {
+
+ public void run() {
+ try {
+ allocateRules();
+ } catch (Exception e) {
+ LOGGER.error("Failed to reallocate rules.", e);
+ }
+ }
+
+ }, SECONDS.toMillis(10), SECONDS.toMillis(30));
}
- public synchronized void allocateRules(List<String> latestEngineIps) throws Exception {
- if (latestEngineIps == null) {
- throw new NullPointerException("The parameter of " + this.getClass().getSimpleName()
- + ".allocateRules(List<String>) can not be null!");
+ public synchronized void allocateRules() throws Exception {
+ List<String> engines = engineTools.getInstanceList();
+
+ if (engines == null) {
+ return;
}
- latestEngineServiceIps = latestEngineIps;
- latestEngineInsNum = latestEngineIps.size();
- if (existingEngineServiceIps.size() < latestEngineInsNum) {
+ int numOfEngines = engines.size();
+ LOGGER.info(String.format("There are %d engine instance(s) running currently.", numOfEngines));
+
+ List<String> legacyEngineInstances = engineTools.getLegacyEngineInstances();
+ if (legacyEngineInstances == null) {
+ return;
+ }
+
+ if (legacyEngineInstances.size() < numOfEngines) {
//extend
- List<CorrelationRule> rules2Allocate = calculateRule(existingEngineServiceIps);
+ List<CorrelationRule> rules2Allocate = calculateRule(legacyEngineInstances, numOfEngines);
List<CorrelationRule> rules2Delete = copyList(rules2Allocate);
- List<String> newInstanceIds = sortOutNewEngineInstances(latestEngineServiceIps, existingEngineServiceIps);
+ List<String> newInstanceIds = sortOutNewEngineInstances(engines, legacyEngineInstances);
distributeRules(newInstanceIds, rules2Allocate);
- cleanUpRulesFromEngines(rules2Delete, existingEngineServiceIps);
- } else if (existingEngineServiceIps.size() > latestEngineInsNum) {
+ cleanUpRulesFromEngines(rules2Delete, legacyEngineInstances);
+ } else {
//destroy
- List<String> destroyed = getDestroyedEngines(latestEngineServiceIps, existingEngineServiceIps);
- distributeRules(getRemainingEngines(destroyed), reallocateRules(destroyed));
+ List<String> destroyed = getDestroyedEngines(engines, legacyEngineInstances);
+ distributeRules(getRemainingEngines(engines, destroyed), getRules(destroyed));
}
-
- existingEngineServiceIps = latestEngineServiceIps;
}
private List<CorrelationRule> copyList(List<CorrelationRule> rules) {
}
// When the engine is expanding, the rules that need to be allocated are calculated.
- private List<CorrelationRule> calculateRule(List<String> existingEngineIps) throws CorrelationException {
+ private List<CorrelationRule> calculateRule(List<String> existingEngineIps,
+ int latestEngineInsNum) throws CorrelationException {
List<CorrelationRule> enabledRules = ruleQueryWrapper.queryRuleByEnable(ENABLE);
int ruleCount = 0;
if (enabledRules != null) {
ruleCount = enabledRules.size();
}
+ // Average number of rule that's to be allocate into each instance
int count = ruleCount / latestEngineInsNum;
+ // The number of remaining rules (to be allocated) after each instance has been allocated with the average number of rules.
int remainder = ruleCount % latestEngineInsNum;
List<CorrelationRule> ret = new ArrayList<>();
}
// Rules that need to be allocated after the engine is destroyed
- private List<CorrelationRule> reallocateRules(List<String> destroyIpList) throws CorrelationException {
+ private List<CorrelationRule> getRules(List<String> destroyIpList) throws CorrelationException {
List<CorrelationRule> rules = new ArrayList<>();
try {
if (destroyIpList != null) {
}
}
} catch (CorrelationException e) {
- log.error("method reallocateRules get data from DB failed !", e);
+ LOGGER.error("method getRules get data from DB failed !", e);
}
return rules;
}
}
// Residual IP after destruction
- private List<String> getRemainingEngines(List<String> destroyed) {
+ private List<String> getRemainingEngines(List<String> all, List<String> destroyed) {
List<String> ret = new ArrayList<>();
- for (String ip : latestEngineServiceIps) {
+ for (String ip : all) {
if (!destroyed.contains(ip)) {
ret.add(ip);
}
}
}
} catch (Exception e) {
- log.error("getEngineIp4AddRule failed !", e);
+ LOGGER.error("getEngineWithLeastRules failed !", e);
}
List<Map.Entry<String, Integer>> sortedEntries = new ArrayList<>(ruleNumOfEngines.entrySet());
ruleMgtWrapper.deployRule2Engine(rule, ip);
correlationRuleDao.updateRule(rule);
} catch (CorrelationException e) {
- throw new CorrelationException("allocate Deploy Rule failed", e);
+ throw new CorrelationException(String.format("Failed to allocate rule <%s> to <%s>",
+ rule.getName(), ip), e);
}
}
}
}
} catch (CorrelationException e) {
- log.error("When the engine is extended, deleting rule failed", e);
+ LOGGER.error("When the engine is extended, deleting rule failed", e);
}
}
}