bugfix - engines cannot get the IP when deploying rules during init
[holmes/engine-management.git] / engine-d / src / main / java / org / onap / holmes / engine / manager / DroolsEngine.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  * <p>
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * <p>
8  * http://www.apache.org/licenses/LICENSE-2.0
9  * <p>
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.holmes.engine.manager;
17
18 import lombok.extern.slf4j.Slf4j;
19 import org.drools.compiler.kie.builder.impl.InternalKieModule;
20 import org.drools.core.util.StringUtils;
21 import org.jvnet.hk2.annotations.Service;
22 import org.kie.api.KieServices;
23 import org.kie.api.builder.*;
24 import org.kie.api.builder.Message.Level;
25 import org.kie.api.builder.model.KieBaseModel;
26 import org.kie.api.builder.model.KieModuleModel;
27 import org.kie.api.builder.model.KieSessionModel;
28 import org.kie.api.conf.EqualityBehaviorOption;
29 import org.kie.api.io.Resource;
30 import org.kie.api.runtime.KieContainer;
31 import org.kie.api.runtime.KieSession;
32 import org.kie.api.runtime.rule.FactHandle;
33 import org.onap.holmes.common.api.entity.AlarmInfo;
34 import org.onap.holmes.common.api.entity.CorrelationRule;
35 import org.onap.holmes.common.api.stat.VesAlarm;
36 import org.onap.holmes.common.config.MicroServiceConfig;
37 import org.onap.holmes.common.dmaap.store.ClosedLoopControlNameCache;
38 import org.onap.holmes.common.exception.AlarmInfoException;
39 import org.onap.holmes.common.exception.CorrelationException;
40 import org.onap.holmes.common.utils.DbDaoUtil;
41 import org.onap.holmes.common.utils.ExceptionUtil;
42 import org.onap.holmes.engine.db.AlarmInfoDao;
43 import org.onap.holmes.engine.request.DeployRuleRequest;
44 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
45
46 import javax.annotation.PostConstruct;
47 import javax.inject.Inject;
48 import java.util.ArrayList;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.concurrent.ConcurrentHashMap;
52 import java.util.stream.Collectors;
53
54 @Slf4j
55 @Service
56 public class DroolsEngine {
57
58     private final static int ENABLE = 1;
59     private final Map<String, String> deployed = new ConcurrentHashMap<>();
60     private RuleMgtWrapper ruleMgtWrapper;
61     private DbDaoUtil daoUtil;
62     private ClosedLoopControlNameCache closedLoopControlNameCache;
63     private AlarmInfoDao alarmInfoDao;
64     private KieServices ks = KieServices.Factory.get();
65     private ReleaseId releaseId = ks.newReleaseId("org.onap.holmes", "rules", "1.0.0-SNAPSHOT");
66     private ReleaseId compilationRelease = ks.newReleaseId("org.onap.holmes", "compilation", "1.0.0-SNAPSHOT");
67     private KieContainer container;
68     private KieSession session;
69     private String instanceIp;
70
71     @Inject
72     public void setRuleMgtWrapper(RuleMgtWrapper ruleMgtWrapper) {
73         this.ruleMgtWrapper = ruleMgtWrapper;
74     }
75
76     @Inject
77     public void setDaoUtil(DbDaoUtil daoUtil) {
78         this.daoUtil = daoUtil;
79     }
80
81     @Inject
82     public void setClosedLoopControlNameCache(ClosedLoopControlNameCache closedLoopControlNameCache) {
83         this.closedLoopControlNameCache = closedLoopControlNameCache;
84     }
85
86     @PostConstruct
87     private void init() {
88         alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class);
89         instanceIp = MicroServiceConfig.getMicroServiceIpAndPort()[0];
90         try {
91             log.info("Drools engine initializing...");
92             initEngine();
93             log.info("Drools engine initialized.");
94
95             log.info("Start deploy existing rules...");
96             initRules();
97             log.info("All rules were deployed.");
98
99             log.info("Synchronizing alarms...");
100             syncAlarms();
101             log.info("Alarm synchronization succeeded.");
102         } catch (Exception e) {
103             log.error("Failed to startup the engine of Holmes: " + e.getMessage(), e);
104             throw ExceptionUtil.buildExceptionResponse("Failed to startup Drools!");
105         }
106     }
107
108     public void stop() {
109         session.dispose();
110     }
111
112     public void initEngine() {
113         KieModule km = null;
114         try {
115             String drl = "package holmes;";
116             deployed.put(getPackageName(drl), drl);
117             km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
118         } catch (Exception e) {
119             log.error("Failed to initialize the engine service module.", e);
120         }
121         if (null != km) {
122             container = ks.newKieContainer(km.getReleaseId());
123         }
124         session = container.newKieSession();
125         deployed.clear();
126     }
127
128     private void initRules() throws CorrelationException {
129         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE)
130                 .stream()
131                 .filter(r -> r.getEngineInstance().equals(instanceIp))
132                 .collect(Collectors.toList());
133
134         if (rules.isEmpty()) {
135             return;
136         }
137
138         for (CorrelationRule rule : rules) {
139             if (!StringUtils.isEmpty(rule.getContent())) {
140                 deployRule(rule.getContent());
141                 closedLoopControlNameCache.put(rule.getPackageName(), rule.getClosedControlLoopName());
142             }
143         }
144
145         session.fireAllRules();
146     }
147
148     public void syncAlarms() throws AlarmInfoException {
149         alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo)));
150     }
151
152     public String deployRule(DeployRuleRequest rule) throws CorrelationException {
153         return deployRule(rule.getContent());
154     }
155
156     private synchronized String deployRule(String rule) throws CorrelationException {
157         final String packageName = getPackageName(rule);
158
159         if (StringUtils.isEmpty(packageName)) {
160             throw new CorrelationException("The package name can not be empty.");
161         }
162
163         if (deployed.containsKey(packageName)) {
164             throw new CorrelationException("A rule with the same package name already exists in the system.");
165         }
166
167         if (!StringUtils.isEmpty(rule)) {
168             deployed.put(packageName, rule);
169             try {
170                 refreshInMemRules();
171             } catch (CorrelationException e) {
172                 deployed.remove(packageName);
173                 throw e;
174             }
175             session.fireAllRules();
176         }
177
178         return packageName;
179     }
180
181     public synchronized void undeployRule(String packageName) throws CorrelationException {
182
183         if (StringUtils.isEmpty(packageName)) {
184             throw new CorrelationException("The package name should not be null.");
185         }
186
187         if (!deployed.containsKey(packageName)) {
188             throw new CorrelationException("The rule " + packageName + " does not exist!");
189         }
190
191         String removed = deployed.remove(packageName);
192         try {
193             refreshInMemRules();
194         } catch (Exception e) {
195             deployed.put(packageName, removed);
196             throw new CorrelationException("Failed to delete the rule: " + packageName, e);
197         }
198     }
199
200     private void refreshInMemRules() throws CorrelationException {
201         KieModule km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
202         container.updateToVersion(km.getReleaseId());
203     }
204
205     public void compileRule(String content)
206             throws CorrelationException {
207
208         KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(compilationRelease);
209         kfs.write("src/main/resources/rules/rule.drl", content);
210         KieBuilder builder = ks.newKieBuilder(kfs).buildAll();
211         if (builder.getResults().hasMessages(Message.Level.ERROR)) {
212             String errorMsg = "There are errors in the rule: " + builder.getResults()
213                     .getMessages(Level.ERROR).toString();
214             log.info("Compilation failure: " + errorMsg);
215             throw new CorrelationException(errorMsg);
216         }
217
218         if (deployed.containsKey(getPackageName(content))) {
219             throw new CorrelationException("There's no compilation error. But a rule with the same package name already " +
220                     "exists in the engine, which may cause a deployment failure.");
221         }
222
223         ks.getRepository().removeKieModule(compilationRelease);
224     }
225
226     public void putRaisedIntoStream(VesAlarm alarm) {
227         FactHandle factHandle = this.session.getFactHandle(alarm);
228         if (factHandle != null) {
229             Object obj = this.session.getObject(factHandle);
230             if (obj != null && obj instanceof VesAlarm) {
231                 alarm.setRootFlag(((VesAlarm) obj).getRootFlag());
232             }
233             this.session.delete(factHandle);
234         }
235
236         this.session.insert(alarm);
237
238         this.session.fireAllRules();
239     }
240
241     public List<String> queryPackagesFromEngine() {
242         return container.getKieBase().getKiePackages().stream()
243                 .filter(pkg -> pkg.getRules().size() != 0)
244                 .map(pkg -> pkg.getName())
245                 .collect(Collectors.toList());
246     }
247
248     private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) {
249         VesAlarm vesAlarm = new VesAlarm();
250         vesAlarm.setEventId(alarmInfo.getEventId());
251         vesAlarm.setEventName(alarmInfo.getEventName());
252         vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec());
253         vesAlarm.setSourceId(alarmInfo.getSourceId());
254         vesAlarm.setSourceName(alarmInfo.getSourceName());
255         vesAlarm.setRootFlag(alarmInfo.getRootFlag());
256         vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared());
257         vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec());
258         return vesAlarm;
259     }
260
261     private AlarmInfo convertVesAlarm2AlarmInfo(VesAlarm vesAlarm) {
262         AlarmInfo alarmInfo = new AlarmInfo();
263         alarmInfo.setEventId(vesAlarm.getEventId());
264         alarmInfo.setEventName(vesAlarm.getEventName());
265         alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec());
266         alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec());
267         alarmInfo.setSourceId(vesAlarm.getSourceId());
268         alarmInfo.setSourceName(vesAlarm.getSourceName());
269         alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared());
270         alarmInfo.setRootFlag(vesAlarm.getRootFlag());
271
272         return alarmInfo;
273     }
274
275     private String getPackageName(String contents) {
276         String ret = contents.trim();
277         StringBuilder stringBuilder = new StringBuilder();
278         if (ret.startsWith("package")) {
279             ret = ret.substring(7).trim();
280             for (int i = 0; i < ret.length(); i++) {
281                 char tmp = ret.charAt(i);
282                 if (tmp == ';' || tmp == ' ' || tmp == '\n') {
283                     break;
284                 }
285                 stringBuilder.append(tmp);
286             }
287         }
288         return stringBuilder.toString();
289     }
290
291     private KieModule createAndDeployJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
292         byte[] jar = createJar(ks, releaseId, drls);
293         KieModule km = deployJarIntoRepository(ks, jar);
294         return km;
295     }
296
297     private byte[] createJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
298         KieModuleModel kieModuleModel = ks.newKieModuleModel();
299         KieBaseModel kieBaseModel = kieModuleModel.newKieBaseModel("KBase")
300                 .setDefault(true)
301                 .setEqualsBehavior(EqualityBehaviorOption.EQUALITY);
302         kieBaseModel.newKieSessionModel("KSession")
303                 .setDefault(true)
304                 .setType(KieSessionModel.KieSessionType.STATEFUL);
305         KieFileSystem kfs = ks.newKieFileSystem().writeKModuleXML(kieModuleModel.toXML()).generateAndWritePomXML(releaseId);
306
307         int i = 0;
308         for (String drl : drls) {
309             if (!StringUtils.isEmpty(drl)) {
310                 kfs.write("src/main/resources/" + getPackageName(drl) + ".drl", drl);
311             }
312         }
313         KieBuilder kb = ks.newKieBuilder(kfs).buildAll();
314         if (kb.getResults().hasMessages(Message.Level.ERROR)) {
315             StringBuilder sb = new StringBuilder();
316             for (Message msg : kb.getResults().getMessages()) {
317                 sb.append(String.format("[%s]Line: %d, Col: %d\t%s\n", msg.getLevel().toString(), msg.getLine(),
318                         msg.getColumn(), msg.getText()));
319             }
320             throw new CorrelationException("Failed to compile JAR. Details: \n" + sb.toString());
321         }
322
323         InternalKieModule kieModule = (InternalKieModule) ks.getRepository()
324                 .getKieModule(releaseId);
325
326         return kieModule.getBytes();
327     }
328
329     private KieModule deployJarIntoRepository(KieServices ks, byte[] jar) {
330         Resource jarRes = ks.getResources().newByteArrayResource(jar);
331         return ks.getRepository().addKieModule(jarRes);
332     }
333
334 }