Remove Alarm Info from DB when alarms are cleared
[holmes/engine-management.git] / engine-d / src / main / java / org / onap / holmes / engine / manager / DroolsEngine.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  * <p>
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * <p>
8  * http://www.apache.org/licenses/LICENSE-2.0
9  * <p>
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.holmes.engine.manager;
17
18 import lombok.extern.slf4j.Slf4j;
19 import org.drools.compiler.kie.builder.impl.InternalKieModule;
20 import org.drools.core.util.StringUtils;
21 import org.jvnet.hk2.annotations.Service;
22 import org.kie.api.KieServices;
23 import org.kie.api.builder.*;
24 import org.kie.api.builder.Message.Level;
25 import org.kie.api.io.Resource;
26 import org.kie.api.runtime.KieContainer;
27 import org.kie.api.runtime.KieSession;
28 import org.kie.api.runtime.rule.FactHandle;
29 import org.onap.holmes.common.api.entity.AlarmInfo;
30 import org.onap.holmes.common.api.entity.CorrelationRule;
31 import org.onap.holmes.common.api.stat.VesAlarm;
32 import org.onap.holmes.common.dmaap.DmaapService;
33 import org.onap.holmes.common.exception.AlarmInfoException;
34 import org.onap.holmes.common.exception.CorrelationException;
35 import org.onap.holmes.common.utils.DbDaoUtil;
36 import org.onap.holmes.common.utils.ExceptionUtil;
37 import org.onap.holmes.engine.db.AlarmInfoDao;
38 import org.onap.holmes.engine.request.DeployRuleRequest;
39 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
40
41 import javax.annotation.PostConstruct;
42 import javax.inject.Inject;
43 import java.util.ArrayList;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.concurrent.ConcurrentHashMap;
47 import java.util.stream.Collectors;
48
49 @Slf4j
50 @Service
51 public class DroolsEngine {
52
53     @Inject
54     private RuleMgtWrapper ruleMgtWrapper;
55     @Inject
56     private DbDaoUtil daoUtil;
57
58     private final static int ENABLE = 1;
59     private AlarmInfoDao alarmInfoDao;
60     private final Map<String, String> deployed = new ConcurrentHashMap<>();
61     private KieServices ks = KieServices.Factory.get();
62     private ReleaseId releaseId = ks.newReleaseId("org.onap.holmes", "rules", "1.0.0-SNAPSHOT");
63     private ReleaseId compilationRelease = ks.newReleaseId("org.onap.holmes", "compilation", "1.0.0-SNAPSHOT");
64     private KieContainer container;
65     private KieSession session;
66
67     @PostConstruct
68     private void init() {
69         alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class);
70         try {
71             log.info("Drools engine initializing...");
72             initEngine();
73             log.info("Drools engine initialized.");
74
75             log.info("Start deploy existing rules...");
76             initRules();
77             log.info("All rules were deployed.");
78
79             log.info("Synchronizing alarms...");
80             syncAlarms();
81             log.info("Alarm synchronization succeeded.");
82         } catch (Exception e) {
83             log.error("Failed to startup the engine of Holmes: " + e.getMessage(), e);
84             throw ExceptionUtil.buildExceptionResponse("Failed to startup Drools!");
85         }
86     }
87
88     public void stop() {
89         session.dispose();
90     }
91
92     public void initEngine() {
93         KieModule km = null;
94         try {
95             String drl = "package holmes;";
96             deployed.put(getPackageName(drl), drl);
97             km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
98         } catch (Exception e) {
99             log.error("Failed to initialize the engine service module.", e);
100         }
101         if (null != km) {
102             container = ks.newKieContainer(km.getReleaseId());
103         }
104         session = container.newKieSession();
105         deployed.clear();
106     }
107
108     private void initRules() throws CorrelationException {
109         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
110         if (rules.isEmpty()) {
111             return;
112         }
113
114         for (CorrelationRule rule : rules) {
115             if (!StringUtils.isEmpty(rule.getContent())) {
116                 deployRule(rule.getContent());
117                 DmaapService.loopControlNames.put(rule.getPackageName(), rule.getClosedControlLoopName());
118             }
119         }
120
121         session.fireAllRules();
122     }
123
124     public void syncAlarms() throws AlarmInfoException {
125         alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo)));
126     }
127
128     public String deployRule(DeployRuleRequest rule) throws CorrelationException {
129         return deployRule(rule.getContent());
130     }
131
132     private synchronized String deployRule(String rule) throws CorrelationException {
133         final String packageName = getPackageName(rule);
134
135         if (StringUtils.isEmpty(packageName)) {
136             throw new CorrelationException("The package name can not be empty.");
137         }
138
139         if (deployed.containsKey(packageName)) {
140             throw new CorrelationException("A rule with the same package name already exists in the system.");
141         }
142
143         if (!StringUtils.isEmpty(rule)) {
144             deployed.put(packageName, rule);
145             try {
146                 refreshInMemRules();
147             } catch (CorrelationException e) {
148                 deployed.remove(packageName);
149                 throw e;
150             }
151             session.fireAllRules();
152         }
153
154         return packageName;
155     }
156
157     public synchronized void undeployRule(String packageName) throws CorrelationException {
158
159         if (StringUtils.isEmpty(packageName)) {
160             throw new CorrelationException("The package name should not be null.");
161         }
162
163         if (!deployed.containsKey(packageName)) {
164             throw new CorrelationException("The rule " + packageName + " does not exist!");
165         }
166
167         String removed = deployed.remove(packageName);
168         try {
169             refreshInMemRules();
170         } catch (Exception e) {
171             deployed.put(packageName, removed);
172             throw new CorrelationException("Failed to delete the rule: " + packageName, e);
173         }
174     }
175
176     private void refreshInMemRules() throws CorrelationException {
177         KieModule km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
178         container.updateToVersion(km.getReleaseId());
179     }
180
181     public void compileRule(String content)
182             throws CorrelationException {
183
184         KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(compilationRelease);
185         kfs.write("src/main/resources/rules/rule.drl", content);
186         KieBuilder builder = ks.newKieBuilder(kfs).buildAll();
187         if (builder.getResults().hasMessages(Message.Level.ERROR)) {
188             String errorMsg = "There are errors in the rule: " + builder.getResults()
189                     .getMessages(Level.ERROR).toString();
190             log.info("Compilation failure: " + errorMsg);
191             throw new CorrelationException(errorMsg);
192         }
193
194         if (deployed.containsKey(getPackageName(content))) {
195             throw new CorrelationException("There's no compilation error. But a rule with the same package name already " +
196                     "exists in the engine, which may cause a deployment failure.");
197         }
198
199         ks.getRepository().removeKieModule(compilationRelease);
200     }
201
202     public void putRaisedIntoStream(VesAlarm alarm) {
203         FactHandle factHandle = this.session.getFactHandle(alarm);
204         if (factHandle != null) {
205             Object obj = this.session.getObject(factHandle);
206             if (obj != null && obj instanceof VesAlarm) {
207                 alarm.setRootFlag(((VesAlarm) obj).getRootFlag());
208             }
209             this.session.delete(factHandle);
210         }
211
212         this.session.insert(alarm);
213
214         this.session.fireAllRules();
215     }
216
217     public List<String> queryPackagesFromEngine() {
218         return container.getKieBase().getKiePackages().stream()
219                 .filter(pkg -> pkg.getRules().size() != 0)
220                 .map(pkg -> pkg.getName())
221                 .collect(Collectors.toList());
222     }
223
224     private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) {
225         VesAlarm vesAlarm = new VesAlarm();
226         vesAlarm.setEventId(alarmInfo.getEventId());
227         vesAlarm.setEventName(alarmInfo.getEventName());
228         vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec());
229         vesAlarm.setSourceId(alarmInfo.getSourceId());
230         vesAlarm.setSourceName(alarmInfo.getSourceName());
231         vesAlarm.setRootFlag(alarmInfo.getRootFlag());
232         vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared());
233         vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec());
234         return vesAlarm;
235     }
236
237     private AlarmInfo convertVesAlarm2AlarmInfo(VesAlarm vesAlarm) {
238         AlarmInfo alarmInfo = new AlarmInfo();
239         alarmInfo.setEventId(vesAlarm.getEventId());
240         alarmInfo.setEventName(vesAlarm.getEventName());
241         alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec());
242         alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec());
243         alarmInfo.setSourceId(vesAlarm.getSourceId());
244         alarmInfo.setSourceName(vesAlarm.getSourceName());
245         alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared());
246         alarmInfo.setRootFlag(vesAlarm.getRootFlag());
247
248         return alarmInfo;
249     }
250
251     private String getPackageName(String contents) {
252         String ret = contents.trim();
253         StringBuilder stringBuilder = new StringBuilder();
254         if (ret.startsWith("package")) {
255             ret = ret.substring(7).trim();
256             for (int i = 0; i < ret.length(); i++) {
257                 char tmp = ret.charAt(i);
258                 if (tmp == ';' || tmp == ' ' || tmp == '\n') {
259                     break;
260                 }
261                 stringBuilder.append(tmp);
262             }
263         }
264         return stringBuilder.toString();
265     }
266
267     private KieModule createAndDeployJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
268         byte[] jar = createJar(ks, releaseId, drls);
269         KieModule km = deployJarIntoRepository(ks, jar);
270         return km;
271     }
272
273     private byte[] createJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
274         KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(releaseId);
275         int i = 0;
276         for (String drl : drls) {
277             if (!StringUtils.isEmpty(drl)) {
278                 kfs.write("src/main/resources/" + getPackageName(drl) + ".drl", drl);
279             }
280         }
281         KieBuilder kb = ks.newKieBuilder(kfs).buildAll();
282         if (kb.getResults().hasMessages(Message.Level.ERROR)) {
283             StringBuilder sb = new StringBuilder();
284             for (Message msg : kb.getResults().getMessages()) {
285                 sb.append(String.format("[%s]Line: %d, Col: %d\t%s\n", msg.getLevel().toString(), msg.getLine(),
286                         msg.getColumn(), msg.getText()));
287             }
288             throw new CorrelationException("Failed to compile JAR. Details: \n" + sb.toString());
289         }
290
291         InternalKieModule kieModule = (InternalKieModule) ks.getRepository()
292                 .getKieModule(releaseId);
293
294         return kieModule.getBytes();
295     }
296
297     private KieModule deployJarIntoRepository(KieServices ks, byte[] jar) {
298         Resource jarRes = ks.getResources().newByteArrayResource(jar);
299         return ks.getRepository().addKieModule(jarRes);
300     }
301
302 }