/**
* Copyright 2017 ZTE Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onap.holmes.rulemgt.send;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.hk2.api.ServiceLocator;
import org.jvnet.hk2.annotations.Service;
import org.onap.holmes.common.api.entity.CorrelationRule;
import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.common.utils.DbDaoUtil;
import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
import org.onap.holmes.rulemgt.db.CorrelationRuleDao;
import org.onap.holmes.rulemgt.msb.EngineIpList;
import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper;
import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.util.*;
@Slf4j
public class RuleAllocation {
private final static int ENABLE = 1;
private RuleMgtWrapper ruleMgtWrapper;
private RuleQueryWrapper ruleQueryWrapper;
private EngineWrapper engineWrapper;
private EngineIpList engineIpList;
private DbDaoUtil daoUtil;
private CorrelationRuleDao correlationRuleDao;
private int ruleCount;
private int serviceCount;
private List temIpList = new ArrayList<>();
private List engineService = new ArrayList<>();
private List allRules = new ArrayList<>();
public RuleAllocation() {
ServiceLocator locator = ServiceLocatorHolder.getLocator();
ruleMgtWrapper = locator.getService(RuleMgtWrapper.class);
ruleQueryWrapper = locator.getService(RuleQueryWrapper.class);
engineWrapper = locator.getService(EngineWrapper.class);
engineIpList = locator.getService(EngineIpList.class);
daoUtil = locator.getService(DbDaoUtil.class);
initDaoUtilAndEngineIp();
}
private void initDaoUtilAndEngineIp() {
correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class);
try {
temIpList = engineIpList.getServiceCount();
} catch (Exception e) {
log.warn("Failed to get the number of engine instances.", e);
}
}
public void judgeAndAllocateRule(List ipList) throws Exception {
if (ipList != null) {
engineService = ipList;
serviceCount = ipList.size();
}
if (temIpList.size() < serviceCount) {
//extend
List deleteRule = calculateRule(temIpList);
List allocateRule = calculateRule(temIpList);
List extendIp = extendCompareIp(engineService, temIpList);
AllocateService(extendIp, allocateRule);
deleteRuleFromFormerEngine(deleteRule, temIpList);
} else if (temIpList.size() > serviceCount) {
//destroy
List destroyIp = destroyCompareIp(engineService, temIpList);
AllocateService(restIp(destroyIp), relocateRuleAfterDestroy(destroyIp));
} else if (temIpList.size() == serviceCount) {
temIpList = engineService;
return;
}
temIpList = engineService;
}
// When the engine is expanding, the rules that need to be allocated are calculated.
private List calculateRule(List oldIpList) throws Exception {
allRules = ruleQueryWrapper.queryRuleByEnable(ENABLE);
if (allRules != null) {
ruleCount = allRules.size();
}
int count = ruleCount / serviceCount;
int remainder = ruleCount % serviceCount;
List subRule = new ArrayList<>();
for (String ip : oldIpList) {
List rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
List tem = rules.subList(count + (remainder-- / oldIpList.size()), rules.size());
subRule.addAll(tem);
}
return subRule;
}
//Rules that need to be allocated after the engine is destroyed
private List relocateRuleAfterDestroy(List destroyIpList) throws CorrelationException {
List rules = new ArrayList<>();
try {
if (destroyIpList != null) {
for (String ip : destroyIpList) {
rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip));
}
}
} catch (CorrelationException e) {
log.error("method relocateRuleAfterDestroy get data from DB failed !", e);
}
return rules;
}
//Extended IP
private List extendCompareIp(List newList, List oldList) {
List extendIpList = new ArrayList<>();
for (String ip : newList) {
if (!oldList.contains(ip)) {
extendIpList.add(ip);
}
}
return extendIpList;
}
//Destroyed IP
private List destroyCompareIp(List newList, List oldList) {
List destroyIpList = new ArrayList<>();
for (String ip : oldList) {
if (!newList.contains(ip)) {
destroyIpList.add(ip);
}
}
return destroyIpList;
}
//Residual IP after destruction
private List restIp(List destroyIp) {
List restIpList = new ArrayList<>();
for (String ip : engineService) {
if (!destroyIp.contains(ip)) {
restIpList.add(ip);
}
}
return restIpList;
}
public void AllocateService(List extendIpList, List subList) throws Exception {
List needIpList = getSortIp(extendIpList);
for (int i = 0, j = 0; j < subList.size(); i++, j++) {
int index = i % needIpList.size();
String deployIp = needIpList.get(index);
CorrelationRule rule = subList.get(j);
rule.setEngineInstance(deployIp);
allocateDeployRule(rule, deployIp);
}
}
//The IP to be allocated is in ascending order, and the least is circulate.
private List getSortIp(List ipList) {
List ipRuleList = new ArrayList<>();
HashMap hashMap = new HashMap();
try {
for (String ip : ipList) {
ipRuleList = ruleQueryWrapper.queryRuleByEngineInstance(ip);
if (ipRuleList != null) {
hashMap.put(ip, String.valueOf(ipRuleList.size()));
}
}
} catch (Exception e) {
log.error("getEngineIp4AddRule failed !", e);
}
List> list_Data = new ArrayList>(hashMap.entrySet());
Collections.sort(list_Data, new Comparator>() {
public int compare(Map.Entry o1, Map.Entry o2) {
return o1.getValue().compareTo(o2.getValue());
}
});
List needList = new ArrayList<>();
for (Map.Entry map : list_Data) {
String key = map.getKey();
needList.add(key);
}
return needList;
}
private void allocateDeployRule(CorrelationRule rule, String ip) throws CorrelationException {
try {
ruleMgtWrapper.deployRule2Engine(rule, ip);
correlationRuleDao.updateRule(rule);
} catch (CorrelationException e) {
throw new CorrelationException("allocate Deploy Rule failed", e);
}
}
private void deleteRuleFromFormerEngine(List subRule, List oldList) {
try {
for (String ip : oldList) {
for (CorrelationRule rule : subRule) {
if (ip.equals(rule.getEngineInstance())) {
engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip);
}
}
}
} catch (CorrelationException e) {
log.error("When the engine is extended, deleting rule failed", e);
}
}
}