2 * Copyright 2017-2021 ZTE Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.holmes.rulemgt;
19 import lombok.extern.slf4j.Slf4j;
20 import org.onap.holmes.common.api.entity.CorrelationRule;
21 import org.onap.holmes.common.exception.CorrelationException;
22 import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
23 import org.onap.holmes.rulemgt.db.CorrelationRuleService;
24 import org.onap.holmes.rulemgt.tools.EngineTools;
25 import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper;
26 import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import org.springframework.beans.factory.annotation.Autowired;
30 import org.springframework.stereotype.Component;
32 import javax.annotation.PostConstruct;
35 import static java.util.concurrent.TimeUnit.SECONDS;
39 public class RuleAllocator {
40 private static final Logger LOGGER = LoggerFactory.getLogger(RuleAllocator.class);
42 public final static int ENABLE = 1;
43 public final static int RETRY_TIMES = 5;
44 public final static long RETRY_INTERVAL_SEC = 15;
45 private RuleMgtWrapper ruleMgtWrapper;
46 private RuleQueryWrapper ruleQueryWrapper;
47 private EngineWrapper engineWrapper;
48 private EngineTools engineTools;
49 private CorrelationRuleService correlationRuleService;
/**
 * Creates the allocator with the collaborators it needs to discover engine instances and to
 * query, deploy, persist and remove correlation rules.
 *
 * @param ruleMgtWrapper         deploys a rule to an engine instance
 * @param ruleQueryWrapper       queries rules by enable flag or by engine instance
 * @param engineWrapper          deletes a rule package from an engine instance
 * @param engineTools            lists the current and the legacy engine instances
 * @param correlationRuleService persists a rule after it has been (re)assigned
 */
public RuleAllocator(RuleMgtWrapper ruleMgtWrapper, RuleQueryWrapper ruleQueryWrapper,
        EngineWrapper engineWrapper, EngineTools engineTools, CorrelationRuleService correlationRuleService) {
    this.engineTools = engineTools;
    this.ruleQueryWrapper = ruleQueryWrapper;
    this.ruleMgtWrapper = ruleMgtWrapper;
    this.correlationRuleService = correlationRuleService;
    this.engineWrapper = engineWrapper;
}
62 private void initialize() {
63 new Timer("RuleAllocatorTimer").schedule(new TimerTask() {
68 } catch (Exception e) {
69 LOGGER.error("Failed to reallocate rules.", e);
73 }, SECONDS.toMillis(10), SECONDS.toMillis(30));
76 public synchronized void allocateRules() throws Exception {
77 List<String> engines = engineTools.getInstanceList();
79 if (engines == null) {
83 int numOfEngines = engines.size();
84 LOGGER.info(String.format("There are %d engine instance(s) running currently.", numOfEngines));
86 List<String> legacyEngineInstances = engineTools.getLegacyEngineInstances();
87 if (legacyEngineInstances == null) {
91 if (legacyEngineInstances.size() < numOfEngines) { // extend
92 List<CorrelationRule> rules2Allocate = calculateRule(legacyEngineInstances, numOfEngines);
93 List<CorrelationRule> rules2Delete = copyList(rules2Allocate);
94 List<String> newInstanceIds = sortOutNewEngineInstances(engines, legacyEngineInstances);
95 distributeRules(newInstanceIds, rules2Allocate);
96 cleanUpRulesFromEngines(rules2Delete, legacyEngineInstances);
98 // If new engine instances share the same IP addresses with the old ones, the
99 // rule management module will simply leave the them to cope with the legacy rules.
100 // Here, it only takes care of the rules that need to be moved from one IP address to another.
101 List<String> destroyed = getDestroyedEngines(engines, legacyEngineInstances);
102 distributeRules(getRemainingEngines(engines, destroyed), getRules(destroyed));
106 private List<CorrelationRule> copyList(List<CorrelationRule> rules) {
107 List<CorrelationRule> ret = new ArrayList<>(rules.size());
108 for (CorrelationRule r : rules) {
109 ret.add((CorrelationRule) r.clone());
114 // When the engine is expanding, the rules that need to be allocated are calculated.
115 private List<CorrelationRule> calculateRule(List<String> existingEngineIps,
116 int latestEngineInsNum) throws CorrelationException {
117 List<CorrelationRule> enabledRules = ruleQueryWrapper.queryRuleByEnable(ENABLE);
119 if (enabledRules != null) {
120 ruleCount = enabledRules.size();
122 // Average number of rule that's to be allocate into each instance
123 int count = ruleCount / latestEngineInsNum;
124 // The number of remaining rules (to be allocated) after each instance has been allocated with the average number of rules.
125 int remainder = ruleCount % latestEngineInsNum;
127 List<CorrelationRule> ret = new ArrayList<>();
128 for (String ip : existingEngineIps) {
129 List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
130 List<CorrelationRule> tmp = rules.subList(count + (remainder-- / existingEngineIps.size()), rules.size());
136 // Rules that need to be allocated after the engine is destroyed
137 private List<CorrelationRule> getRules(List<String> destroyIpList) throws CorrelationException {
138 List<CorrelationRule> rules = new ArrayList<>();
140 if (destroyIpList != null) {
141 for (String ip : destroyIpList) {
142 rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip));
145 } catch (CorrelationException e) {
146 LOGGER.error("method getRules get data from DB failed !", e);
152 private List<String> sortOutNewEngineInstances(List<String> newIps, List<String> oldIps) {
153 List<String> ret = new ArrayList<>();
155 for (String ip : newIps) {
156 if (!oldIps.contains(ip)) {
164 private List<String> getDestroyedEngines(List<String> latest, List<String> existing) {
165 List<String> ret = new ArrayList<>();
166 for (String ip : existing) {
167 if (!latest.contains(ip)) {
174 // Residual IP after destruction
175 private List<String> getRemainingEngines(List<String> all, List<String> destroyed) {
176 List<String> ret = new ArrayList<>();
177 for (String ip : all) {
178 if (!destroyed.contains(ip)) {
185 private void distributeRules(List<String> instanceIps, List<CorrelationRule> rules) throws CorrelationException {
186 List<String> sortedIps = sortIpByRuleNumDesc(instanceIps);
188 for (int i = 0, j = 0; j < rules.size(); i++, j++) {
189 int index = i % sortedIps.size();
190 String ip = sortedIps.get(index);
191 CorrelationRule rule = rules.get(j);
192 rule.setEngineInstance(ip);
193 allocateRule(rule, ip);
197 // Sorted by the number of rules each engine contains, in a descending order.
198 private List<String> sortIpByRuleNumDesc(List<String> ips) {
199 List<CorrelationRule> rules;
200 Map<String, Integer> ruleNumOfEngines = new HashMap();
203 for (String ip : ips) {
204 rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
206 ruleNumOfEngines.put(ip, rules.size());
209 } catch (Exception e) {
210 LOGGER.error("getEngineWithLeastRules failed !", e);
213 List<Map.Entry<String, Integer>> sortedEntries = new ArrayList<>(ruleNumOfEngines.entrySet());
214 Collections.sort(sortedEntries, (o1, o2) -> o2.getValue() - o1.getValue());
216 List<String> ret = new ArrayList<>();
217 for (Map.Entry<String, Integer> entry : sortedEntries) {
218 ret.add(entry.getKey());
223 private void allocateRule(CorrelationRule rule, String ip) throws CorrelationException {
224 // Retry for a couple of times in case of deployment failure
225 // due to unfinished initialization procedures of engine instances.
226 for (int i = 0; i <= RETRY_TIMES; ++i) {
228 ruleMgtWrapper.deployRule2Engine(rule, ip);
229 correlationRuleService.updateRule(rule);
230 // If the codes reach here, it means everything's okay. There's no need to run the loop more.
232 } catch (CorrelationException e) {
233 LOGGER.warn(String.format("Failed to allocate rule <%s> to <%s>. Retry: %d.",
234 rule.getName(), ip, i), e);
235 if (i == RETRY_TIMES) {
236 throw new CorrelationException(String.format("Failed to allocate rule <%s> to <%s>",
237 rule.getName(), ip), e);
240 SECONDS.sleep(RETRY_INTERVAL_SEC * (i + 1));
241 } catch (InterruptedException interruptedException) {
242 LOGGER.info(interruptedException.getMessage(), interruptedException);
248 private void cleanUpRulesFromEngines(List<CorrelationRule> rules, List<String> ipList) {
250 for (String ip : ipList) {
251 for (CorrelationRule rule : rules) {
252 if (ip.equals(rule.getEngineInstance())) {
253 engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip);
257 } catch (CorrelationException e) {
258 LOGGER.error("When the engine is extended, deleting rule failed", e);