Device heartbeat listener 09/135609/10
authormpriyank <priyank.maheshwari@est.tech>
Wed, 26 Jul 2023 16:33:35 +0000 (17:33 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Fri, 18 Aug 2023 12:40:10 +0000 (13:40 +0100)
- Infrastructure code to have the kafka listener and distributed set in
  place
- performance tested locally
- testware added

Issue-ID: CPS-1642
Change-Id: I775dbe6e6b520b8777faa08610db439877757572
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-application/src/main/resources/application.yml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy [new file with mode: 0644]
cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java
cps-service/src/test/groovy/org/onap/cps/cache/HazelcastCacheConfigSpec.groovy

index a18de2a..6aefda9 100644 (file)
@@ -109,6 +109,8 @@ app:
     dmi:
         cm-events:
             topic: ${DMI_CM_EVENTS_TOPIC:dmi-cm-events}
+        device-heartbeat:
+            topic: ${DMI_DEVICE_HEARTBEAT_TOPIC:dmi-device-heartbeat}
 
 
 notification:
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/TrustLevelCacheConfig.java
new file mode 100644 (file)
index 0000000..816fc50
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.config.embeddedcache;
+
+import com.hazelcast.collection.ISet;
+import com.hazelcast.config.SetConfig;
+import org.onap.cps.cache.HazelcastCacheConfig;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class TrustLevelCacheConfig extends HazelcastCacheConfig {
+
+    private static final SetConfig untrustworthyCmHandlesSetConfig = createSetConfig("untrustworthyCmHandlesSetConfig");
+
+    /**
+     * Untrustworthy cmhandle set instance.
+     *
+     * @return instance of distributed set of untrustworthy cmhandles.
+     */
+    @Bean
+    public ISet<String> untrustworthyCmHandlesSet() {
+        return createHazelcastInstance("untrustworthyCmHandlesSet", untrustworthyCmHandlesSetConfig).getSet(
+                "untrustworthyCmHandlesSet");
+    }
+
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumer.java
new file mode 100644 (file)
index 0000000..458c1b8
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.trustlevel;
+
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
+
+import com.hazelcast.collection.ISet;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class DeviceHeartbeatConsumer {
+
+    private final ISet<String> untrustworthyCmHandlesSet;
+
+    /**
+     * Listening the device heartbeats.
+     *
+     * @param deviceHeartbeatConsumerRecord Device Heartbeat record.
+     */
+    @KafkaListener(topics = "${app.dmi.device-heartbeat.topic}",
+            containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+    public void heartbeatListener(final ConsumerRecord<String, CloudEvent> deviceHeartbeatConsumerRecord) {
+
+        final String cmHandleId = KafkaHeaders.getParsedKafkaHeader(deviceHeartbeatConsumerRecord.headers(), "ce_id");
+
+        final DeviceTrustLevel deviceTrustLevel =
+                toTargetEvent(deviceHeartbeatConsumerRecord.value(), DeviceTrustLevel.class);
+
+        if (deviceTrustLevel == null || deviceTrustLevel.getTrustLevel() == null) {
+            log.warn("No or Invalid trust level defined");
+            return;
+        }
+
+        if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.NONE)) {
+            untrustworthyCmHandlesSet.add(cmHandleId);
+            log.debug("Added cmHandleId to untrustworthy set : {}", cmHandleId);
+        } else if (deviceTrustLevel.getTrustLevel().equals(TrustLevel.COMPLETE) && untrustworthyCmHandlesSet.contains(
+                cmHandleId)) {
+            untrustworthyCmHandlesSet.remove(cmHandleId);
+            log.debug("Removed cmHandleId from untrustworthy set : {}", cmHandleId);
+        }
+    }
+
+}
+
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/DeviceTrustLevel.java
new file mode 100644 (file)
index 0000000..2ed4e45
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.trustlevel;
+
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@AllArgsConstructor
+@Data
+@NoArgsConstructor
+class DeviceTrustLevel implements Serializable {
+
+    private static final long serialVersionUID = -1705715024067165212L;
+
+    private TrustLevel trustLevel;
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/TrustLevel.java
new file mode 100644 (file)
index 0000000..f4254bb
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.trustlevel;
+
+public enum TrustLevel {
+    NONE, COMPLETE;
+}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/trustlevel/DeviceHeartbeatConsumerSpec.groovy
new file mode 100644 (file)
index 0000000..48de23d
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.trustlevel
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.hazelcast.collection.ISet
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.utils.JsonObjectMapper
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+class DeviceHeartbeatConsumerSpec extends Specification {
+
+    def mockUntrustworthyCmHandlesSet = Mock(ISet<String>)
+    def objectMapper = new ObjectMapper()
+
+    def objectUnderTest = new DeviceHeartbeatConsumer(mockUntrustworthyCmHandlesSet)
+
+    def 'Operations to be done in an empty untrustworthy set for #scenario'() {
+        given: 'an event with trustlevel as #trustLevel'
+            def incomingEvent = testCloudEvent(trustLevel)
+        and: 'transformed as a kafka record'
+            def consumerRecord = new ConsumerRecord<String, CloudEvent>('test-device-heartbeat', 0, 0, 'cmhandle1', incomingEvent)
+            consumerRecord.headers().add('ce_id', objectMapper.writeValueAsBytes('cmhandle1'))
+        when: 'the event is consumed'
+            objectUnderTest.heartbeatListener(consumerRecord)
+        then: 'untrustworthy cmhandles are stored'
+            untrustworthyCmHandlesSetInvocationForAdd * mockUntrustworthyCmHandlesSet.add(_)
+        and: 'trustworthy cmHandles will be removed from untrustworthy set'
+            untrustworthyCmHandlesSetInvocationForContains * mockUntrustworthyCmHandlesSet.contains(_)
+
+        where: 'below scenarios are applicable'
+            scenario         | trustLevel          || untrustworthyCmHandlesSetInvocationForAdd | untrustworthyCmHandlesSetInvocationForContains
+            'None trust'     | TrustLevel.NONE     || 1                                         | 0
+            'Complete trust' | TrustLevel.COMPLETE || 0                                         | 1
+    }
+
+    def 'Invalid trust'() {
+        when: 'we provide an invalid trust in the event'
+            def consumerRecord = new ConsumerRecord<String, CloudEvent>('test-device-heartbeat', 0, 0, 'cmhandle1', testCloudEvent(null))
+            consumerRecord.headers().add('ce_id', objectMapper.writeValueAsBytes('cmhandle1'))
+            objectUnderTest.heartbeatListener(consumerRecord)
+        then: 'no interaction with the untrustworthy cmhandles set'
+            0 * mockUntrustworthyCmHandlesSet.add(_)
+            0 * mockUntrustworthyCmHandlesSet.contains(_)
+            0 * mockUntrustworthyCmHandlesSet.remove(_)
+        and: 'control flow returns without any exception'
+            noExceptionThrown()
+
+    }
+
+    def 'Remove trustworthy cmhandles from untrustworthy cmhandles set'() {
+        given: 'an event with COMPLETE trustlevel'
+            def incomingEvent = testCloudEvent(TrustLevel.COMPLETE)
+        and: 'transformed as a kafka record'
+            def consumerRecord = new ConsumerRecord<String, CloudEvent>('test-device-heartbeat', 0, 0, 'cmhandle1', incomingEvent)
+            consumerRecord.headers().add('ce_id', objectMapper.writeValueAsBytes('cmhandle1'))
+        and: 'untrustworthy cmhandles set contains cmhandle1'
+            1 * mockUntrustworthyCmHandlesSet.contains(_) >> true
+        when: 'the event is consumed'
+            objectUnderTest.heartbeatListener(consumerRecord)
+        then: 'cmhandle removed from untrustworthy cmhandles set'
+            1 * mockUntrustworthyCmHandlesSet.remove(_) >> {
+                args ->
+                    {
+                        args[0].equals('cmhandle1')
+                    }
+            }
+
+    }
+
+    def testCloudEvent(trustLevel) {
+        return CloudEventBuilder.v1().withData(objectMapper.writeValueAsBytes(new DeviceTrustLevel(trustLevel)))
+            .withId("cmhandle1")
+            .withSource(URI.create('DMI'))
+            .withDataSchema(URI.create('test'))
+            .withType('org.onap.cm.events.trustlevel-notification')
+            .build()
+    }
+
+}
index 405e6e2..067191b 100644 (file)
@@ -24,6 +24,7 @@ import com.hazelcast.config.Config;
 import com.hazelcast.config.MapConfig;
 import com.hazelcast.config.NamedConfig;
 import com.hazelcast.config.QueueConfig;
+import com.hazelcast.config.SetConfig;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
 import lombok.extern.slf4j.Slf4j;
@@ -57,6 +58,10 @@ public class HazelcastCacheConfig {
         if (namedConfig instanceof QueueConfig) {
             config.addQueueConfig((QueueConfig) namedConfig);
         }
+        if (namedConfig instanceof SetConfig) {
+            config.addSetConfig((SetConfig) namedConfig);
+        }
+
         config.setClusterName(clusterName);
         updateDiscoveryMode(config);
         return config;
@@ -76,6 +81,13 @@ public class HazelcastCacheConfig {
         return commonQueueConfig;
     }
 
+    protected static SetConfig createSetConfig(final String configName) {
+        final SetConfig commonSetConfig = new SetConfig(configName);
+        commonSetConfig.setBackupCount(1);
+        commonSetConfig.setAsyncBackupCount(1);
+        return commonSetConfig;
+    }
+
     protected void updateDiscoveryMode(final Config config) {
         if (cacheKubernetesEnabled) {
             log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName);
index 8efd485..415e9fd 100644 (file)
@@ -45,10 +45,17 @@ class HazelcastCacheConfigSpec extends Specification {
             } else {
                 assert result.config.queueConfigs.isEmpty()
             }
+        and: 'if applicable it has a set config with the expected name'
+            if (expectSetConfig) {
+                assert result.config.setConfigs.values()[0].name == 'my set config'
+            } else {
+                assert result.config.setConfigs.isEmpty()
+            }
         where: 'the following configs are used'
-            scenario       | config                                                    || expectMapConfig | expectQueueConfig
-            'Map Config'   | HazelcastCacheConfig.createMapConfig('my map config')     || true            | false
-            'Queue Config' | HazelcastCacheConfig.createQueueConfig('my queue config') || false           | true
+            scenario       | config                                                    || expectMapConfig | expectQueueConfig | expectSetConfig
+            'Map Config'   | HazelcastCacheConfig.createMapConfig('my map config')     || true            | false             | false
+            'Queue Config' | HazelcastCacheConfig.createQueueConfig('my queue config') || false           | true              | false
+            'Set Config'   | HazelcastCacheConfig.createSetConfig('my set config')     || false           | false             | true
     }
 
 }