add alarm synchronization related operation
[holmes/engine-management.git] / engine-d / src / main / java / org / onap / holmes / engine / dmaap / DMaaPAlarmPolling.java
index 009de8e..1446b14 100644 (file)
  */
 package org.onap.holmes.engine.dmaap;
 
-import static jdk.nashorn.internal.runtime.regexp.joni.Config.log;
-
-import java.util.ArrayList;
-import java.util.List;
 import lombok.extern.slf4j.Slf4j;
+import org.onap.holmes.common.api.entity.AlarmInfo;
 import org.onap.holmes.common.api.stat.VesAlarm;
+import org.onap.holmes.common.exception.AlarmInfoException;
 import org.onap.holmes.common.exception.CorrelationException;
 import org.onap.holmes.dsa.dmaappolling.Subscriber;
+import org.onap.holmes.engine.db.AlarmInfoDao;
 import org.onap.holmes.engine.manager.DroolsEngine;
 
+import java.util.ArrayList;
+import java.util.List;
+
 @Slf4j
 public class DMaaPAlarmPolling implements Runnable {
 
     private Subscriber subscriber;
     private DroolsEngine droolsEngine;
     private volatile boolean isAlive = true;
+    private AlarmInfoDao alarmInfoDao;
 
-    public DMaaPAlarmPolling(Subscriber subscriber, DroolsEngine droolsEngine) {
+
+    public DMaaPAlarmPolling(Subscriber subscriber, DroolsEngine droolsEngine, AlarmInfoDao alarmInfoDao) {
         this.subscriber = subscriber;
         this.droolsEngine = droolsEngine;
+        this.alarmInfoDao = alarmInfoDao;
     }
 
     public void run() {
@@ -42,13 +47,21 @@ public class DMaaPAlarmPolling implements Runnable {
             List<VesAlarm> vesAlarmList = new ArrayList<>();
             try {
                 vesAlarmList = subscriber.subscribe();
-                vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm));
+                vesAlarmList.forEach(vesAlarm -> {
+                    try {
+                        alarmInfoDao.saveAlarm(getAlarmInfo(vesAlarm));
+                        droolsEngine.putRaisedIntoStream(vesAlarm);
+                    } catch(AlarmInfoException e) {
+                        log.error("Failed to save alarm to database", e);
+                    }
+                });
             } catch (CorrelationException e) {
                 log.error("Failed to process alarms. Sleep for 60 seconds to restart.", e);
                 try {
                     Thread.sleep(60000);
                 } catch (InterruptedException e1) {
                     log.info("Thread is still active.", e);
+                    Thread.currentThread().interrupt();
                 }
             } catch (Exception e) {
                 log.error("An error occurred while processing alarm. Sleep for 60 seconds to restart.", e);
@@ -56,10 +69,23 @@ public class DMaaPAlarmPolling implements Runnable {
                     Thread.sleep(60000);
                 } catch (InterruptedException e1) {
                     log.info("Thread is still active.", e);
+                    Thread.currentThread().interrupt();
                 }
             }
         }
     }
+    private AlarmInfo getAlarmInfo(VesAlarm vesAlarm) {
+        AlarmInfo alarmInfo = new AlarmInfo();
+        alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared());
+        alarmInfo.setSourceName(vesAlarm.getSourceName());
+        alarmInfo.setSourceId(vesAlarm.getSourceId());
+        alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec());
+        alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec());
+        alarmInfo.setEventId(vesAlarm.getEventId());
+        alarmInfo.setEventName(vesAlarm.getEventName());
+        alarmInfo.setRootFlag(vesAlarm.getRootFlag());
+        return alarmInfo;
+    }
 
     public void stopTask() {
         isAlive = false;