/**
 * Copyright 2017 ZTE Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onap.holmes.rulemgt.send;

import lombok.extern.slf4j.Slf4j;
import org.glassfish.hk2.api.ServiceLocator;
import org.onap.holmes.common.api.entity.CorrelationRule;
import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.common.utils.DbDaoUtil;
import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
import org.onap.holmes.rulemgt.db.CorrelationRuleDao;
import org.onap.holmes.rulemgt.msb.EngineInsQueryTool;
import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper;
import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper;

import java.util.*;

/**
 * Redistributes enabled correlation rules across engine instances whenever the
 * list of engine instance IPs grows or shrinks.
 *
 * <p>The allocator remembers the engine IPs seen on the previous call to
 * {@link #allocateRules(List)} and compares them with the IPs passed to the
 * current call to decide whether rules must be moved.
 */
@Slf4j
public class RuleAllocator {

    /** Value passed to {@code RuleQueryWrapper.queryRuleByEnable} to select enabled rules. */
    public static final int ENABLE = 1;

    private RuleMgtWrapper ruleMgtWrapper;
    private RuleQueryWrapper ruleQueryWrapper;
    private EngineWrapper engineWrapper;
    private EngineInsQueryTool engineInsQueryTool;
    private DbDaoUtil daoUtil;
    private CorrelationRuleDao correlationRuleDao;

    /** Number of engine instances passed to the most recent allocation call. */
    private int latestEngineInsNum = 0;
    /** Engine IPs known from the previous allocation (or from start-up). */
    private List<String> existingEngineServiceIps = new ArrayList<>();
    /** Engine IPs passed to the allocation currently in progress. */
    private List<String> latestEngineServiceIps = new ArrayList<>();

    /**
     * Resolves all collaborators from the global service locator and loads the
     * initial list of engine instances.
     */
    public RuleAllocator() {
        ServiceLocator locator = ServiceLocatorHolder.getLocator();
        ruleMgtWrapper = locator.getService(RuleMgtWrapper.class);
        ruleQueryWrapper = locator.getService(RuleQueryWrapper.class);
        engineWrapper = locator.getService(EngineWrapper.class);
        engineInsQueryTool = locator.getService(EngineInsQueryTool.class);
        daoUtil = locator.getService(DbDaoUtil.class);

        initDaoUtilAndEngineIp();
    }

