package org.onap.cps.ncmp.api.impl.trustlevel;
-import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
-
-import com.hazelcast.collection.ISet;
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.impl.KafkaHeaders;
import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper;
+import org.onap.cps.ncmp.events.trustlevel.DeviceTrustLevel;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
-@Slf4j
@Component
@RequiredArgsConstructor
public class DeviceHeartbeatConsumer {
- private final ISet<String> untrustworthyCmHandlesSet;
+ private static final String CLOUD_EVENT_ID_HEADER_NAME = "ce_id";
+ private final TrustLevelManager trustLevelManager;
+
/**
* Listening the device heartbeats.
* @param deviceHeartbeatConsumerRecord Device Heartbeat record.
*/
@KafkaListener(topics = "${app.dmi.device-heartbeat.topic}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void heartbeatListener(final ConsumerRecord<String, CloudEvent> deviceHeartbeatConsumerRecord) {
- final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(), "ce_id");
+ final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(),
+ CLOUD_EVENT_ID_HEADER_NAME);
final DeviceTrustLevel deviceTrustLevel =
- toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class);
+ CloudEventMapper.toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class);
- if (deviceTrustLevel == null || deviceTrustLevel.getTrustLevel() == null) {
- log.warn("No or Invalid trust level defined");
- return;
- }
+ trustLevelManager.handleUpdateOfTrustLevels(cmHandleId, deviceTrustLevel.getData().getTrustLevel());
- if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.NONE)) {
- untrustworthyCmHandlesSet.add(cmHandleId);
- log.debug("Added cmHandleId to untrustworthy set : {}", cmHandleId);
- } else if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.COMPLETE) && untrustworthyCmHandlesSet.contains(
- cmHandleId)) {
- untrustworthyCmHandlesSet.remove(cmHandleId);
- log.debug("Removed cmHandleId from untrustworthy set : {}", cmHandleId);
- }
}
}