2 * Copyright 2017 ZTE Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.holmes.rulemgt.send;
19 import lombok.extern.slf4j.Slf4j;
20 import org.glassfish.hk2.api.ServiceLocator;
21 import org.jvnet.hk2.annotations.Service;
22 import org.onap.holmes.common.api.entity.CorrelationRule;
23 import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
24 import org.onap.holmes.common.exception.CorrelationException;
25 import org.onap.holmes.common.utils.DbDaoUtil;
26 import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
27 import org.onap.holmes.rulemgt.db.CorrelationRuleDao;
28 import org.onap.holmes.rulemgt.msb.EngineIpList;
29 import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper;
30 import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper;
32 import javax.annotation.PostConstruct;
33 import javax.inject.Inject;
38 public class RuleAllocation {
40 private final static int ENABLE = 1;
41 private RuleMgtWrapper ruleMgtWrapper;
42 private RuleQueryWrapper ruleQueryWrapper;
43 private EngineWrapper engineWrapper;
44 private EngineIpList engineIpList;
45 private DbDaoUtil daoUtil;
46 private CorrelationRuleDao correlationRuleDao;
47 private int ruleCount;
48 private int serviceCount;
49 private List<String> temIpList = new ArrayList<>();
50 private List<String> engineService = new ArrayList<>();
51 private List<CorrelationRule> allRules = new ArrayList<>();
53 public RuleAllocation() {
54 ServiceLocator locator = ServiceLocatorHolder.getLocator();
55 ruleMgtWrapper = locator.getService(RuleMgtWrapper.class);
56 ruleQueryWrapper = locator.getService(RuleQueryWrapper.class);
57 engineWrapper = locator.getService(EngineWrapper.class);
58 engineIpList = locator.getService(EngineIpList.class);
59 daoUtil = locator.getService(DbDaoUtil.class);
61 initDaoUtilAndEngineIp();
64 private void initDaoUtilAndEngineIp() {
65 correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class);
67 temIpList = engineIpList.getServiceCount();
69 } catch (Exception e) {
70 log.warn("Failed to get the number of engine instances.", e);
74 public void judgeAndAllocateRule(List<String> ipList) throws Exception {
76 engineService = ipList;
77 serviceCount = ipList.size();
79 if (temIpList.size() < serviceCount) {
81 List<CorrelationRule> deleteRule = calculateRule(temIpList);
82 List<CorrelationRule> allocateRule = calculateRule(temIpList);
83 List<String> extendIp = extendCompareIp(engineService, temIpList);
84 AllocateService(extendIp, allocateRule);
85 deleteRuleFromFormerEngine(deleteRule, temIpList);
87 } else if (temIpList.size() > serviceCount) {
89 List<String> destroyIp = destroyCompareIp(engineService, temIpList);
90 AllocateService(restIp(destroyIp), relocateRuleAfterDestroy(destroyIp));
92 } else if (temIpList.size() == serviceCount) {
93 temIpList = engineService;
96 temIpList = engineService;
101 // When the engine is expanding, the rules that need to be allocated are calculated.
102 private List<CorrelationRule> calculateRule(List<String> oldIpList) throws Exception {
103 allRules = ruleQueryWrapper.queryRuleByEnable(ENABLE);
104 if (allRules != null) {
105 ruleCount = allRules.size();
107 int count = ruleCount / serviceCount;
108 int remainder = ruleCount % serviceCount;
110 List<CorrelationRule> subRule = new ArrayList<>();
111 for (String ip : oldIpList) {
112 List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
113 List<CorrelationRule> tem = rules.subList(count + (remainder-- / oldIpList.size()), rules.size());
119 //Rules that need to be allocated after the engine is destroyed
120 private List<CorrelationRule> relocateRuleAfterDestroy(List<String> destroyIpList) throws CorrelationException {
121 List<CorrelationRule> rules = new ArrayList<>();
123 if (destroyIpList != null) {
124 for (String ip : destroyIpList) {
125 rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip));
128 } catch (CorrelationException e) {
129 log.error("method relocateRuleAfterDestroy get data from DB failed !", e);
135 private List<String> extendCompareIp(List<String> newList, List<String> oldList) {
136 List<String> extendIpList = new ArrayList<>();
138 for (String ip : newList) {
139 if (!oldList.contains(ip)) {
140 extendIpList.add(ip);
147 private List<String> destroyCompareIp(List<String> newList, List<String> oldList) {
148 List<String> destroyIpList = new ArrayList<>();
149 for (String ip : oldList) {
150 if (!newList.contains(ip)) {
151 destroyIpList.add(ip);
154 return destroyIpList;
157 //Residual IP after destruction
158 private List<String> restIp(List<String> destroyIp) {
159 List<String> restIpList = new ArrayList<>();
160 for (String ip : engineService) {
161 if (!destroyIp.contains(ip)) {
168 public void AllocateService(List<String> extendIpList, List<CorrelationRule> subList) throws Exception {
169 List<String> needIpList = getSortIp(extendIpList);
171 for (int i = 0, j = 0; j < subList.size(); i++, j++) {
172 int index = i % needIpList.size();
173 String deployIp = needIpList.get(index);
174 CorrelationRule rule = subList.get(j);
175 rule.setEngineInstance(deployIp);
176 allocateDeployRule(rule, deployIp);
180 //The IP to be allocated is in ascending order, and the least is circulate.
181 private List<String> getSortIp(List<String> ipList) {
182 List<CorrelationRule> ipRuleList = new ArrayList<>();
183 HashMap<String, String> hashMap = new HashMap();
186 for (String ip : ipList) {
187 ipRuleList = ruleQueryWrapper.queryRuleByEngineInstance(ip);
188 if (ipRuleList != null) {
189 hashMap.put(ip, String.valueOf(ipRuleList.size()));
192 } catch (Exception e) {
193 log.error("getEngineIp4AddRule failed !", e);
196 List<Map.Entry<String, String>> list_Data = new ArrayList<>(hashMap.entrySet());
197 Collections.sort(list_Data,(o1,o2) -> o1.getValue().compareTo(o2.getValue()));
198 List<String> needList = new ArrayList<>();
199 for (Map.Entry<String, String> map : list_Data) {
200 String key = map.getKey();
206 private void allocateDeployRule(CorrelationRule rule, String ip) throws CorrelationException {
208 ruleMgtWrapper.deployRule2Engine(rule, ip);
209 correlationRuleDao.updateRule(rule);
210 } catch (CorrelationException e) {
211 throw new CorrelationException("allocate Deploy Rule failed", e);
215 private void deleteRuleFromFormerEngine(List<CorrelationRule> subRule, List<String> oldList) {
217 for (String ip : oldList) {
218 for (CorrelationRule rule : subRule) {
219 if (ip.equals(rule.getEngineInstance())) {
220 engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip);
224 } catch (CorrelationException e) {
225 log.error("When the engine is extended, deleting rule failed", e);