    /**
     * Creates the rule DAO and seeds the list of known engine IPs. A failure to
     * query the engine instances is logged and leaves the list empty.
     */
    private void initDaoUtilAndEngineIp() {
        correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class);
        try {
            List<String> instances = engineInsQueryTool.getInstanceList();
            // Keep the empty default when the query yields nothing, so later
            // size() calls on the field cannot hit a null reference.
            if (instances != null) {
                existingEngineServiceIps = new ArrayList<>(instances);
            }
        } catch (Exception e) {
            log.warn("Failed to get the number of engine instances.", e);
        }
    }

    /**
     * Rebalances rules according to the difference between the previously known
     * engine IPs and {@code latestEngineIps}.
     *
     * <ul>
     *   <li>More engines than before: surplus rules of the existing engines are
     *       deployed to the new engines and then removed from the old ones.</li>
     *   <li>Fewer engines than before: rules of the destroyed engines are
     *       deployed to the remaining engines.</li>
     *   <li>Same number of engines: nothing is moved.</li>
     * </ul>
     *
     * NOTE(review): when the count is unchanged but the members differ (one engine
     * replaced by another), no rules are reallocated - confirm this is intended.
     *
     * @param latestEngineIps the current engine instance IPs; must not be null
     * @throws NullPointerException if {@code latestEngineIps} is null
     * @throws Exception if querying or deploying rules fails
     */
    public synchronized void allocateRules(List<String> latestEngineIps) throws Exception {
        Objects.requireNonNull(latestEngineIps, "The parameter of " + this.getClass().getSimpleName()
                + ".allocateRules(List) can not be null!");

        // Snapshot the argument so later changes made by the caller cannot alter
        // the state this allocator compares against on the next invocation.
        latestEngineServiceIps = new ArrayList<>(latestEngineIps);
        latestEngineInsNum = latestEngineServiceIps.size();

        if (existingEngineServiceIps.size() < latestEngineInsNum) {
            // extend
            List<CorrelationRule> rules2Allocate = calculateRule(existingEngineServiceIps);
            // Cloned before distribution because distributeRules overwrites each
            // rule's engine instance, which clean-up needs in its old form.
            List<CorrelationRule> rules2Delete = copyList(rules2Allocate);
            List<String> newInstanceIds =
                    sortOutNewEngineInstances(latestEngineServiceIps, existingEngineServiceIps);
            distributeRules(newInstanceIds, rules2Allocate);
            cleanUpRulesFromEngines(rules2Delete, existingEngineServiceIps);
        } else if (existingEngineServiceIps.size() > latestEngineInsNum) {
            // destroy
            List<String> destroyed =
                    getDestroyedEngines(latestEngineServiceIps, existingEngineServiceIps);
            distributeRules(getRemainingEngines(destroyed), reallocateRules(destroyed));
        }

        existingEngineServiceIps = latestEngineServiceIps;
    }

    /**
     * Returns a list holding a clone of every rule in {@code rules}.
     */
    private List<CorrelationRule> copyList(List<CorrelationRule> rules) {
        List<CorrelationRule> ret = new ArrayList<>(rules.size());
        for (CorrelationRule r : rules) {
            ret.add((CorrelationRule) r.clone());
        }
        return ret;
    }

    /**
     * When the engine is expanding, calculates the rules that need to be moved
     * away from the existing engines.
     *
     * <p>For each existing engine the first {@code keep} rules stay in place and
     * the rest are returned, where {@code keep} is derived from the number of
     * enabled rules divided by the latest engine count.
     *
     * @param existingEngineIps IPs of the engines that existed before the expansion
     * @return the rules to hand over to the new engines; never null
     * @throws CorrelationException if a rule query fails
     */
    private List<CorrelationRule> calculateRule(List<String> existingEngineIps) throws CorrelationException {
        List<CorrelationRule> enabledRules = ruleQueryWrapper.queryRuleByEnable(ENABLE);
        int ruleCount = 0;
        if (enabledRules != null) {
            ruleCount = enabledRules.size();
        }
        int count = ruleCount / latestEngineInsNum;
        int remainder = ruleCount % latestEngineInsNum;

        List<CorrelationRule> ret = new ArrayList<>();
        for (String ip : existingEngineIps) {
            List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
            int keep = count + (remainder-- / existingEngineIps.size());
            // An engine that holds no more rules than its share has nothing to
            // give away; taking a sub-list past its end would throw.
            if (rules == null || rules.size() <= keep) {
                continue;
            }
            ret.addAll(rules.subList(keep, rules.size()));
        }
        return ret;
    }

    /**
     * Collects the rules that need to be allocated after engines are destroyed.
     * A query failure is logged and the rules gathered so far are returned.
     *
     * @param destroyIpList IPs of the destroyed engines; may be null
     * @return the rules bound to the destroyed engines; never null
     */
    private List<CorrelationRule> reallocateRules(List<String> destroyIpList) throws CorrelationException {
        List<CorrelationRule> rules = new ArrayList<>();
        try {
            if (destroyIpList != null) {
                for (String ip : destroyIpList) {
                    List<CorrelationRule> rulesOfEngine = ruleQueryWrapper.queryRuleByEngineInstance(ip);
                    if (rulesOfEngine != null) {
                        rules.addAll(rulesOfEngine);
                    }
                }
            }
        } catch (CorrelationException e) {
            log.error("method reallocateRules get data from DB failed !", e);
        }
        return rules;
    }

    /**
     * Returns the IPs present in {@code newIps} but absent from {@code oldIps},
     * i.e. the engines added by an expansion.
     */
    private List<String> sortOutNewEngineInstances(List<String> newIps, List<String> oldIps) {
        List<String> ret = new ArrayList<>();
        for (String ip : newIps) {
            if (!oldIps.contains(ip)) {
                ret.add(ip);
            }
        }
        return ret;
    }

    /**
     * Returns the IPs present in {@code existing} but absent from {@code latest},
     * i.e. the engines that have been destroyed.
     */
    private List<String> getDestroyedEngines(List<String> latest, List<String> existing) {
        List<String> ret = new ArrayList<>();
        for (String ip : existing) {
            if (!latest.contains(ip)) {
                ret.add(ip);
            }
        }
        return ret;
    }

    /**
     * Returns the latest engine IPs that are not listed in {@code destroyed}.
     */
    private List<String> getRemainingEngines(List<String> destroyed) {
        List<String> ret = new ArrayList<>();
        for (String ip : latestEngineServiceIps) {
            if (!destroyed.contains(ip)) {
                ret.add(ip);
            }
        }
        return ret;
    }

    /**
     * Deploys {@code rules} to the given engines in round-robin order over the
     * list returned by {@link #sortIpByRuleNumDesc(List)}.
     *
     * @param instanceIps candidate target engines
     * @param rules rules to deploy; each rule's engine instance is overwritten
     * @throws CorrelationException if deploying a rule fails
     * @throws IllegalStateException if there are rules to deploy but no target engine
     */
    private void distributeRules(List<String> instanceIps, List<CorrelationRule> rules)
            throws CorrelationException {
        if (rules.isEmpty()) {
            return;
        }
        List<String> sortedIps = sortIpByRuleNumDesc(instanceIps);
        if (sortedIps.isEmpty()) {
            // Fail with a clear message instead of an ArithmeticException from the
            // modulo below. Throwing (rather than returning) keeps the caller from
            // cleaning up rules that were never redeployed.
            throw new IllegalStateException(
                    "No engine instance is available to receive " + rules.size() + " rule(s).");
        }
        for (int i = 0; i < rules.size(); i++) {
            String ip = sortedIps.get(i % sortedIps.size());
            CorrelationRule rule = rules.get(i);
            rule.setEngineInstance(ip);
            allocateRule(rule, ip);
        }
    }

    /**
     * Sorts the given IPs by the number of rules each engine contains, in
     * descending order. IPs whose rule query returns null are left out; if a
     * query throws, the error is logged and only the IPs processed so far are
     * returned.
     */
    private List<String> sortIpByRuleNumDesc(List<String> ips) {
        Map<String, Integer> ruleNumOfEngines = new HashMap<>();
        try {
            for (String ip : ips) {
                List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
                if (rules != null) {
                    ruleNumOfEngines.put(ip, rules.size());
                }
            }
        } catch (Exception e) {
            log.error("getEngineIp4AddRule failed !", e);
        }

        List<Map.Entry<String, Integer>> sortedEntries = new ArrayList<>(ruleNumOfEngines.entrySet());
        sortedEntries.sort((o1, o2) -> Integer.compare(o2.getValue(), o1.getValue()));

        List<String> ret = new ArrayList<>(sortedEntries.size());
        for (Map.Entry<String, Integer> entry : sortedEntries) {
            ret.add(entry.getKey());
        }
        return ret;
    }

    /**
     * Deploys a single rule to the engine at {@code ip} and then updates the rule
     * through the DAO.
     *
     * @throws CorrelationException if deployment fails; the original exception is the cause
     */
    private void allocateRule(CorrelationRule rule, String ip) throws CorrelationException {
        try {
            ruleMgtWrapper.deployRule2Engine(rule, ip);
            correlationRuleDao.updateRule(rule);
        } catch (CorrelationException e) {
            throw new CorrelationException("allocate Deploy Rule failed", e);
        }
    }

    /**
     * Deletes each rule from the engine it is recorded against, for every engine
     * in {@code ipList}. A failed deletion is logged and the remaining rules are
     * still processed.
     */
    private void cleanUpRulesFromEngines(List<CorrelationRule> rules, List<String> ipList) {
        for (String ip : ipList) {
            for (CorrelationRule rule : rules) {
                if (!ip.equals(rule.getEngineInstance())) {
                    continue;
                }
                try {
                    engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip);
                } catch (CorrelationException e) {
                    log.error("When the engine is extended, deleting rule failed", e);
                }
            }
        }
    }
}