package org.onap.cps.ncmp.api.impl.trustlevel;
-import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
-
-import com.hazelcast.collection.ISet;
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.impl.KafkaHeaders;
+import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper;
+import org.onap.cps.ncmp.events.trustlevel.DeviceTrustLevel;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@RequiredArgsConstructor
public class DeviceHeartbeatConsumer {
- private final ISet<String> untrustworthyCmHandlesSet;
+ private static final String CLOUD_EVENT_ID_HEADER_NAME = "ce_id";
+ private final Map<String, TrustLevel> trustLevelPerCmHandle;
/**
* Listening the device heartbeats.
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void heartbeatListener(final ConsumerRecord<String, CloudEvent> deviceHeartbeatConsumerRecord) {
- final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(), "ce_id");
+ final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(),
+ CLOUD_EVENT_ID_HEADER_NAME);
final DeviceTrustLevel deviceTrustLevel =
- toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class);
-
- if (deviceTrustLevel == null || deviceTrustLevel.getTrustLevel() == null) {
- log.warn("No or Invalid trust level defined");
- return;
- }
+ CloudEventMapper.toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class);
- if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.NONE)) {
- untrustworthyCmHandlesSet.add(cmHandleId);
- log.debug("Added cmHandleId to untrustworthy set : {}", cmHandleId);
- } else if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.COMPLETE) && untrustworthyCmHandlesSet.contains(
- cmHandleId)) {
- untrustworthyCmHandlesSet.remove(cmHandleId);
- log.debug("Removed cmHandleId from untrustworthy set : {}", cmHandleId);
+ if (cmHandleId != null && deviceTrustLevel != null) {
+ final String trustLevel = deviceTrustLevel.getData().getTrustLevel();
+ trustLevelPerCmHandle.put(cmHandleId, TrustLevel.valueOf(trustLevel));
+ log.debug("Added cmHandleId to trustLevelPerCmHandle map as {}:{}", cmHandleId, trustLevel);
}
}