From f4778800c815fbc962b194a177525957a564231d Mon Sep 17 00:00:00 2001 From: mpriyank Date: Wed, 26 Jul 2023 17:33:35 +0100 Subject: [PATCH] Device heartbeat listener - Infrastructure code to have the kafka listener and distributed set in place - performance tested locally - testware added Issue-ID: CPS-1642 Change-Id: I775dbe6e6b520b8777faa08610db439877757572 Signed-off-by: mpriyank --- cps-application/src/main/resources/application.yml | 2 + .../embeddedcache/TrustLevelCacheConfig.java | 46 ++++++++++ .../impl/trustlevel/DeviceHeartbeatConsumer.java | 71 ++++++++++++++ .../ncmp/api/impl/trustlevel/DeviceTrustLevel.java | 37 ++++++++ .../cps/ncmp/api/impl/trustlevel/TrustLevel.java | 25 +++++ .../trustlevel/DeviceHeartbeatConsumerSpec.groovy | 102 +++++++++++++++++++++ .../org/onap/cps/cache/HazelcastCacheConfig.java | 12 +++ .../onap/cps/cache/HazelcastCacheConfigSpec.groovy | 13 ++- 8 files changed, 305 insertions(+), 3 deletions(-) create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index a18de2acd..6aefda9c3 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -109,6 +109,8 @@ app: dmi: cm-events: topic: ${DMI_CM_EVENTS_TOPIC:dmi-cm-events} + device-heartbeat: + topic: ${DMI_DEVICE_HEARTBEAT_TOPIC:dmi-device-heartbeat} notification: diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java new file mode 100644 index 000000000..816fc5067 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.config.embeddedcache; + +import com.hazelcast.collection.ISet; +import com.hazelcast.config.SetConfig; +import org.onap.cps.cache.HazelcastCacheConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class TrustLevelCacheConfig extends HazelcastCacheConfig { + + private static final SetConfig untrustworthyCmHandlesSetConfig = createSetConfig("untrustworthyCmHandlesSetConfig"); + + /** + * Untrustworthy cmhandle set instance. + * + * @return instance of distributed set of untrustworthy cmhandles. + */ + @Bean + public ISet untrustworthyCmHandlesSet() { + return createHazelcastInstance("untrustworthyCmHandlesSet", untrustworthyCmHandlesSetConfig).getSet( + "untrustworthyCmHandlesSet"); + } + + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java new file mode 100644 index 000000000..458c1b851 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java @@ -0,0 +1,71 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.trustlevel; + +import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent; + +import com.hazelcast.collection.ISet; +import io.cloudevents.CloudEvent; +import io.cloudevents.kafka.impl.KafkaHeaders; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class DeviceHeartbeatConsumer { + + private final ISet untrustworthyCmHandlesSet; + + /** + * Listening the device heartbeats. + * + * @param deviceHeartbeatConsumerRecord Device Heartbeat record. + */ + @KafkaListener(topics = "${app.dmi.device-heartbeat.topic}", + containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") + public void heartbeatListener(final ConsumerRecord deviceHeartbeatConsumerRecord) { + + final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(), "ce_id"); + + final DeviceTrustLevel deviceTrustLevel = + toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class); + + if (deviceTrustLevel == null || deviceTrustLevel.getTrustLevel() == null) { + log.warn("No or Invalid trust level defined"); + return; + } + + if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.NONE)) { + untrustworthyCmHandlesSet.add(cmHandleId); + log.debug("Added cmHandleId to untrustworthy set : {}", cmHandleId); + } else if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.COMPLETE) && untrustworthyCmHandlesSet.contains( + cmHandleId)) { + untrustworthyCmHandlesSet.remove(cmHandleId); + log.debug("Removed cmHandleId from untrustworthy set : {}", cmHandleId); + } + } + +} + diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java new file mode 100644 index 000000000..2ed4e4522 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.trustlevel; + +import java.io.Serializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@AllArgsConstructor +@Data +@NoArgsConstructor +class DeviceTrustLevel implements Serializable { + + private static final long serialVersionUID = -1705715024067165212L; + + private TrustLevel trustLevel; + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java new file mode 100644 index 000000000..f4254bb47 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java @@ -0,0 +1,25 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.trustlevel; + +public enum TrustLevel { + NONE, COMPLETE; +} \ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy new file mode 100644 index 000000000..48de23dca --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy @@ -0,0 +1,102 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.trustlevel + +import com.fasterxml.jackson.databind.ObjectMapper +import com.hazelcast.collection.ISet +import io.cloudevents.CloudEvent +import io.cloudevents.core.builder.CloudEventBuilder +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.onap.cps.utils.JsonObjectMapper +import org.springframework.boot.test.context.SpringBootTest +import spock.lang.Specification + +@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) +class DeviceHeartbeatConsumerSpec extends Specification { + + def mockUntrustworthyCmHandlesSet = Mock(ISet) + def objectMapper = new ObjectMapper() + + def objectUnderTest = new DeviceHeartbeatConsumer(mockUntrustworthyCmHandlesSet) + + def 'Operations to be done in an empty untrustworthy set for #scenario'() { + given: 'an event with trustlevel as #trustLevel' + def incomingEvent = testCloudEvent(trustLevel) + and: 'transformed as a kafka record' + def consumerRecord = new ConsumerRecord('test-device-heartbeat', 0, 0, 'cmhandle1', incomingEvent) + consumerRecord.headers().add('ce_id', objectMapper.writeValueAsBytes('cmhandle1')) + when: 'the event is consumed' + objectUnderTest.heartbeatListener(consumerRecord) + then: 'untrustworthy cmhandles are stored' + untrustworthyCmHandlesSetInvocationForAdd * mockUntrustworthyCmHandlesSet.add(_) + and: 'trustworthy cmHandles will be removed from untrustworthy set' + untrustworthyCmHandlesSetInvocationForContains * mockUntrustworthyCmHandlesSet.contains(_) + + where: 'below scenarios are applicable' + scenario | trustLevel || untrustworthyCmHandlesSetInvocationForAdd | untrustworthyCmHandlesSetInvocationForContains + 'None trust' | TrustLevel.NONE || 1 | 0 + 'Complete trust' | TrustLevel.COMPLETE || 0 | 1 + } + + def 'Invalid trust'() { + when: 'we provide an invalid trust in the event' + def consumerRecord = new ConsumerRecord('test-device-heartbeat', 0, 0, 'cmhandle1', testCloudEvent(null)) + consumerRecord.headers().add('ce_id', objectMapper.writeValueAsBytes('cmhandle1')) + objectUnderTest.heartbeatListener(consumerRecord) + then: 'no interaction with the untrustworthy cmhandles set' + 0 * mockUntrustworthyCmHandlesSet.add(_) + 0 * mockUntrustworthyCmHandlesSet.contains(_) + 0 * mockUntrustworthyCmHandlesSet.remove(_) + and: 'control flow returns without any exception' + noExceptionThrown() + + } + + def 'Remove trustworthy cmhandles from untrustworthy cmhandles set'() { + given: 'an event with COMPLETE trustlevel' + def incomingEvent = testCloudEvent(TrustLevel.COMPLETE) + and: 'transformed as a kafka record' + def consumerRecord = new ConsumerRecord('test-device-heartbeat', 0, 0, 'cmhandle1', incomingEvent) + consumerRecord.headers().add('ce_id', objectMapper.writeValueAsBytes('cmhandle1')) + and: 'untrustworthy cmhandles set contains cmhandle1' + 1 * mockUntrustworthyCmHandlesSet.contains(_) >> true + when: 'the event is consumed' + objectUnderTest.heartbeatListener(consumerRecord) + then: 'cmhandle removed from untrustworthy cmhandles set' + 1 * mockUntrustworthyCmHandlesSet.remove(_) >> { + args -> + { + args[0].equals('cmhandle1') + } + } + + } + + def testCloudEvent(trustLevel) { + return CloudEventBuilder.v1().withData(objectMapper.writeValueAsBytes(new DeviceTrustLevel(trustLevel))) + .withId("cmhandle1") + .withSource(URI.create('DMI')) + .withDataSchema(URI.create('test')) + .withType('org.onap.cm.events.trustlevel-notification') + .build() + } + +} diff --git a/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java b/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java index 405e6e2a8..067191b5a 100644 --- a/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java +++ b/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java @@ -24,6 +24,7 @@ import com.hazelcast.config.Config; import com.hazelcast.config.MapConfig; import com.hazelcast.config.NamedConfig; import com.hazelcast.config.QueueConfig; +import com.hazelcast.config.SetConfig; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import lombok.extern.slf4j.Slf4j; @@ -57,6 +58,10 @@ public class HazelcastCacheConfig { if (namedConfig instanceof QueueConfig) { config.addQueueConfig((QueueConfig) namedConfig); } + if (namedConfig instanceof SetConfig) { + config.addSetConfig((SetConfig) namedConfig); + } + config.setClusterName(clusterName); updateDiscoveryMode(config); return config; @@ -76,6 +81,13 @@ public class HazelcastCacheConfig { return commonQueueConfig; } + protected static SetConfig createSetConfig(final String configName) { + final SetConfig commonSetConfig = new SetConfig(configName); + commonSetConfig.setBackupCount(1); + commonSetConfig.setAsyncBackupCount(1); + return commonSetConfig; + } + protected void updateDiscoveryMode(final Config config) { if (cacheKubernetesEnabled) { log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName); diff --git a/cps-service/src/test/groovy/org/onap/cps/cache/HazelcastCacheConfigSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/cache/HazelcastCacheConfigSpec.groovy index 8efd48547..415e9fd49 100644 --- a/cps-service/src/test/groovy/org/onap/cps/cache/HazelcastCacheConfigSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/cache/HazelcastCacheConfigSpec.groovy @@ -45,10 +45,17 @@ class HazelcastCacheConfigSpec extends Specification { } else { assert result.config.queueConfigs.isEmpty() } + and: 'if applicable it has a set config with the expected name' + if (expectSetConfig) { + assert result.config.setConfigs.values()[0].name == 'my set config' + } else { + assert result.config.setConfigs.isEmpty() + } where: 'the following configs are used' - scenario | config || expectMapConfig | expectQueueConfig - 'Map Config' | HazelcastCacheConfig.createMapConfig('my map config') || true | false - 'Queue Config' | HazelcastCacheConfig.createQueueConfig('my queue config') || false | true + scenario | config || expectMapConfig | expectQueueConfig | expectSetConfig + 'Map Config' | HazelcastCacheConfig.createMapConfig('my map config') || true | false | false + 'Queue Config' | HazelcastCacheConfig.createQueueConfig('my queue config') || false | true | false + 'Set Config' | HazelcastCacheConfig.createSetConfig('my set config') || false | false | true } } -- 2.16.6