2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.impl.inventory.trustlevel
23 import com.hazelcast.core.Hazelcast
24 import com.hazelcast.map.IMap
25 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
26 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
27 import org.onap.cps.ncmp.api.inventory.models.TrustLevel
28 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
29 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
30 import org.onap.cps.ncmp.utils.events.InventoryEventProducer
31 import spock.lang.Specification
33 class TrustLevelManagerSpec extends Specification {
// Subject of this specification.
TrustLevelManager objectUnderTest

// Hazelcast member that backs the two caches below; it is assigned and shut down by the
// fixture code, so it has to be declared as a field of the specification.
def hazelcastInstance
// Trust level caches handed to the manager: one keyed by cm handle id, one keyed by dmi plugin name.
IMap<String, TrustLevel> trustLevelPerCmHandleId
IMap<String, TrustLevel> trustLevelPerDmiPlugin

// Collaborators replaced by Spock mocks so that interactions can be stubbed and verified.
def mockInventoryPersistence = Mock(InventoryPersistence)
def mockInventoryEventProducer = Mock(InventoryEventProducer)
/**
 * Starts a Hazelcast member, obtains the two trust level caches from it and builds the
 * manager under test on top of those caches and the mocked collaborators.
 */
def setup() {
    hazelcastInstance = Hazelcast.newHazelcastInstance()
    trustLevelPerCmHandleId = hazelcastInstance.getMap("trustLevelPerCmHandle")
    // The dmi cache needs its own map name: asking for "trustLevelPerCmHandle" twice returns
    // the same distributed map, so both fields would silently share one set of entries.
    trustLevelPerDmiPlugin = hazelcastInstance.getMap("trustLevelPerDmiPlugin")
    objectUnderTest = new TrustLevelManager(trustLevelPerCmHandleId, trustLevelPerDmiPlugin, mockInventoryPersistence, mockInventoryEventProducer)
}
/**
 * Shuts down the Hazelcast member used by the feature method.
 */
def cleanup() {
    // Null-safe call: when the member could not be created there is nothing to stop, and a
    // NullPointerException raised here would only obscure the original failure.
    hazelcastInstance?.shutdown()
}
// Registering a dmi plugin must cache it as COMPLETE, whichever of the two service names is populated.
def 'Initial dmi registration'() {
    given:
        def registration = new DmiPluginRegistration(dmiPlugin: dmiPlugin, dmiDataPlugin: dmiDataPlugin)
    when: 'method to register to the cache is called'
        objectUnderTest.registerDmiPlugin(registration)
    then: 'dmi plugin in the cache and trusted'
        trustLevelPerDmiPlugin.get(expectedDmiPlugin) == TrustLevel.COMPLETE
    where: 'the following parameters are used'
        [dmiPlugin, dmiDataPlugin, expectedDmiPlugin] << [
            ['dmi-1', '', 'dmi-1'],
            ['', 'dmi-2', 'dmi-2']
        ]
}
// A cm handle registered without a trust level and one registered as COMPLETE both end up
// COMPLETE in the cache, and no event is published for either.
def 'Initial cm handle registration'() {
    given: 'two cm handles: one with no trust level and one trusted'
        def initialTrustLevelPerCmHandleId = [:]
        initialTrustLevelPerCmHandleId['ch-1'] = null
        initialTrustLevelPerCmHandleId['ch-2'] = TrustLevel.COMPLETE
    when: 'method to register to the cache is called'
        objectUnderTest.registerCmHandles(initialTrustLevelPerCmHandleId)
    then: 'no notification sent'
        0 * mockInventoryEventProducer.publishAvcEvent(*_)
    and: 'both cm handles are in the cache and are trusted'
        trustLevelPerCmHandleId.get('ch-1') == TrustLevel.COMPLETE
        trustLevelPerCmHandleId.get('ch-2') == TrustLevel.COMPLETE
}
// Registering a cm handle whose initial trust level is NONE must publish exactly one event.
def 'Initial cm handle registration with a cm handle that is not trusted'() {
    given: 'a not trusted cm handle'
        def untrustedRegistration = [:]
        untrustedRegistration['ch-2'] = TrustLevel.NONE
    when: 'method to register to the cache is called'
        objectUnderTest.registerCmHandles(untrustedRegistration)
    then: 'notification is sent'
        1 * mockInventoryEventProducer.publishAvcEvent(*_)
}
// Downgrading a trusted dmi plugin to NONE must publish a COMPLETE -> NONE event for its
// trusted cm handle and store the new dmi trust level in the cache.
def 'Dmi trust level updated'() {
    given: 'a trusted dmi plugin'
        def dmiName = 'my-dmi'
        trustLevelPerDmiPlugin.put(dmiName, TrustLevel.COMPLETE)
    and: 'a trusted cm handle'
        def cmHandleId = 'ch-1'
        trustLevelPerCmHandleId.put(cmHandleId, TrustLevel.COMPLETE)
    when: 'the update is handled'
        objectUnderTest.updateDmi(dmiName, [cmHandleId], TrustLevel.NONE)
    then: 'notification is sent'
        1 * mockInventoryEventProducer.publishAvcEvent('ch-1', 'trustLevel', 'COMPLETE', 'NONE')
    and: 'the dmi in the cache is not trusted'
        trustLevelPerDmiPlugin.get(dmiName) == TrustLevel.NONE
}
// Updating a dmi plugin to the trust level it already has must not publish anything and
// must leave the cached dmi trust level at COMPLETE.
def 'Dmi trust level updated with same value'() {
    given: 'a trusted dmi plugin'
        def dmiName = 'my-dmi'
        trustLevelPerDmiPlugin.put(dmiName, TrustLevel.COMPLETE)
    and: 'a trusted cm handle'
        def cmHandleId = 'ch-1'
        trustLevelPerCmHandleId.put(cmHandleId, TrustLevel.COMPLETE)
    when: 'the update is handled'
        objectUnderTest.updateDmi(dmiName, [cmHandleId], TrustLevel.COMPLETE)
    then: 'no notification is sent'
        0 * mockInventoryEventProducer.publishAvcEvent(*_)
    and: 'the dmi in the cache is trusted'
        trustLevelPerDmiPlugin.get(dmiName) == TrustLevel.COMPLETE
}
// Raising a cm handle from NONE to COMPLETE (its dmi plugin being trusted) must store the
// new level in the cache and publish a NONE -> COMPLETE event.
def 'CmHandle trust level updated'() {
    given: 'a non trusted cm handle'
        trustLevelPerCmHandleId.put('ch-1', TrustLevel.NONE)
    and: 'a trusted dmi plugin'
        trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.COMPLETE)
    and: 'inventory persistence service returns yang model cm handle'
        mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(id: 'ch-1', dmiDataServiceName: 'my-dmi')
    when: 'update of CmHandle to COMPLETE trust level handled'
        objectUnderTest.updateCmHandleTrustLevel('ch-1', TrustLevel.COMPLETE)
    then: 'the cm handle in the cache is trusted'
        // Compare the cached value explicitly. The two-argument Groovy form get(key, default)
        // always yields a non-null enum, which is truthy, so asserting on it can never fail.
        assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.COMPLETE
    and: 'notification is sent'
        1 * mockInventoryEventProducer.publishAvcEvent('ch-1', 'trustLevel', 'NONE', 'COMPLETE')
}
// Updating a cm handle to the trust level it already has (NONE) must keep the cached value
// and must not publish any event.
def 'CmHandle trust level updated with same value'() {
    given: 'a non trusted cm handle'
        trustLevelPerCmHandleId.put('ch-1', TrustLevel.NONE)
    and: 'a trusted dmi plugin'
        trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.COMPLETE)
    and: 'inventory persistence service returns yang model cm handle'
        mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(id: 'ch-1', dmiDataServiceName: 'my-dmi')
    when: 'update of CmHandle trust to the same level (NONE)'
        objectUnderTest.updateCmHandleTrustLevel('ch-1', TrustLevel.NONE)
    then: 'the cm handle in the cache is not trusted'
        // Compare the cached value explicitly. The two-argument Groovy form get(key, default)
        // always yields a non-null enum, which is truthy, so asserting on it can never fail.
        assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.NONE
    and: 'no notification is sent'
        0 * mockInventoryEventProducer.publishAvcEvent(*_)
}
// Restoring a dmi plugin to COMPLETE must not change a cm handle whose own trust level is
// NONE, and must not publish any event.
def 'Dmi trust level restored to complete with non trusted CmHandle'() {
    given: 'a non trusted dmi'
        def dmiName = 'my-dmi'
        trustLevelPerDmiPlugin.put(dmiName, TrustLevel.NONE)
    and: 'a non trusted CmHandle'
        def cmHandleId = 'ch-1'
        trustLevelPerCmHandleId.put(cmHandleId, TrustLevel.NONE)
    when: 'restore the dmi trust level to COMPLETE'
        objectUnderTest.updateDmi(dmiName, [cmHandleId], TrustLevel.COMPLETE)
    then: 'the cm handle in the cache is still NONE'
        trustLevelPerCmHandleId.get(cmHandleId) == TrustLevel.NONE
    and: 'no notification is sent'
        0 * mockInventoryEventProducer.publishAvcEvent(*_)
}
// With a NONE dmi entry and a COMPLETE cm handle entry cached, applying the effective trust
// level to the cm handle object must set its current trust level to NONE.
def 'Apply effective trust level among CmHandle and dmi plugin'() {
    given: 'a non trusted dmi'
        trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.NONE)
    and: 'a trusted CmHandle'
        def cmHandleId = 'ch-1'
        trustLevelPerCmHandleId.put(cmHandleId, TrustLevel.COMPLETE)
    and: 'a cm handle object'
        def cmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId)
    when: 'effective trust level selected'
        objectUnderTest.applyEffectiveTrustLevel(cmHandle)
    then: 'effective trust level is none'
        cmHandle.currentTrustLevel == TrustLevel.NONE
}
// Applying effective trust levels to a batch must give each cm handle object its own
// result: COMPLETE for the trusted handle and NONE for the untrusted one.
def 'Apply effective trust levels from CmHandle batch'() {
    given: 'a trusted dmi'
        def dmiName = 'my-dmi'
        trustLevelPerDmiPlugin.put(dmiName, TrustLevel.COMPLETE)
    and: 'a trusted CmHandle'
        trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
    and: 'a not trusted CmHandle'
        trustLevelPerCmHandleId.put('ch-2', TrustLevel.NONE)
    and: 'cm handle objects'
        def trustedCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-1', dmiServiceName: dmiName)
        def untrustedCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-2', dmiDataServiceName: dmiName)
        def cmHandleBatch = [trustedCmHandle, untrustedCmHandle]
    when: 'effective trust level selected'
        objectUnderTest.applyEffectiveTrustLevels(cmHandleBatch)
    then: 'effective trust levels are correctly applied'
        trustedCmHandle.currentTrustLevel == TrustLevel.COMPLETE
        untrustedCmHandle.currentTrustLevel == TrustLevel.NONE
}
// When neither cache holds an entry for the cm handle, applying the effective trust level
// is expected to leave the cm handle object at NONE.
def 'Apply effective trust level when the trust level caches are empty (restart case)'() {
    given: 'a cm-handle that is not in the cache'
        def ncmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
    when: 'effective trust level is applied'
        objectUnderTest.applyEffectiveTrustLevel(ncmpServiceCmHandle)
    // The condition needs its own then-block: Spock requires every when-block to be followed by one.
    then: 'the effective trust level is NONE'
        assert ncmpServiceCmHandle.currentTrustLevel == TrustLevel.NONE
}
197 def 'Removing cm Handle ids from trust level cache.'() {
198 given: 'a cm handle id with trustlevel in the cache'
199 trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
200 when: 'remove existing and non-existing cm handle ids'
201 objectUnderTest.removeCmHandles(['non-existing-id','ch-1'])
202 then: 'both cm handle ids are not included in the cache'
203 assert !trustLevelPerCmHandleId.contains('non-existing-id')
204 assert !trustLevelPerCmHandleId.contains('ch-1')
205 and: 'removing the non-existing id did not cause an exception'