Fixed an Issue on API Calling via MSB
[holmes/rule-management.git] / rulemgt / src / main / java / org / onap / holmes / rulemgt / send / RuleAllocator.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  * <p>
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * <p>
8  * http://www.apache.org/licenses/LICENSE-2.0
9  * <p>
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.holmes.rulemgt.send;
18
19 import lombok.extern.slf4j.Slf4j;
20 import org.glassfish.hk2.api.ServiceLocator;
21 import org.onap.holmes.common.api.entity.CorrelationRule;
22 import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
23 import org.onap.holmes.common.exception.CorrelationException;
24 import org.onap.holmes.common.utils.DbDaoUtil;
25 import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
26 import org.onap.holmes.rulemgt.db.CorrelationRuleDao;
27 import org.onap.holmes.rulemgt.msb.EngineInsQueryTool;
28 import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper;
29 import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper;
30
31 import java.util.*;
32
33
34 @Slf4j
35 public class RuleAllocator {
36     public final static int ENABLE = 1;
37     private RuleMgtWrapper ruleMgtWrapper;
38     private RuleQueryWrapper ruleQueryWrapper;
39     private EngineWrapper engineWrapper;
40     private EngineInsQueryTool engineInsQueryTool;
41     private DbDaoUtil daoUtil;
42     private CorrelationRuleDao correlationRuleDao;
43     private int latestEngineInsNum = 0;
44     private List<String> existingEngineServiceIps = new ArrayList<>();
45     private List<String> latestEngineServiceIps = new ArrayList<>();
46
47     public RuleAllocator() {
48         ServiceLocator locator = ServiceLocatorHolder.getLocator();
49         ruleMgtWrapper = locator.getService(RuleMgtWrapper.class);
50         ruleQueryWrapper = locator.getService(RuleQueryWrapper.class);
51         engineWrapper = locator.getService(EngineWrapper.class);
52         engineInsQueryTool = locator.getService(EngineInsQueryTool.class);
53         daoUtil = locator.getService(DbDaoUtil.class);
54
55         initDaoUtilAndEngineIp();
56     }
57
58     private void initDaoUtilAndEngineIp() {
59         correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class);
60         try {
61             existingEngineServiceIps = engineInsQueryTool.getInstanceList();
62
63         } catch (Exception e) {
64             log.warn("Failed to get the number of engine instances.", e);
65         }
66     }
67
68     public synchronized void allocateRules(List<String> latestEngineIps) throws Exception {
69         if (latestEngineIps == null) {
70             throw new NullPointerException("The parameter of " + this.getClass().getSimpleName()
71                     + ".allocateRules(List<String>) can not be null!");
72         }
73
74         latestEngineServiceIps = latestEngineIps;
75         latestEngineInsNum = latestEngineIps.size();
76         if (existingEngineServiceIps.size() < latestEngineInsNum) {
77             //extend
78             List<CorrelationRule> rules2Allocate = calculateRule(existingEngineServiceIps);
79             List<CorrelationRule> rules2Delete = copyList(rules2Allocate);
80             List<String> newInstanceIds = sortOutNewEngineInstances(latestEngineServiceIps, existingEngineServiceIps);
81             distributeRules(newInstanceIds, rules2Allocate);
82             cleanUpRulesFromEngines(rules2Delete, existingEngineServiceIps);
83         } else if (existingEngineServiceIps.size() > latestEngineInsNum) {
84             //destroy
85             List<String> destroyed = getDestroyedEngines(latestEngineServiceIps, existingEngineServiceIps);
86             distributeRules(getRemainingEngines(destroyed), reallocateRules(destroyed));
87         }
88
89         existingEngineServiceIps = latestEngineServiceIps;
90     }
91
92     private List<CorrelationRule> copyList(List<CorrelationRule> rules) {
93         List<CorrelationRule> ret = new ArrayList<>(rules.size());
94         for (CorrelationRule r : rules) {
95             ret.add((CorrelationRule) r.clone());
96         }
97         return ret;
98     }
99
100     // When the engine is expanding, the rules that need to be allocated are calculated.
101     private List<CorrelationRule> calculateRule(List<String> existingEngineIps) throws CorrelationException {
102         List<CorrelationRule> enabledRules = ruleQueryWrapper.queryRuleByEnable(ENABLE);
103         int ruleCount = 0;
104         if (enabledRules != null) {
105             ruleCount = enabledRules.size();
106         }
107         int count = ruleCount / latestEngineInsNum;
108         int remainder = ruleCount % latestEngineInsNum;
109
110         List<CorrelationRule> ret = new ArrayList<>();
111         for (String ip : existingEngineIps) {
112             List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
113             List<CorrelationRule> tmp = rules.subList(count + (remainder-- / existingEngineIps.size()), rules.size());
114             ret.addAll(tmp);
115         }
116         return ret;
117     }
118
119     // Rules that need to be allocated after the engine is destroyed
120     private List<CorrelationRule> reallocateRules(List<String> destroyIpList) throws CorrelationException {
121         List<CorrelationRule> rules = new ArrayList<>();
122         try {
123             if (destroyIpList != null) {
124                 for (String ip : destroyIpList) {
125                     rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip));
126                 }
127             }
128         } catch (CorrelationException e) {
129             log.error("method reallocateRules get data from DB failed !", e);
130         }
131         return rules;
132     }
133
134     // Extended IP
135     private List<String> sortOutNewEngineInstances(List<String> newIps, List<String> oldIps) {
136         List<String> ret = new ArrayList<>();
137
138         for (String ip : newIps) {
139             if (!oldIps.contains(ip)) {
140                 ret.add(ip);
141             }
142         }
143         return ret;
144     }
145
146     // Destroyed IP
147     private List<String> getDestroyedEngines(List<String> latest, List<String> existing) {
148         List<String> ret = new ArrayList<>();
149         for (String ip : existing) {
150             if (!latest.contains(ip)) {
151                 ret.add(ip);
152             }
153         }
154         return ret;
155     }
156
157     // Residual IP after destruction
158     private List<String> getRemainingEngines(List<String> destroyed) {
159         List<String> ret = new ArrayList<>();
160         for (String ip : latestEngineServiceIps) {
161             if (!destroyed.contains(ip)) {
162                 ret.add(ip);
163             }
164         }
165         return ret;
166     }
167
168     private void distributeRules(List<String> instanceIps, List<CorrelationRule> rules) throws CorrelationException {
169         List<String> sortedIps = sortIpByRuleNumDesc(instanceIps);
170
171         for (int i = 0, j = 0; j < rules.size(); i++, j++) {
172             int index = i % sortedIps.size();
173             String ip = sortedIps.get(index);
174             CorrelationRule rule = rules.get(j);
175             rule.setEngineInstance(ip);
176             allocateRule(rule, ip);
177         }
178     }
179
180     // Sorted by the number of rules each engine contains, in a descending order.
181     private List<String> sortIpByRuleNumDesc(List<String> ips) {
182         List<CorrelationRule> rules = null;
183         Map<String, Integer> ruleNumOfEngines = new HashMap();
184
185         try {
186             for (String ip : ips) {
187                 rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
188                 if (rules != null) {
189                     ruleNumOfEngines.put(ip, rules.size());
190                 }
191             }
192         } catch (Exception e) {
193             log.error("getEngineIp4AddRule failed !", e);
194         }
195
196         List<Map.Entry<String, Integer>> sortedEntries = new ArrayList<>(ruleNumOfEngines.entrySet());
197         Collections.sort(sortedEntries, (o1, o2) -> o2.getValue() - o1.getValue());
198
199         List<String> ret = new ArrayList<>();
200         for (Map.Entry<String, Integer> entry : sortedEntries) {
201             ret.add(entry.getKey());
202         }
203         return ret;
204     }
205
206     private void allocateRule(CorrelationRule rule, String ip) throws CorrelationException {
207         try {
208             ruleMgtWrapper.deployRule2Engine(rule, ip);
209             correlationRuleDao.updateRule(rule);
210         } catch (CorrelationException e) {
211             throw new CorrelationException("allocate Deploy Rule failed", e);
212         }
213     }
214
215     private void cleanUpRulesFromEngines(List<CorrelationRule> rules, List<String> ipList) {
216         try {
217             for (String ip : ipList) {
218                 for (CorrelationRule rule : rules) {
219                     if (ip.equals(rule.getEngineInstance())) {
220                         engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip);
221                     }
222                 }
223             }
224         } catch (CorrelationException e) {
225             log.error("When the engine is extended, deleting rule failed", e);
226         }
227     }
228 }