/** * Copyright 2017 ZTE Corporation. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onap.holmes.rulemgt.send; import lombok.extern.slf4j.Slf4j; import org.glassfish.hk2.api.ServiceLocator; import org.jvnet.hk2.annotations.Service; import org.onap.holmes.common.api.entity.CorrelationRule; import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.common.utils.DbDaoUtil; import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper; import org.onap.holmes.rulemgt.db.CorrelationRuleDao; import org.onap.holmes.rulemgt.msb.EngineIpList; import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper; import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper; import javax.annotation.PostConstruct; import javax.inject.Inject; import java.util.*; @Slf4j public class RuleAllocation { private final static int ENABLE = 1; private RuleMgtWrapper ruleMgtWrapper; private RuleQueryWrapper ruleQueryWrapper; private EngineWrapper engineWrapper; private EngineIpList engineIpList; private DbDaoUtil daoUtil; private CorrelationRuleDao correlationRuleDao; private int ruleCount; private int serviceCount; private List temIpList = new ArrayList<>(); private List engineService = new ArrayList<>(); private List allRules = new ArrayList<>(); public RuleAllocation() { ServiceLocator locator = ServiceLocatorHolder.getLocator(); ruleMgtWrapper = locator.getService(RuleMgtWrapper.class); ruleQueryWrapper = locator.getService(RuleQueryWrapper.class); engineWrapper = locator.getService(EngineWrapper.class); engineIpList = locator.getService(EngineIpList.class); daoUtil = locator.getService(DbDaoUtil.class); initDaoUtilAndEngineIp(); } private void initDaoUtilAndEngineIp() { correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class); try { temIpList = engineIpList.getServiceCount(); } catch (Exception e) { log.warn("Failed to get the number of engine instances.", e); } } public void judgeAndAllocateRule(List ipList) throws Exception { if (ipList != null) { engineService = ipList; serviceCount = ipList.size(); } if (temIpList.size() < serviceCount) { //extend List deleteRule = calculateRule(temIpList); List allocateRule = calculateRule(temIpList); List extendIp = extendCompareIp(engineService, temIpList); AllocateService(extendIp, allocateRule); deleteRuleFromFormerEngine(deleteRule, temIpList); } else if (temIpList.size() > serviceCount) { //destroy List destroyIp = destroyCompareIp(engineService, temIpList); AllocateService(restIp(destroyIp), relocateRuleAfterDestroy(destroyIp)); } else if (temIpList.size() == serviceCount) { temIpList = engineService; return; } temIpList = engineService; } // When the engine is expanding, the rules that need to be allocated are calculated. private List calculateRule(List oldIpList) throws Exception { allRules = ruleQueryWrapper.queryRuleByEnable(ENABLE); if (allRules != null) { ruleCount = allRules.size(); } int count = ruleCount / serviceCount; int remainder = ruleCount % serviceCount; List subRule = new ArrayList<>(); for (String ip : oldIpList) { List rules = ruleQueryWrapper.queryRuleByEngineInstance(ip); List tem = rules.subList(count + (remainder-- / oldIpList.size()), rules.size()); subRule.addAll(tem); } return subRule; } //Rules that need to be allocated after the engine is destroyed private List relocateRuleAfterDestroy(List destroyIpList) throws CorrelationException { List rules = new ArrayList<>(); try { if (destroyIpList != null) { for (String ip : destroyIpList) { rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip)); } } } catch (CorrelationException e) { log.error("method relocateRuleAfterDestroy get data from DB failed !", e); } return rules; } //Extended IP private List extendCompareIp(List newList, List oldList) { List extendIpList = new ArrayList<>(); for (String ip : newList) { if (!oldList.contains(ip)) { extendIpList.add(ip); } } return extendIpList; } //Destroyed IP private List destroyCompareIp(List newList, List oldList) { List destroyIpList = new ArrayList<>(); for (String ip : oldList) { if (!newList.contains(ip)) { destroyIpList.add(ip); } } return destroyIpList; } //Residual IP after destruction private List restIp(List destroyIp) { List restIpList = new ArrayList<>(); for (String ip : engineService) { if (!destroyIp.contains(ip)) { restIpList.add(ip); } } return restIpList; } public void AllocateService(List extendIpList, List subList) throws Exception { List needIpList = getSortIp(extendIpList); for (int i = 0, j = 0; j < subList.size(); i++, j++) { int index = i % needIpList.size(); String deployIp = needIpList.get(index); CorrelationRule rule = subList.get(j); rule.setEngineInstance(deployIp); allocateDeployRule(rule, deployIp); } } //The IP to be allocated is in ascending order, and the least is circulate. private List getSortIp(List ipList) { List ipRuleList = new ArrayList<>(); HashMap hashMap = new HashMap(); try { for (String ip : ipList) { ipRuleList = ruleQueryWrapper.queryRuleByEngineInstance(ip); if (ipRuleList != null) { hashMap.put(ip, String.valueOf(ipRuleList.size())); } } } catch (Exception e) { log.error("getEngineIp4AddRule failed !", e); } List> list_Data = new ArrayList>(hashMap.entrySet()); Collections.sort(list_Data, new Comparator>() { public int compare(Map.Entry o1, Map.Entry o2) { return o1.getValue().compareTo(o2.getValue()); } }); List needList = new ArrayList<>(); for (Map.Entry map : list_Data) { String key = map.getKey(); needList.add(key); } return needList; } private void allocateDeployRule(CorrelationRule rule, String ip) throws CorrelationException { try { ruleMgtWrapper.deployRule2Engine(rule, ip); correlationRuleDao.updateRule(rule); } catch (CorrelationException e) { throw new CorrelationException("allocate Deploy Rule failed", e); } } private void deleteRuleFromFormerEngine(List subRule, List oldList) { try { for (String ip : oldList) { for (CorrelationRule rule : subRule) { if (ip.equals(rule.getEngineInstance())) { engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip); } } } } catch (CorrelationException e) { log.error("When the engine is extended, deleting rule failed", e); } } }