import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.impl.KafkaHeaders;
-import java.util.Map;
import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper;
import org.onap.cps.ncmp.events.trustlevel.DeviceTrustLevel;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
-@Slf4j
@Component
@RequiredArgsConstructor
public class DeviceHeartbeatConsumer {
private static final String CLOUD_EVENT_ID_HEADER_NAME = "ce_id";
- private final Map<String, TrustLevel> trustLevelPerCmHandle;
+ private final TrustLevelManager trustLevelManager;
+
/**
* Listening the device heartbeats.
* @param deviceHeartbeatConsumerRecord Device Heartbeat record.
*/
@KafkaListener(topics = "${app.dmi.device-heartbeat.topic}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void heartbeatListener(final ConsumerRecord<String, CloudEvent> deviceHeartbeatConsumerRecord) {
final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(),
- CLOUD_EVENT_ID_HEADER_NAME);
+ CLOUD_EVENT_ID_HEADER_NAME);
final DeviceTrustLevel deviceTrustLevel =
- CloudEventMapper.toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class);
+ CloudEventMapper.toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class);
+
+ trustLevelManager.handleUpdateOfTrustLevels(cmHandleId, deviceTrustLevel.getData().getTrustLevel());
- if (cmHandleId != null && deviceTrustLevel != null) {
- final String trustLevel = deviceTrustLevel.getData().getTrustLevel();
- trustLevelPerCmHandle.put(cmHandleId, TrustLevel.valueOf(trustLevel));
- log.debug("Added cmHandleId to trustLevelPerCmHandle map as {}:{}", cmHandleId, trustLevel);
- }
}
}