X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Ftrustlevel%2FDeviceHeartbeatConsumer.java;h=b6d74d980f40ee539a88a810b367b668bec2c779;hb=7aae9b8af2c82336882d0ae864b970b1885a63a8;hp=458c1b85187faa2bfbef305c13ccbda3a34ffbc0;hpb=4bc19fe118ca7d81891c12d40719a569bc528a21;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java index 458c1b851..b6d74d980 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java @@ -20,14 +20,14 @@ package org.onap.cps.ncmp.api.impl.trustlevel; -import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent; - -import com.hazelcast.collection.ISet; import io.cloudevents.CloudEvent; import io.cloudevents.kafka.impl.KafkaHeaders; +import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper; +import org.onap.cps.ncmp.events.trustlevel.DeviceTrustLevel; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -36,7 +36,8 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class DeviceHeartbeatConsumer { - private final ISet untrustworthyCmHandlesSet; + private static final String CLOUD_EVENT_ID_HEADER_NAME = "ce_id"; + private final Map trustLevelPerCmHandle; /** * Listening the device heartbeats. @@ -47,23 +48,16 @@ public class DeviceHeartbeatConsumer { containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") public void heartbeatListener(final ConsumerRecord deviceHeartbeatConsumerRecord) { - final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(), "ce_id"); + final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(), + CLOUD_EVENT_ID_HEADER_NAME); final DeviceTrustLevel deviceTrustLevel = - toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class); - - if (deviceTrustLevel == null || deviceTrustLevel.getTrustLevel() == null) { - log.warn("No or Invalid trust level defined"); - return; - } + CloudEventMapper.toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class); - if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.NONE)) { - untrustworthyCmHandlesSet.add(cmHandleId); - log.debug("Added cmHandleId to untrustworthy set : {}", cmHandleId); - } else if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.COMPLETE) && untrustworthyCmHandlesSet.contains( - cmHandleId)) { - untrustworthyCmHandlesSet.remove(cmHandleId); - log.debug("Removed cmHandleId from untrustworthy set : {}", cmHandleId); + if (cmHandleId != null && deviceTrustLevel != null) { + final String trustLevel = deviceTrustLevel.getData().getTrustLevel(); + trustLevelPerCmHandle.put(cmHandleId, TrustLevel.valueOf(trustLevel)); + log.debug("Added cmHandleId to trustLevelPerCmHandle map as {}:{}", cmHandleId, trustLevel); } }