2 * Copyright 2017 ZTE Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.holmes.rulemgt.send;
19 import lombok.extern.slf4j.Slf4j;
20 import org.jvnet.hk2.annotations.Service;
21 import org.onap.holmes.common.api.entity.CorrelationRule;
22 import org.onap.holmes.common.exception.CorrelationException;
23 import org.onap.holmes.common.utils.DbDaoUtil;
24 import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
25 import org.onap.holmes.rulemgt.db.CorrelationRuleDao;
26 import org.onap.holmes.rulemgt.msb.EngineIpList;
27 import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper;
28 import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper;
30 import javax.annotation.PostConstruct;
31 import javax.inject.Inject;
37 public class RuleAllocation {
39 private final static int ENABLE = 1;
42 private RuleMgtWrapper ruleMgtWrapper;
44 private RuleQueryWrapper ruleQueryWrapper;
46 private EngineWrapper engineWrapper;
48 private EngineIpList engineIpList;
50 private DbDaoUtil daoUtil;
52 private CorrelationRuleDao correlationRuleDao;
54 private int ruleCount;
55 private int serviceCount;
56 private List<String> temIpList = new ArrayList<>();
57 private List<String> engineService = new ArrayList<>();
58 private List<CorrelationRule> allRules = new ArrayList<>();
61 public void initDaoUtilAndEngineIp() throws Exception{
62 correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class);
63 temIpList = engineIpList.getServiceCount();
66 public void judgeAndAllocateRule(List<String> ipList)throws Exception{
68 engineService = ipList;
69 serviceCount = ipList.size();
71 if(temIpList.size() < serviceCount){
73 List<CorrelationRule> deleteRule = calculateRule(temIpList);
74 List<CorrelationRule> allocateRule = calculateRule(temIpList);
75 List<String> extendIp = extendCompareIp(engineService,temIpList);
76 AllocateService(extendIp,allocateRule);
77 deleteRuleFromFormerEngine(deleteRule,temIpList);
79 } else if (temIpList.size() > serviceCount) {
81 List<String> destroyIp = destroyCompareIp(engineService, temIpList);
82 AllocateService(restIp(destroyIp), relocateRuleAfterDestroy(destroyIp));
84 } else if(temIpList.size() == serviceCount) {
85 temIpList = engineService;
88 temIpList = engineService;
93 // When the engine is expanding, the rules that need to be allocated are calculated.
94 private List<CorrelationRule> calculateRule(List<String> oldIpList) throws Exception{
95 allRules = ruleQueryWrapper.queryRuleByEnable(ENABLE);
96 if(allRules != null) {
97 ruleCount = allRules.size();
99 int count = ruleCount / serviceCount;
100 int remainder = ruleCount % serviceCount;
102 List<CorrelationRule> subRule = new ArrayList<>();
103 for(String ip : oldIpList) {
104 List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
105 List<CorrelationRule> tem = rules.subList(count + (remainder-- / oldIpList.size()),rules.size());
111 //Rules that need to be allocated after the engine is destroyed
112 private List<CorrelationRule> relocateRuleAfterDestroy(List<String> destroyIpList) throws CorrelationException {
113 List<CorrelationRule> rules = new ArrayList<>();
115 if(destroyIpList != null){
116 for(String ip : destroyIpList) {
117 rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip));
120 }catch(CorrelationException e) {
121 log.error("method relocateRuleAfterDestroy get data from DB failed !" +e.getMessage());
127 private List<String> extendCompareIp(List<String> newList, List<String> oldList){
128 List<String> extendIpList = new ArrayList<>();
130 for( String ip :newList) {
131 if(! oldList.contains(ip)) {
132 extendIpList.add(ip);
139 private List<String> destroyCompareIp(List<String> newList, List<String> oldList) {
140 List<String> destroyIpList = new ArrayList<>();
141 for(String ip : oldList) {
142 if(!newList.contains(ip)) {
143 destroyIpList.add(ip);
146 return destroyIpList;
149 //Residual IP after destruction
150 private List<String> restIp(List<String> destroyIp) {
151 List<String> restIpList = new ArrayList<>();
152 for(String ip : engineService) {
153 if(!destroyIp.contains(ip)) {
160 public void AllocateService(List<String> extendIpList, List<CorrelationRule> subList) throws Exception{
161 List<String> needIpList = getSortIp(extendIpList);
163 for(int i=0,j=0;j < subList.size();i++,j++ ){
164 int index = i % needIpList.size();
165 String deployIp = needIpList.get(index);
166 CorrelationRule rule = subList.get(j);
167 rule.setEngineInstance(deployIp);
168 allocateDeployRule(rule,deployIp);
172 //The IP to be allocated is in ascending order, and the least is circulate.
173 private List<String > getSortIp(List<String> ipList){
174 List<CorrelationRule> ipRuleList = new ArrayList<>();
175 HashMap<String,String> hashMap = new HashMap();
178 for(String ip : ipList){
179 ipRuleList = ruleQueryWrapper.queryRuleByEngineInstance(ip);
180 if(ipRuleList != null) {
181 hashMap.put(ip, String.valueOf(ipRuleList.size()));
184 }catch (Exception e){
185 log.error("getEngineIp4AddRule failed !" + e.getMessage());
188 List<Map.Entry<String, String>> list_Data = new ArrayList<Map.Entry<String, String>>(hashMap.entrySet());
189 Collections.sort(list_Data, new Comparator<Map.Entry<String, String>>() {
190 public int compare(Map.Entry<String, String> o1, Map.Entry<String, String> o2)
192 return o1.getValue().compareTo(o2.getValue());
196 List<String> needList = new ArrayList<>();
197 for(Map.Entry<String, String> map: list_Data) {
198 String key = map.getKey();
204 private void allocateDeployRule(CorrelationRule rule, String ip) throws CorrelationException {
206 ruleMgtWrapper.deployRule2Engine(rule,ip);
207 correlationRuleDao.updateRule(rule);
208 }catch (CorrelationException e){
209 throw new CorrelationException("allocate Deploy Rule failed", e);
213 private void deleteRuleFromFormerEngine(List<CorrelationRule> subRule, List<String> oldList) {
215 for(String ip : oldList){
216 for(CorrelationRule rule: subRule) {
217 if(ip.equals(rule.getEngineInstance())) {
218 engineWrapper.deleteRuleFromEngine(rule.getPackageName(),ip);
222 }catch (CorrelationException e) {
223 log.error("When the engine is extended, deleting rule failed" +e.getMessage());