Fixed the ABATED issue for Holmes
[holmes/engine-management.git] / engine-d / src / main / java / org / onap / holmes / engine / manager / DroolsEngine.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  * <p>
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * <p>
8  * http://www.apache.org/licenses/LICENSE-2.0
9  * <p>
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.holmes.engine.manager;
17
18 import lombok.extern.slf4j.Slf4j;
19 import org.drools.compiler.kie.builder.impl.InternalKieModule;
20 import org.drools.core.util.StringUtils;
21 import org.jvnet.hk2.annotations.Service;
22 import org.kie.api.KieServices;
23 import org.kie.api.builder.*;
24 import org.kie.api.builder.Message.Level;
25 import org.kie.api.builder.model.KieBaseModel;
26 import org.kie.api.builder.model.KieModuleModel;
27 import org.kie.api.builder.model.KieSessionModel;
28 import org.kie.api.conf.EqualityBehaviorOption;
29 import org.kie.api.conf.EventProcessingOption;
30 import org.kie.api.io.Resource;
31 import org.kie.api.runtime.KieContainer;
32 import org.kie.api.runtime.KieSession;
33 import org.kie.api.runtime.conf.ClockTypeOption;
34 import org.kie.api.runtime.rule.FactHandle;
35 import org.onap.holmes.common.api.entity.AlarmInfo;
36 import org.onap.holmes.common.api.entity.CorrelationRule;
37 import org.onap.holmes.common.api.stat.VesAlarm;
38 import org.onap.holmes.common.dmaap.DmaapService;
39 import org.onap.holmes.common.exception.AlarmInfoException;
40 import org.onap.holmes.common.exception.CorrelationException;
41 import org.onap.holmes.common.utils.DbDaoUtil;
42 import org.onap.holmes.common.utils.ExceptionUtil;
43 import org.onap.holmes.engine.db.AlarmInfoDao;
44 import org.onap.holmes.engine.request.DeployRuleRequest;
45 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
46
47 import javax.annotation.PostConstruct;
48 import javax.inject.Inject;
49 import java.util.ArrayList;
50 import java.util.List;
51 import java.util.Map;
52 import java.util.concurrent.ConcurrentHashMap;
53 import java.util.stream.Collectors;
54
55 @Slf4j
56 @Service
57 public class DroolsEngine {
58
59     @Inject
60     private RuleMgtWrapper ruleMgtWrapper;
61     @Inject
62     private DbDaoUtil daoUtil;
63
64     private final static int ENABLE = 1;
65     private AlarmInfoDao alarmInfoDao;
66     private final Map<String, String> deployed = new ConcurrentHashMap<>();
67     private KieServices ks = KieServices.Factory.get();
68     private ReleaseId releaseId = ks.newReleaseId("org.onap.holmes", "rules", "1.0.0-SNAPSHOT");
69     private ReleaseId compilationRelease = ks.newReleaseId("org.onap.holmes", "compilation", "1.0.0-SNAPSHOT");
70     private KieContainer container;
71     private KieSession session;
72
73     @PostConstruct
74     private void init() {
75         alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class);
76         try {
77             log.info("Drools engine initializing...");
78             initEngine();
79             log.info("Drools engine initialized.");
80
81             log.info("Start deploy existing rules...");
82             initRules();
83             log.info("All rules were deployed.");
84
85             log.info("Synchronizing alarms...");
86             syncAlarms();
87             log.info("Alarm synchronization succeeded.");
88         } catch (Exception e) {
89             log.error("Failed to startup the engine of Holmes: " + e.getMessage(), e);
90             throw ExceptionUtil.buildExceptionResponse("Failed to startup Drools!");
91         }
92     }
93
94     public void stop() {
95         session.dispose();
96     }
97
98     public void initEngine() {
99         KieModule km = null;
100         try {
101             String drl = "package holmes;";
102             deployed.put(getPackageName(drl), drl);
103             km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
104         } catch (Exception e) {
105             log.error("Failed to initialize the engine service module.", e);
106         }
107         if (null != km) {
108             container = ks.newKieContainer(km.getReleaseId());
109         }
110         session = container.newKieSession();
111         deployed.clear();
112     }
113
114     private void initRules() throws CorrelationException {
115         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
116         if (rules.isEmpty()) {
117             return;
118         }
119
120         for (CorrelationRule rule : rules) {
121             if (!StringUtils.isEmpty(rule.getContent())) {
122                 deployRule(rule.getContent());
123                 DmaapService.loopControlNames.put(rule.getPackageName(), rule.getClosedControlLoopName());
124             }
125         }
126
127         session.fireAllRules();
128     }
129
130     public void syncAlarms() throws AlarmInfoException {
131         alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo)));
132     }
133
134     public String deployRule(DeployRuleRequest rule) throws CorrelationException {
135         return deployRule(rule.getContent());
136     }
137
138     private synchronized String deployRule(String rule) throws CorrelationException {
139         final String packageName = getPackageName(rule);
140
141         if (StringUtils.isEmpty(packageName)) {
142             throw new CorrelationException("The package name can not be empty.");
143         }
144
145         if (deployed.containsKey(packageName)) {
146             throw new CorrelationException("A rule with the same package name already exists in the system.");
147         }
148
149         if (!StringUtils.isEmpty(rule)) {
150             deployed.put(packageName, rule);
151             try {
152                 refreshInMemRules();
153             } catch (CorrelationException e) {
154                 deployed.remove(packageName);
155                 throw e;
156             }
157             session.fireAllRules();
158         }
159
160         return packageName;
161     }
162
163     public synchronized void undeployRule(String packageName) throws CorrelationException {
164
165         if (StringUtils.isEmpty(packageName)) {
166             throw new CorrelationException("The package name should not be null.");
167         }
168
169         if (!deployed.containsKey(packageName)) {
170             throw new CorrelationException("The rule " + packageName + " does not exist!");
171         }
172
173         String removed = deployed.remove(packageName);
174         try {
175             refreshInMemRules();
176         } catch (Exception e) {
177             deployed.put(packageName, removed);
178             throw new CorrelationException("Failed to delete the rule: " + packageName, e);
179         }
180     }
181
182     private void refreshInMemRules() throws CorrelationException {
183         KieModule km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
184         container.updateToVersion(km.getReleaseId());
185     }
186
187     public void compileRule(String content)
188             throws CorrelationException {
189
190         KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(compilationRelease);
191         kfs.write("src/main/resources/rules/rule.drl", content);
192         KieBuilder builder = ks.newKieBuilder(kfs).buildAll();
193         if (builder.getResults().hasMessages(Message.Level.ERROR)) {
194             String errorMsg = "There are errors in the rule: " + builder.getResults()
195                     .getMessages(Level.ERROR).toString();
196             log.info("Compilation failure: " + errorMsg);
197             throw new CorrelationException(errorMsg);
198         }
199
200         if (deployed.containsKey(getPackageName(content))) {
201             throw new CorrelationException("There's no compilation error. But a rule with the same package name already " +
202                     "exists in the engine, which may cause a deployment failure.");
203         }
204
205         ks.getRepository().removeKieModule(compilationRelease);
206     }
207
208     public void putRaisedIntoStream(VesAlarm alarm) {
209         FactHandle factHandle = this.session.getFactHandle(alarm);
210         if (factHandle != null) {
211             Object obj = this.session.getObject(factHandle);
212             if (obj != null && obj instanceof VesAlarm) {
213                 alarm.setRootFlag(((VesAlarm) obj).getRootFlag());
214             }
215             this.session.delete(factHandle);
216         }
217
218         this.session.insert(alarm);
219
220         this.session.fireAllRules();
221     }
222
223     public List<String> queryPackagesFromEngine() {
224         return container.getKieBase().getKiePackages().stream()
225                 .filter(pkg -> pkg.getRules().size() != 0)
226                 .map(pkg -> pkg.getName())
227                 .collect(Collectors.toList());
228     }
229
230     private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) {
231         VesAlarm vesAlarm = new VesAlarm();
232         vesAlarm.setEventId(alarmInfo.getEventId());
233         vesAlarm.setEventName(alarmInfo.getEventName());
234         vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec());
235         vesAlarm.setSourceId(alarmInfo.getSourceId());
236         vesAlarm.setSourceName(alarmInfo.getSourceName());
237         vesAlarm.setRootFlag(alarmInfo.getRootFlag());
238         vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared());
239         vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec());
240         return vesAlarm;
241     }
242
243     private AlarmInfo convertVesAlarm2AlarmInfo(VesAlarm vesAlarm) {
244         AlarmInfo alarmInfo = new AlarmInfo();
245         alarmInfo.setEventId(vesAlarm.getEventId());
246         alarmInfo.setEventName(vesAlarm.getEventName());
247         alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec());
248         alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec());
249         alarmInfo.setSourceId(vesAlarm.getSourceId());
250         alarmInfo.setSourceName(vesAlarm.getSourceName());
251         alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared());
252         alarmInfo.setRootFlag(vesAlarm.getRootFlag());
253
254         return alarmInfo;
255     }
256
257     private String getPackageName(String contents) {
258         String ret = contents.trim();
259         StringBuilder stringBuilder = new StringBuilder();
260         if (ret.startsWith("package")) {
261             ret = ret.substring(7).trim();
262             for (int i = 0; i < ret.length(); i++) {
263                 char tmp = ret.charAt(i);
264                 if (tmp == ';' || tmp == ' ' || tmp == '\n') {
265                     break;
266                 }
267                 stringBuilder.append(tmp);
268             }
269         }
270         return stringBuilder.toString();
271     }
272
273     private KieModule createAndDeployJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
274         byte[] jar = createJar(ks, releaseId, drls);
275         KieModule km = deployJarIntoRepository(ks, jar);
276         return km;
277     }
278
279     private byte[] createJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
280         KieModuleModel kieModuleModel = ks.newKieModuleModel();
281         KieBaseModel kieBaseModel = kieModuleModel.newKieBaseModel("KBase")
282                 .setDefault(true)
283                 .setEqualsBehavior(EqualityBehaviorOption.EQUALITY);
284         kieBaseModel.newKieSessionModel("KSession")
285                 .setDefault(true)
286                 .setType(KieSessionModel.KieSessionType.STATEFUL);
287         KieFileSystem kfs = ks.newKieFileSystem().writeKModuleXML(kieModuleModel.toXML()).generateAndWritePomXML(releaseId);
288
289         int i = 0;
290         for (String drl : drls) {
291             if (!StringUtils.isEmpty(drl)) {
292                 kfs.write("src/main/resources/" + getPackageName(drl) + ".drl", drl);
293             }
294         }
295         KieBuilder kb = ks.newKieBuilder(kfs).buildAll();
296         if (kb.getResults().hasMessages(Message.Level.ERROR)) {
297             StringBuilder sb = new StringBuilder();
298             for (Message msg : kb.getResults().getMessages()) {
299                 sb.append(String.format("[%s]Line: %d, Col: %d\t%s\n", msg.getLevel().toString(), msg.getLine(),
300                         msg.getColumn(), msg.getText()));
301             }
302             throw new CorrelationException("Failed to compile JAR. Details: \n" + sb.toString());
303         }
304
305         InternalKieModule kieModule = (InternalKieModule) ks.getRepository()
306                 .getKieModule(releaseId);
307
308         return kieModule.getBytes();
309     }
310
311     private KieModule deployJarIntoRepository(KieServices ks, byte[] jar) {
312         Resource jarRes = ks.getResources().newByteArrayResource(jar);
313         return ks.getRepository().addKieModule(jarRes);
314     }
315
316 }