Switched from Dropwizard to Spring Boot
[holmes/engine-management.git] / engine-d / src / main / java / org / onap / holmes / engine / manager / DroolsEngine.java
1 /**
2  * Copyright 2017 ZTE Corporation.
3  * <p>
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * <p>
8  * http://www.apache.org/licenses/LICENSE-2.0
9  * <p>
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.holmes.engine.manager;
17
18 import lombok.extern.slf4j.Slf4j;
19 import org.drools.compiler.kie.builder.impl.InternalKieModule;
20 import org.drools.core.util.StringUtils;
21 import org.kie.api.KieServices;
22 import org.kie.api.builder.*;
23 import org.kie.api.builder.Message.Level;
24 import org.kie.api.builder.model.KieBaseModel;
25 import org.kie.api.builder.model.KieModuleModel;
26 import org.kie.api.builder.model.KieSessionModel;
27 import org.kie.api.conf.EqualityBehaviorOption;
28 import org.kie.api.io.Resource;
29 import org.kie.api.runtime.KieContainer;
30 import org.kie.api.runtime.KieSession;
31 import org.kie.api.runtime.rule.FactHandle;
32 import org.onap.holmes.common.api.entity.AlarmInfo;
33 import org.onap.holmes.common.api.entity.CorrelationRule;
34 import org.onap.holmes.common.api.stat.VesAlarm;
35 import org.onap.holmes.common.config.MicroServiceConfig;
36 import org.onap.holmes.common.dmaap.store.ClosedLoopControlNameCache;
37 import org.onap.holmes.common.exception.AlarmInfoException;
38 import org.onap.holmes.common.exception.CorrelationException;
39 import org.onap.holmes.common.utils.ExceptionUtil;
40 import org.onap.holmes.engine.db.AlarmInfoDaoService;
41 import org.onap.holmes.engine.request.DeployRuleRequest;
42 import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
43 import org.springframework.beans.factory.annotation.Autowired;
44 import org.springframework.boot.ApplicationArguments;
45 import org.springframework.boot.ApplicationRunner;
46 import org.springframework.stereotype.Component;
47
48 import java.util.ArrayList;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.concurrent.ConcurrentHashMap;
52 import java.util.stream.Collectors;
53
54 @Slf4j
55 @Component
56 public class DroolsEngine implements ApplicationRunner {
57     private final static int ENABLE = 1;
58     private final Map<String, String> deployed = new ConcurrentHashMap<>();
59     private RuleMgtWrapper ruleMgtWrapper;
60     private ClosedLoopControlNameCache closedLoopControlNameCache;
61     private AlarmInfoDaoService alarmInfoDaoService;
62     private KieServices ks = KieServices.Factory.get();
63     private ReleaseId releaseId = ks.newReleaseId("org.onap.holmes", "rules", "1.0.0-SNAPSHOT");
64     private ReleaseId compilationRelease = ks.newReleaseId("org.onap.holmes", "compilation", "1.0.0-SNAPSHOT");
65     private KieContainer container;
66     private KieSession session;
67     private String instanceIp;
68
69     @Autowired
70     public void setAlarmInfoDaoService(AlarmInfoDaoService alarmInfoDaoService) {
71         this.alarmInfoDaoService = alarmInfoDaoService;
72     }
73
74     @Autowired
75     public void setRuleMgtWrapper(RuleMgtWrapper ruleMgtWrapper) {
76         this.ruleMgtWrapper = ruleMgtWrapper;
77     }
78
79     @Autowired
80     public void setClosedLoopControlNameCache(ClosedLoopControlNameCache closedLoopControlNameCache) {
81         this.closedLoopControlNameCache = closedLoopControlNameCache;
82     }
83
84     @Override
85     public void run(ApplicationArguments args) {
86         instanceIp = MicroServiceConfig.getMicroServiceIpAndPort()[0];
87         try {
88             log.info("Drools engine initializing...");
89             initEngine();
90             log.info("Drools engine initialized.");
91
92             log.info("Start deploy existing rules...");
93             initRules();
94             log.info("All rules were deployed.");
95
96             log.info("Synchronizing alarms...");
97             syncAlarms();
98             log.info("Alarm synchronization succeeded.");
99         } catch (Exception e) {
100             log.error("Failed to startup the engine of Holmes: " + e.getMessage(), e);
101             throw ExceptionUtil.buildExceptionResponse("Failed to startup Drools!");
102         }
103     }
104
105     public void stop() {
106         session.dispose();
107     }
108
109     public void initEngine() {
110         KieModule km = null;
111         try {
112             String drl = "package holmes;";
113             deployed.put(getPackageName(drl), drl);
114             km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
115         } catch (Exception e) {
116             log.error("Failed to initialize the engine service module.", e);
117         }
118         if (null != km) {
119             container = ks.newKieContainer(km.getReleaseId());
120         }
121         session = container.newKieSession();
122         deployed.clear();
123     }
124
125     private void initRules() throws CorrelationException {
126         List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE)
127                 .stream()
128                 .filter(r -> r.getEngineInstance().equals(instanceIp))
129                 .collect(Collectors.toList());
130
131         if (rules.isEmpty()) {
132             return;
133         }
134
135         for (CorrelationRule rule : rules) {
136             if (!StringUtils.isEmpty(rule.getContent())) {
137                 deployRule(rule.getContent());
138                 closedLoopControlNameCache.put(rule.getPackageName(), rule.getClosedControlLoopName());
139             }
140         }
141
142         session.fireAllRules();
143     }
144
145     public void syncAlarms() throws AlarmInfoException {
146         alarmInfoDaoService.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo)));
147     }
148
149     public String deployRule(DeployRuleRequest rule) throws CorrelationException {
150         return deployRule(rule.getContent());
151     }
152
153     private synchronized String deployRule(String rule) throws CorrelationException {
154         final String packageName = getPackageName(rule);
155
156         if (StringUtils.isEmpty(packageName)) {
157             throw new CorrelationException("The package name can not be empty.");
158         }
159
160         if (deployed.containsKey(packageName)) {
161             throw new CorrelationException("A rule with the same package name already exists in the system.");
162         }
163
164         if (!StringUtils.isEmpty(rule)) {
165             deployed.put(packageName, rule);
166             try {
167                 refreshInMemRules();
168             } catch (CorrelationException e) {
169                 deployed.remove(packageName);
170                 throw e;
171             }
172             session.fireAllRules();
173         }
174
175         return packageName;
176     }
177
178     public synchronized void undeployRule(String packageName) throws CorrelationException {
179
180         if (StringUtils.isEmpty(packageName)) {
181             throw new CorrelationException("The package name should not be null.");
182         }
183
184         if (!deployed.containsKey(packageName)) {
185             throw new CorrelationException("The rule " + packageName + " does not exist!");
186         }
187
188         String removed = deployed.remove(packageName);
189         try {
190             refreshInMemRules();
191         } catch (Exception e) {
192             deployed.put(packageName, removed);
193             throw new CorrelationException("Failed to delete the rule: " + packageName, e);
194         }
195     }
196
197     private void refreshInMemRules() throws CorrelationException {
198         KieModule km = createAndDeployJar(ks, releaseId, new ArrayList<>(deployed.values()));
199         container.updateToVersion(km.getReleaseId());
200     }
201
202     public void compileRule(String content)
203             throws CorrelationException {
204
205         KieFileSystem kfs = ks.newKieFileSystem().generateAndWritePomXML(compilationRelease);
206         kfs.write("src/main/resources/rules/rule.drl", content);
207         KieBuilder builder = ks.newKieBuilder(kfs).buildAll();
208         if (builder.getResults().hasMessages(Message.Level.ERROR)) {
209             String errorMsg = "There are errors in the rule: " + builder.getResults()
210                     .getMessages(Level.ERROR).toString();
211             log.info("Compilation failure: " + errorMsg);
212             throw new CorrelationException(errorMsg);
213         }
214
215         if (deployed.containsKey(getPackageName(content))) {
216             throw new CorrelationException("There's no compilation error. But a rule with the same package name already " +
217                     "exists in the engine, which may cause a deployment failure.");
218         }
219
220         ks.getRepository().removeKieModule(compilationRelease);
221     }
222
223     public void putRaisedIntoStream(VesAlarm alarm) {
224         FactHandle factHandle = this.session.getFactHandle(alarm);
225         if (factHandle != null) {
226             Object obj = this.session.getObject(factHandle);
227             if (obj != null && obj instanceof VesAlarm) {
228                 alarm.setRootFlag(((VesAlarm) obj).getRootFlag());
229             }
230             this.session.delete(factHandle);
231         }
232
233         this.session.insert(alarm);
234
235         this.session.fireAllRules();
236     }
237
238     public List<String> queryPackagesFromEngine() {
239         return container.getKieBase().getKiePackages().stream()
240                 .filter(pkg -> pkg.getRules().size() != 0)
241                 .map(pkg -> pkg.getName())
242                 .collect(Collectors.toList());
243     }
244
245     private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) {
246         VesAlarm vesAlarm = new VesAlarm();
247         vesAlarm.setEventId(alarmInfo.getEventId());
248         vesAlarm.setEventName(alarmInfo.getEventName());
249         vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec());
250         vesAlarm.setSourceId(alarmInfo.getSourceId());
251         vesAlarm.setSourceName(alarmInfo.getSourceName());
252         vesAlarm.setRootFlag(alarmInfo.getRootFlag());
253         vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared());
254         vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec());
255         return vesAlarm;
256     }
257
258     private AlarmInfo convertVesAlarm2AlarmInfo(VesAlarm vesAlarm) {
259         AlarmInfo alarmInfo = new AlarmInfo();
260         alarmInfo.setEventId(vesAlarm.getEventId());
261         alarmInfo.setEventName(vesAlarm.getEventName());
262         alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec());
263         alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec());
264         alarmInfo.setSourceId(vesAlarm.getSourceId());
265         alarmInfo.setSourceName(vesAlarm.getSourceName());
266         alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared());
267         alarmInfo.setRootFlag(vesAlarm.getRootFlag());
268
269         return alarmInfo;
270     }
271
272     private String getPackageName(String contents) {
273         String ret = contents.trim();
274         StringBuilder stringBuilder = new StringBuilder();
275         if (ret.startsWith("package")) {
276             ret = ret.substring(7).trim();
277             for (int i = 0; i < ret.length(); i++) {
278                 char tmp = ret.charAt(i);
279                 if (tmp == ';' || tmp == ' ' || tmp == '\n') {
280                     break;
281                 }
282                 stringBuilder.append(tmp);
283             }
284         }
285         return stringBuilder.toString();
286     }
287
288     private KieModule createAndDeployJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
289         byte[] jar = createJar(ks, releaseId, drls);
290         KieModule km = deployJarIntoRepository(ks, jar);
291         return km;
292     }
293
294     private byte[] createJar(KieServices ks, ReleaseId releaseId, List<String> drls) throws CorrelationException {
295         KieModuleModel kieModuleModel = ks.newKieModuleModel();
296         KieBaseModel kieBaseModel = kieModuleModel.newKieBaseModel("KBase")
297                 .setDefault(true)
298                 .setEqualsBehavior(EqualityBehaviorOption.EQUALITY);
299         kieBaseModel.newKieSessionModel("KSession")
300                 .setDefault(true)
301                 .setType(KieSessionModel.KieSessionType.STATEFUL);
302         KieFileSystem kfs = ks.newKieFileSystem().writeKModuleXML(kieModuleModel.toXML()).generateAndWritePomXML(releaseId);
303
304         int i = 0;
305         for (String drl : drls) {
306             if (!StringUtils.isEmpty(drl)) {
307                 kfs.write("src/main/resources/" + getPackageName(drl) + ".drl", drl);
308             }
309         }
310         KieBuilder kb = ks.newKieBuilder(kfs).buildAll();
311         if (kb.getResults().hasMessages(Message.Level.ERROR)) {
312             StringBuilder sb = new StringBuilder();
313             for (Message msg : kb.getResults().getMessages()) {
314                 sb.append(String.format("[%s]Line: %d, Col: %d\t%s\n", msg.getLevel().toString(), msg.getLine(),
315                         msg.getColumn(), msg.getText()));
316             }
317             throw new CorrelationException("Failed to compile JAR. Details: \n" + sb.toString());
318         }
319
320         InternalKieModule kieModule = (InternalKieModule) ks.getRepository()
321                 .getKieModule(releaseId);
322
323         return kieModule.getBytes();
324     }
325
326     private KieModule deployJarIntoRepository(KieServices ks, byte[] jar) {
327         Resource jarRes = ks.getResources().newByteArrayResource(jar);
328         return ks.getRepository().addKieModule(jarRes);
329     }
330 }