2 * Copyright 2017 ZTE Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.holmes.rulemgt.send;
19 import lombok.extern.slf4j.Slf4j;
20 import org.glassfish.hk2.api.ServiceLocator;
21 import org.onap.holmes.common.api.entity.CorrelationRule;
22 import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
23 import org.onap.holmes.common.exception.CorrelationException;
24 import org.onap.holmes.common.utils.DbDaoUtil;
25 import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
26 import org.onap.holmes.rulemgt.db.CorrelationRuleDao;
27 import org.onap.holmes.rulemgt.msb.EngineInsQueryTool;
28 import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper;
29 import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper;
35 public class RuleAllocator {
36 public final static int ENABLE = 1;
37 private RuleMgtWrapper ruleMgtWrapper;
38 private RuleQueryWrapper ruleQueryWrapper;
39 private EngineWrapper engineWrapper;
40 private EngineInsQueryTool engineInsQueryTool;
41 private DbDaoUtil daoUtil;
42 private CorrelationRuleDao correlationRuleDao;
43 private int latestEngineInsNum = 0;
44 private List<String> existingEngineServiceIps = new ArrayList<>();
45 private List<String> latestEngineServiceIps = new ArrayList<>();
47 public RuleAllocator() {
48 ServiceLocator locator = ServiceLocatorHolder.getLocator();
49 ruleMgtWrapper = locator.getService(RuleMgtWrapper.class);
50 ruleQueryWrapper = locator.getService(RuleQueryWrapper.class);
51 engineWrapper = locator.getService(EngineWrapper.class);
52 engineInsQueryTool = locator.getService(EngineInsQueryTool.class);
53 daoUtil = locator.getService(DbDaoUtil.class);
55 initDaoUtilAndEngineIp();
58 private void initDaoUtilAndEngineIp() {
59 correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class);
61 existingEngineServiceIps = engineInsQueryTool.getInstanceList();
63 } catch (Exception e) {
64 log.warn("Failed to get the number of engine instances.", e);
68 public synchronized void allocateRules(List<String> latestEngineIps) throws Exception {
69 if (latestEngineIps == null) {
70 throw new NullPointerException("The parameter of " + this.getClass().getSimpleName()
71 + ".allocateRules(List<String>) can not be null!");
74 latestEngineServiceIps = latestEngineIps;
75 latestEngineInsNum = latestEngineIps.size();
76 if (existingEngineServiceIps.size() < latestEngineInsNum) {
78 List<CorrelationRule> rules2Allocate = calculateRule(existingEngineServiceIps);
79 List<CorrelationRule> rules2Delete = copyList(rules2Allocate);
80 List<String> newInstanceIds = sortOutNewEngineInstances(latestEngineServiceIps, existingEngineServiceIps);
81 distributeRules(newInstanceIds, rules2Allocate);
82 cleanUpRulesFromEngines(rules2Delete, existingEngineServiceIps);
83 } else if (existingEngineServiceIps.size() > latestEngineInsNum) {
85 List<String> destroyed = getDestroyedEngines(latestEngineServiceIps, existingEngineServiceIps);
86 distributeRules(getRemainingEngines(destroyed), reallocateRules(destroyed));
89 existingEngineServiceIps = latestEngineServiceIps;
92 private List<CorrelationRule> copyList(List<CorrelationRule> rules) {
93 List<CorrelationRule> ret = new ArrayList<>(rules.size());
94 for (CorrelationRule r : rules) {
95 ret.add((CorrelationRule) r.clone());
100 // When the engine is expanding, the rules that need to be allocated are calculated.
101 private List<CorrelationRule> calculateRule(List<String> existingEngineIps) throws CorrelationException {
102 List<CorrelationRule> enabledRules = ruleQueryWrapper.queryRuleByEnable(ENABLE);
104 if (enabledRules != null) {
105 ruleCount = enabledRules.size();
107 int count = ruleCount / latestEngineInsNum;
108 int remainder = ruleCount % latestEngineInsNum;
110 List<CorrelationRule> ret = new ArrayList<>();
111 for (String ip : existingEngineIps) {
112 List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
113 List<CorrelationRule> tmp = rules.subList(count + (remainder-- / existingEngineIps.size()), rules.size());
119 // Rules that need to be allocated after the engine is destroyed
120 private List<CorrelationRule> reallocateRules(List<String> destroyIpList) throws CorrelationException {
121 List<CorrelationRule> rules = new ArrayList<>();
123 if (destroyIpList != null) {
124 for (String ip : destroyIpList) {
125 rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip));
128 } catch (CorrelationException e) {
129 log.error("method reallocateRules get data from DB failed !", e);
135 private List<String> sortOutNewEngineInstances(List<String> newIps, List<String> oldIps) {
136 List<String> ret = new ArrayList<>();
138 for (String ip : newIps) {
139 if (!oldIps.contains(ip)) {
147 private List<String> getDestroyedEngines(List<String> latest, List<String> existing) {
148 List<String> ret = new ArrayList<>();
149 for (String ip : existing) {
150 if (!latest.contains(ip)) {
157 // Residual IP after destruction
158 private List<String> getRemainingEngines(List<String> destroyed) {
159 List<String> ret = new ArrayList<>();
160 for (String ip : latestEngineServiceIps) {
161 if (!destroyed.contains(ip)) {
168 private void distributeRules(List<String> instanceIps, List<CorrelationRule> rules) throws CorrelationException {
169 List<String> sortedIps = sortIpByRuleNumDesc(instanceIps);
171 for (int i = 0, j = 0; j < rules.size(); i++, j++) {
172 int index = i % sortedIps.size();
173 String ip = sortedIps.get(index);
174 CorrelationRule rule = rules.get(j);
175 rule.setEngineInstance(ip);
176 allocateRule(rule, ip);
180 // Sorted by the number of rules each engine contains, in a descending order.
181 private List<String> sortIpByRuleNumDesc(List<String> ips) {
182 List<CorrelationRule> rules = null;
183 Map<String, Integer> ruleNumOfEngines = new HashMap();
186 for (String ip : ips) {
187 rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
189 ruleNumOfEngines.put(ip, rules.size());
192 } catch (Exception e) {
193 log.error("getEngineIp4AddRule failed !", e);
196 List<Map.Entry<String, Integer>> sortedEntries = new ArrayList<>(ruleNumOfEngines.entrySet());
197 Collections.sort(sortedEntries, (o1, o2) -> o2.getValue() - o1.getValue());
199 List<String> ret = new ArrayList<>();
200 for (Map.Entry<String, Integer> entry : sortedEntries) {
201 ret.add(entry.getKey());
206 private void allocateRule(CorrelationRule rule, String ip) throws CorrelationException {
208 ruleMgtWrapper.deployRule2Engine(rule, ip);
209 correlationRuleDao.updateRule(rule);
210 } catch (CorrelationException e) {
211 throw new CorrelationException("allocate Deploy Rule failed", e);
215 private void cleanUpRulesFromEngines(List<CorrelationRule> rules, List<String> ipList) {
217 for (String ip : ipList) {
218 for (CorrelationRule rule : rules) {
219 if (ip.equals(rule.getEngineInstance())) {
220 engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip);
224 } catch (CorrelationException e) {
225 log.error("When the engine is extended, deleting rule failed", e);