Added fix for potential nullpointerexception
[holmes/engine-management.git] / engine-d / src / main / java / org / onap / holmes / engine / manager / DroolsEngine.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  * <p>
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * <p>
8  * http://www.apache.org/licenses/LICENSE-2.0
9  * <p>
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.holmes.engine.manager;
17
18 import java.util.*;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.stream.Collectors;
21 import javax.annotation.PostConstruct;
22 import javax.inject.Inject;
23
24 import lombok.extern.slf4j.Slf4j;
25 import org.drools.compiler.kie.builder.impl.InternalKieModule;
26 import org.drools.core.util.StringUtils;
27 import org.jvnet.hk2.annotations.Service;
28
29 import org.kie.api.KieServices;
30 import org.kie.api.builder.*;
31 import org.kie.api.builder.Message.Level;
32 import org.kie.api.io.Resource;
33 import org.kie.api.runtime.KieContainer;
34 import org.kie.api.runtime.KieSession;
35 import org.kie.api.runtime.rule.FactHandle;
36
37 import org.onap.holmes.common.api.entity.AlarmInfo;
38
39 import org.onap.holmes.common.api.stat.VesAlarm;
40 import org.onap.holmes.common.dmaap.DmaapService;
41 import org.onap.holmes.common.exception.AlarmInfoException;
42 import org.onap.holmes.common.utils.DbDaoUtil;
43 import org.onap.holmes.engine.db.AlarmInfoDao;
44 import org.onap.holmes.engine.request.DeployRuleRequest;
45 import org.onap.holmes.common.api.entity.CorrelationRule;
46 import org.onap.holmes.common.exception.CorrelationException;
47 import org.onap.holmes.common.utils.ExceptionUtil;
48 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
49
50 @Slf4j
51 @Service
52 public class DroolsEngine {
53
54     @Inject
55     private RuleMgtWrapper ruleMgtWrapper;
56     @Inject
57     private DbDaoUtil daoUtil;
58
59     private final static int ENABLE = 1;
60     private AlarmInfoDao alarmInfoDao;
61     private final Map<String, String> deployed = new ConcurrentHashMap<>();
62     private KieServices ks = KieServices.Factory.get();
63     private ReleaseId releaseId = ks.newReleaseId("org.onap.holmes", "rules", "1.0.0-SNAPSHOT");
64     private ReleaseId compilationRelease = ks.newReleaseId("org.onap.holmes", "compilation", "1.0.0-SNAPSHOT");
65     private KieContainer container;
66     private KieSession session;
67
68     @PostConstruct
69     private void init() {
70         alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class);
71         try {
72             log.info("Drools engine initializing...");
73             initEngine();
74             log.info("Drools engine initialized.");
75
76             log.info("Start deploy existing rules...");
77             initRules();
78             log.info("All rules were deployed.");
79
80             log.info("Synchronizing alarms...");
81             syncAlarms();
82             log.info("Alarm synchronization succeeded.");
83         } catch (Exception e) {
84             log.error("Failed to startup the engine of Holmes: " + e.getMessage(), e);
85             throw ExceptionUtil.buildExceptionResponse("Failed to startup Drools!");
86         }
87     }
88
89     public void stop() {
90         session.dispose();
91     }
92
93     public void initEngine() {
94         KieModule km = null;
95         try {
96             String drl = "package holmes;";
97             deployed.put(getPackageName(drl), drl);
98             km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
99         } catch (Exception e) {
100             log.error("Failed to initialize the engine service module.", e);
101         }
102         if(null!=km) {
103                 container = ks.newKieContainer(km.getReleaseId());
104         }
105         session = container.newKieSession();
106         deployed.clear();
107     }
108
109     private void initRules() throws CorrelationException {
110         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
111         if (rules.isEmpty()) {
112             return;
113         }
114
115         for (CorrelationRule rule : rules) {
116             if (!StringUtils.isEmpty(rule.getContent())) {
117                 deployRule(rule.getContent());
118                 DmaapService.loopControlNames.put(rule.getPackageName(), rule.getClosedControlLoopName());
119             }
120         }
121
122         session.fireAllRules();
123     }
124
125     public void syncAlarms() throws AlarmInfoException {
126         alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> alarmInfoDao.deleteClearedAlarm(alarmInfo));
127         alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo)));
128     }
129
130     public String deployRule(DeployRuleRequest rule) throws CorrelationException {
131         return deployRule(rule.getContent());
132     }
133
134     private synchronized String deployRule(String rule) throws CorrelationException {
135         final String packageName = getPackageName(rule);
136
137         if (StringUtils.isEmpty(packageName)) {
138             throw new CorrelationException("The package name can not be empty.");
139         }
140
141         if (deployed.containsKey(packageName)) {
142             throw new CorrelationException("A rule with the same package name already exists in the system.");
143         }
144
145         if (!StringUtils.isEmpty(rule)) {
146             deployed.put(packageName, rule);
147             try {
148                 refreshInMemRules();
149             } catch (CorrelationException e) {
150                 deployed.remove(packageName);
151                 throw e;
152             }
153             session.fireAllRules();
154         }
155
156         return packageName;
157     }
158
159     public synchronized void undeployRule(String packageName) throws CorrelationException {
160
161         if (StringUtils.isEmpty(packageName)) {
162             throw new CorrelationException("The package name should not be null.");
163         }
164
165         if (!deployed.containsKey(packageName)) {
166             throw new CorrelationException("The rule " + packageName + " does not exist!");
167         }
168
169         String removed = deployed.remove(packageName);
170         try {
171             refreshInMemRules();
172         } catch (Exception e) {
173             deployed.put(packageName, removed);
174             throw new CorrelationException("Failed to delete the rule: " + packageName, e);
175         }
176     }
177
178     private void refreshInMemRules() throws CorrelationException {
179         KieModule km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
180         container.updateToVersion(km.getReleaseId());
181     }
182
183     public void compileRule(String content)
184             throws CorrelationException {
185
186         KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(compilationRelease);
187         kfs.write("src/main/resources/rules/rule.drl", content);
188         KieBuilder builder = ks.newKieBuilder(kfs).buildAll();
189         if (builder.getResults().hasMessages(Message.Level.ERROR)) {
190             String errorMsg = "There are errors in the rule: " + builder.getResults()
191                     .getMessages(Level.ERROR).toString();
192             log.info("Compilation failure: " + errorMsg);
193             throw new CorrelationException(errorMsg);
194         }
195
196         if (deployed.containsKey(getPackageName(content))) {
197             throw new CorrelationException("There's no compilation error. But a rule with the same package name already " +
198                     "exists in the engine, which may cause a deployment failure.");
199         }
200
201         ks.getRepository().removeKieModule(compilationRelease);
202     }
203
204     public void putRaisedIntoStream(VesAlarm alarm) {
205         FactHandle factHandle = this.session.getFactHandle(alarm);
206         if (factHandle != null) {
207             Object obj = this.session.getObject(factHandle);
208             if (obj != null && obj instanceof VesAlarm) {
209                 alarm.setRootFlag(((VesAlarm) obj).getRootFlag());
210             }
211             this.session.delete(factHandle);
212
213             if (alarm.getAlarmIsCleared() == 1) {
214                 alarmInfoDao.deleteClearedAlarm(convertVesAlarm2AlarmInfo(alarm));
215             }
216         } else {
217             this.session.insert(alarm);
218         }
219
220         this.session.fireAllRules();
221     }
222
223     public List<String> queryPackagesFromEngine() {
224         return container.getKieBase().getKiePackages().stream()
225                 .filter(pkg -> pkg.getRules().size() != 0)
226                 .map(pkg -> pkg.getName())
227                 .collect(Collectors.toList());
228     }
229
230
231
232     private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) {
233         VesAlarm vesAlarm = new VesAlarm();
234         vesAlarm.setEventId(alarmInfo.getEventId());
235         vesAlarm.setEventName(alarmInfo.getEventName());
236         vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec());
237         vesAlarm.setSourceId(alarmInfo.getSourceId());
238         vesAlarm.setSourceName(alarmInfo.getSourceName());
239         vesAlarm.setRootFlag(alarmInfo.getRootFlag());
240         vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared());
241         vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec());
242         return vesAlarm;
243     }
244
245     private AlarmInfo convertVesAlarm2AlarmInfo(VesAlarm vesAlarm) {
246         AlarmInfo alarmInfo = new AlarmInfo();
247         alarmInfo.setEventId(vesAlarm.getEventId());
248         alarmInfo.setEventName(vesAlarm.getEventName());
249         alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec());
250         alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec());
251         alarmInfo.setSourceId(vesAlarm.getSourceId());
252         alarmInfo.setSourceName(vesAlarm.getSourceName());
253         alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared());
254         alarmInfo.setRootFlag(vesAlarm.getRootFlag());
255
256         return alarmInfo;
257     }
258
259     private String getPackageName(String contents) {
260         String ret = contents.trim();
261         StringBuilder stringBuilder = new StringBuilder();
262         if (ret.startsWith("package")) {
263             ret = ret.substring(7).trim();
264             for (int i = 0; i < ret.length(); i++) {
265                 char tmp = ret.charAt(i);
266                 if (tmp == ';' || tmp == ' ' || tmp == '\n') {
267                     break;
268                 }
269                 stringBuilder.append(tmp);
270             }
271         }
272         return stringBuilder.toString();
273     }
274
275     private KieModule createAndDeployJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
276         byte[] jar = createJar(ks, releaseId, drls);
277         KieModule km = deployJarIntoRepository(ks, jar);
278         return km;
279     }
280
281     private byte[] createJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
282         KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(releaseId);
283         int i = 0;
284         for (String drl : drls) {
285             if (!StringUtils.isEmpty(drl)) {
286                 kfs.write("src/main/resources/" + getPackageName(drl) + ".drl", drl);
287             }
288         }
289         KieBuilder kb = ks.newKieBuilder(kfs).buildAll();
290         if (kb.getResults().hasMessages(Message.Level.ERROR)) {
291             StringBuilder sb = new StringBuilder();
292             for (Message msg : kb.getResults().getMessages()) {
293                 sb.append(String.format("[%s]Line: %d, Col: %d\t%s\n", msg.getLevel().toString(), msg.getLine(),
294                         msg.getColumn(), msg.getText()));
295             }
296             throw new CorrelationException("Failed to compile JAR. Details: \n" + sb.toString());
297         }
298
299         InternalKieModule kieModule = (InternalKieModule) ks.getRepository()
300                 .getKieModule(releaseId);
301
302         return kieModule.getBytes();
303     }
304
305     private KieModule deployJarIntoRepository(KieServices ks, byte[] jar) {
306         Resource jarRes = ks.getResources().newByteArrayResource(jar);
307         return ks.getRepository().addKieModule(jarRes);
308     }
309
310 }