add alarm synchronization related operation
[holmes/engine-management.git] / engine-d / src / main / java / org / onap / holmes / engine / dmaap / DMaaPAlarmPolling.java
1 /*
2  * Copyright 2017 ZTE Corporation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.holmes.engine.dmaap;
17
18 import lombok.extern.slf4j.Slf4j;
19 import org.onap.holmes.common.api.entity.AlarmInfo;
20 import org.onap.holmes.common.api.stat.VesAlarm;
21 import org.onap.holmes.common.exception.AlarmInfoException;
22 import org.onap.holmes.common.exception.CorrelationException;
23 import org.onap.holmes.dsa.dmaappolling.Subscriber;
24 import org.onap.holmes.engine.db.AlarmInfoDao;
25 import org.onap.holmes.engine.manager.DroolsEngine;
26
27 import java.util.ArrayList;
28 import java.util.List;
29
30 @Slf4j
31 public class DMaaPAlarmPolling implements Runnable {
32
33     private Subscriber subscriber;
34     private DroolsEngine droolsEngine;
35     private volatile boolean isAlive = true;
36     private AlarmInfoDao alarmInfoDao;
37
38
39     public DMaaPAlarmPolling(Subscriber subscriber, DroolsEngine droolsEngine, AlarmInfoDao alarmInfoDao) {
40         this.subscriber = subscriber;
41         this.droolsEngine = droolsEngine;
42         this.alarmInfoDao = alarmInfoDao;
43     }
44
45     public void run() {
46         while (isAlive) {
47             List<VesAlarm> vesAlarmList = new ArrayList<>();
48             try {
49                 vesAlarmList = subscriber.subscribe();
50                 vesAlarmList.forEach(vesAlarm -> {
51                     try {
52                         alarmInfoDao.saveAlarm(getAlarmInfo(vesAlarm));
53                         droolsEngine.putRaisedIntoStream(vesAlarm);
54                     } catch(AlarmInfoException e) {
55                         log.error("Failed to save alarm to database", e);
56                     }
57                 });
58             } catch (CorrelationException e) {
59                 log.error("Failed to process alarms. Sleep for 60 seconds to restart.", e);
60                 try {
61                     Thread.sleep(60000);
62                 } catch (InterruptedException e1) {
63                     log.info("Thread is still active.", e);
64                     Thread.currentThread().interrupt();
65                 }
66             } catch (Exception e) {
67                 log.error("An error occurred while processing alarm. Sleep for 60 seconds to restart.", e);
68                 try {
69                     Thread.sleep(60000);
70                 } catch (InterruptedException e1) {
71                     log.info("Thread is still active.", e);
72                     Thread.currentThread().interrupt();
73                 }
74             }
75         }
76     }
77     private AlarmInfo getAlarmInfo(VesAlarm vesAlarm) {
78         AlarmInfo alarmInfo = new AlarmInfo();
79         alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared());
80         alarmInfo.setSourceName(vesAlarm.getSourceName());
81         alarmInfo.setSourceId(vesAlarm.getSourceId());
82         alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec());
83         alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec());
84         alarmInfo.setEventId(vesAlarm.getEventId());
85         alarmInfo.setEventName(vesAlarm.getEventName());
86         alarmInfo.setRootFlag(vesAlarm.getRootFlag());
87         return alarmInfo;
88     }
89
90     public void stopTask() {
91         isAlive = false;
92     }
93 }