Merge "Expose hazelcast cluster info"
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / trustlevel / DeviceHeartbeatConsumer.java
index 458c1b8..45aa631 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.trustlevel;
 
-import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
-
-import com.hazelcast.collection.ISet;
 import io.cloudevents.CloudEvent;
 import io.cloudevents.kafka.impl.KafkaHeaders;
 import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper;
+import org.onap.cps.ncmp.events.trustlevel.DeviceTrustLevel;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
-@Slf4j
 @Component
 @RequiredArgsConstructor
 public class DeviceHeartbeatConsumer {
 
-    private final ISet<String> untrustworthyCmHandlesSet;
+    private static final String CLOUD_EVENT_ID_HEADER_NAME = "ce_id";
+    private final TrustLevelManager trustLevelManager;
+
 
     /**
      * Listening the device heartbeats.
@@ -44,27 +43,17 @@ public class DeviceHeartbeatConsumer {
      * @param deviceHeartbeatConsumerRecord Device Heartbeat record.
      */
     @KafkaListener(topics = "${app.dmi.device-heartbeat.topic}",
-            containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+        containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
     public void heartbeatListener(final ConsumerRecord<String, CloudEvent> deviceHeartbeatConsumerRecord) {
 
-        final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(), "ce_id");
+        final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(),
+            CLOUD_EVENT_ID_HEADER_NAME);
 
         final DeviceTrustLevel deviceTrustLevel =
-                toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class);
+            CloudEventMapper.toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class);
 
-        if (deviceTrustLevel == null || deviceTrustLevel.getTrustLevel() == null) {
-            log.warn("No or Invalid trust level defined");
-            return;
-        }
+        trustLevelManager.handleUpdateOfTrustLevels(cmHandleId, deviceTrustLevel.getData().getTrustLevel());
 
-        if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.NONE)) {
-            untrustworthyCmHandlesSet.add(cmHandleId);
-            log.debug("Added cmHandleId to untrustworthy set : {}", cmHandleId);
-        } else if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.COMPLETE) && untrustworthyCmHandlesSet.contains(
-                cmHandleId)) {
-            untrustworthyCmHandlesSet.remove(cmHandleId);
-            log.debug("Removed cmHandleId from untrustworthy set : {}", cmHandleId);
-        }
     }
 
 }