2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023-2024 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.impl.inventory.trustlevel
23 import com.hazelcast.core.Hazelcast
24 import com.hazelcast.map.IMap
25 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
26 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
27 import org.onap.cps.ncmp.api.inventory.models.TrustLevel
28 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
29 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
30 import org.onap.cps.ncmp.utils.events.CmAvcEventPublisher
31 import spock.lang.Specification
33 class TrustLevelManagerSpec extends Specification {
// Object under test; constructed in setup() from the two caches and the two mocks below.
TrustLevelManager objectUnderTest

// Embedded Hazelcast member that provides both caches: started in setup(), stopped in cleanup().
def hazelcastInstance
IMap<String, TrustLevel> trustLevelPerCmHandleId
IMap<String, TrustLevel> trustLevelPerDmiPlugin

def mockInventoryPersistence = Mock(InventoryPersistence)
def mockAttributeValueChangeEventPublisher = Mock(CmAvcEventPublisher)

/**
 * Spock fixture method: starts a Hazelcast member, obtains the two trust level maps
 * and wires them, together with the mocks, into the object under test.
 */
def setup() {
    hazelcastInstance = Hazelcast.newHazelcastInstance()
    trustLevelPerCmHandleId = hazelcastInstance.getMap("trustLevelPerCmHandle")
    // The dmi plugin cache needs its own map name: getMap() with the cm handle map's name
    // would make both fields views of the same map, mixing cm handle ids and dmi names.
    trustLevelPerDmiPlugin = hazelcastInstance.getMap("trustLevelPerDmiPlugin")
    objectUnderTest = new TrustLevelManager(trustLevelPerCmHandleId, trustLevelPerDmiPlugin, mockInventoryPersistence, mockAttributeValueChangeEventPublisher)
}

/**
 * Spock fixture method: shuts the Hazelcast member down again.
 */
def cleanup() {
    hazelcastInstance.shutdown()
}
/**
 * Data-driven check: after registering a DMI plugin, the name listed in the
 * 'expectedDmiPlugin' column is present in the dmi plugin cache with trust level COMPLETE.
 */
def 'Initial dmi registration'() {
    // implicit setup block: build the registration from the current data table row
    def registration = new DmiPluginRegistration(dmiPlugin: dmiPlugin, dmiDataPlugin: dmiDataPlugin)
    when: 'method to register to the cache is called'
        objectUnderTest.registerDmiPlugin(registration)
    then: 'dmi plugin in the cache and trusted'
        trustLevelPerDmiPlugin.get(expectedDmiPlugin) == TrustLevel.COMPLETE
    where: 'the following parameters are used'
        dmiPlugin | dmiDataPlugin || expectedDmiPlugin
        'dmi-1'   | ''            || 'dmi-1'
        ''        | 'dmi-2'       || 'dmi-2'
}
/**
 * Registers two cm handles (one without a trust level, one COMPLETE) and expects
 * both to be cached as COMPLETE without any AVC event being published.
 */
def 'Initial cm handle registration'() {
    given: 'two cm handles: one with no trust level and one trusted'
        def requestedTrustLevelPerCmHandleId = ['ch-1': null, 'ch-2': TrustLevel.COMPLETE]
    when: 'method to register to the cache is called'
        objectUnderTest.registerCmHandles(requestedTrustLevelPerCmHandleId)
    then: 'no notification sent'
        0 * mockAttributeValueChangeEventPublisher.publishAvcEvent(*_)
    and: 'both cm handles are in the cache and are trusted'
        trustLevelPerCmHandleId.get('ch-1') == TrustLevel.COMPLETE
        trustLevelPerCmHandleId.get('ch-2') == TrustLevel.COMPLETE
}
/**
 * Registers 'ch-1' with trust level NONE while the cache already holds COMPLETE for it,
 * and expects the cached value to stay COMPLETE with no AVC event published.
 */
def 'Initial cm handle registration where a cm handle is already in the cache'() {
    given: 'a trusted cm handle'
        def registrationRequest = ['ch-1': TrustLevel.NONE]
    and: 'the cm handle id already in the cache'
        trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
    when: 'method to register to the cache is called'
        objectUnderTest.registerCmHandles(registrationRequest)
    then: 'no notification sent'
        0 * mockAttributeValueChangeEventPublisher.publishAvcEvent(*_)
    and: 'cm handle cache is not updated'
        trustLevelPerCmHandleId.get('ch-1') == TrustLevel.COMPLETE
}
/**
 * Registers a single cm handle with trust level NONE and expects exactly one
 * AVC event to be published (arguments are not constrained).
 */
def 'Initial cm handle registration with a cm handle that is not trusted'() {
    given: 'a not trusted cm handle'
        def untrustedRegistration = ['ch-2': TrustLevel.NONE]
    when: 'method to register to the cache is called'
        objectUnderTest.registerCmHandles(untrustedRegistration)
    then: 'notification is sent'
        1 * mockAttributeValueChangeEventPublisher.publishAvcEvent(*_)
}
/**
 * Downgrades a COMPLETE dmi plugin to NONE for a COMPLETE cm handle and expects one
 * AVC event (COMPLETE -> NONE) for that cm handle plus NONE in the dmi plugin cache.
 */
def 'Dmi trust level updated'() {
    given: 'a trusted dmi plugin'
        def dmiName = 'my-dmi'
        trustLevelPerDmiPlugin.put(dmiName, TrustLevel.COMPLETE)
    and: 'a trusted cm handle'
        def cmHandleId = 'ch-1'
        trustLevelPerCmHandleId.put(cmHandleId, TrustLevel.COMPLETE)
    when: 'the update is handled'
        objectUnderTest.updateDmi(dmiName, [cmHandleId], TrustLevel.NONE)
    then: 'notification is sent'
        1 * mockAttributeValueChangeEventPublisher.publishAvcEvent(cmHandleId, 'trustLevel', 'COMPLETE', 'NONE')
    and: 'the dmi in the cache is not trusted'
        trustLevelPerDmiPlugin.get(dmiName) == TrustLevel.NONE
}
/**
 * Re-applies COMPLETE to a dmi plugin that is already COMPLETE and expects
 * no AVC event and an unchanged dmi plugin cache entry.
 */
def 'Dmi trust level updated with same value'() {
    given: 'a trusted dmi plugin'
        def dmiName = 'my-dmi'
        trustLevelPerDmiPlugin.put(dmiName, TrustLevel.COMPLETE)
    and: 'a trusted cm handle'
        def cmHandleId = 'ch-1'
        trustLevelPerCmHandleId.put(cmHandleId, TrustLevel.COMPLETE)
    when: 'the update is handled'
        objectUnderTest.updateDmi(dmiName, [cmHandleId], TrustLevel.COMPLETE)
    then: 'no notification is sent'
        0 * mockAttributeValueChangeEventPublisher.publishAvcEvent(*_)
    and: 'the dmi in the cache is trusted'
        trustLevelPerDmiPlugin.get(dmiName) == TrustLevel.COMPLETE
}
/**
 * Upgrades a cm handle from NONE to COMPLETE (its dmi plugin being COMPLETE) and expects
 * the cm handle cache to hold COMPLETE and one AVC event (NONE -> COMPLETE) to be published.
 */
def 'CmHandle trust level updated'() {
    given: 'a non trusted cm handle'
        trustLevelPerCmHandleId.put('ch-1', TrustLevel.NONE)
    and: 'a trusted dmi plugin'
        trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.COMPLETE)
    and: 'inventory persistence service returns yang model cm handle'
        mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(id: 'ch-1', dmiDataServiceName: 'my-dmi')
    when: 'update of CmHandle to COMPLETE trust level handled'
        objectUnderTest.updateCmHandleTrustLevel('ch-1', TrustLevel.COMPLETE)
    then: 'the cm handle in the cache is trusted'
        // Compare the cached value explicitly. The two-argument Groovy get(key, default)
        // returns a non-null enum whatever is cached, so asserting on it can never fail.
        assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.COMPLETE
    and: 'notification is sent'
        1 * mockAttributeValueChangeEventPublisher.publishAvcEvent('ch-1', 'trustLevel', 'NONE', 'COMPLETE')
}
/**
 * Re-applies NONE to a cm handle that is already NONE and expects the cm handle cache
 * to still hold NONE and no AVC event to be published.
 */
def 'CmHandle trust level updated with same value'() {
    given: 'a non trusted cm handle'
        trustLevelPerCmHandleId.put('ch-1', TrustLevel.NONE)
    and: 'a trusted dmi plugin'
        trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.COMPLETE)
    and: 'inventory persistence service returns yang model cm handle'
        mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(id: 'ch-1', dmiDataServiceName: 'my-dmi')
    when: 'update of CmHandle trust to the same level (NONE)'
        objectUnderTest.updateCmHandleTrustLevel('ch-1', TrustLevel.NONE)
    then: 'the cm handle in the cache is not trusted'
        // Compare the cached value explicitly. The two-argument Groovy get(key, default)
        // returns a non-null enum whatever is cached, so asserting on it can never fail.
        assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.NONE
    and: 'no notification is sent'
        0 * mockAttributeValueChangeEventPublisher.publishAvcEvent(*_)
}
/**
 * Restores a dmi plugin from NONE to COMPLETE while its cm handle is cached as NONE,
 * and expects the cm handle to remain NONE with no AVC event published.
 */
def 'Dmi trust level restored to complete with non trusted CmHandle'() {
    given: 'a non trusted dmi'
        def dmiName = 'my-dmi'
        trustLevelPerDmiPlugin.put(dmiName, TrustLevel.NONE)
    and: 'a non trusted CmHandle'
        def cmHandleId = 'ch-1'
        trustLevelPerCmHandleId.put(cmHandleId, TrustLevel.NONE)
    when: 'restore the dmi trust level to COMPLETE'
        objectUnderTest.updateDmi(dmiName, [cmHandleId], TrustLevel.COMPLETE)
    then: 'the cm handle in the cache is still NONE'
        trustLevelPerCmHandleId.get(cmHandleId) == TrustLevel.NONE
    and: 'no notification is sent'
        0 * mockAttributeValueChangeEventPublisher.publishAvcEvent(*_)
}
/**
 * With a NONE dmi plugin and a COMPLETE cm handle in the caches, applying the effective
 * trust level to a cm handle object is expected to set its current trust level to NONE.
 */
def 'Apply effective trust level among CmHandle and dmi plugin'() {
    given: 'a non trusted dmi'
        trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.NONE)
    and: 'a trusted CmHandle'
        trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
    and: 'a cm handle object'
        // NOTE(review): this object carries no dmi service name; whether the 'my-dmi' entry
        // above influences the outcome depends on TrustLevelManager - confirm.
        def cmHandleUnderTest = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
    when: 'effective trust level selected'
        objectUnderTest.applyEffectiveTrustLevel(cmHandleUnderTest)
    then: 'effective trust level is none'
        cmHandleUnderTest.currentTrustLevel == TrustLevel.NONE
}
/**
 * Applies effective trust levels to a batch of two cm handle objects that both name the
 * COMPLETE dmi 'my-dmi': the one cached as COMPLETE is expected to end up COMPLETE,
 * the one cached as NONE is expected to end up NONE.
 */
def 'Apply effective trust levels from CmHandle batch'() {
    given: 'a trusted dmi'
        trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.COMPLETE)
    and: 'a trusted CmHandle'
        trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
    and: 'a not trusted CmHandle'
        trustLevelPerCmHandleId.put('ch-2', TrustLevel.NONE)
    and: 'cm handle objects'
        def trustedCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-1', dmiServiceName: 'my-dmi')
        def untrustedCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-2', dmiDataServiceName: 'my-dmi')
        def cmHandleBatch = [trustedCmHandle, untrustedCmHandle]
    when: 'effective trust level selected'
        objectUnderTest.applyEffectiveTrustLevels(cmHandleBatch)
    then: 'effective trust levels are correctly applied'
        trustedCmHandle.currentTrustLevel == TrustLevel.COMPLETE
        untrustedCmHandle.currentTrustLevel == TrustLevel.NONE
}
/**
 * Applies the effective trust level to a cm handle object while nothing has been put
 * into either cache, and expects its current trust level to be NONE.
 */
def 'Apply effective trust level when the trust level caches are empty (restart case)'() {
    given: 'a cm-handle that is not in the cache'
        def ncmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
    when: 'effective trust level is applied'
        objectUnderTest.applyEffectiveTrustLevel(ncmpServiceCmHandle)
    // Spock requires every 'when' block to be followed by a 'then' block; the condition
    // below had no 'then' label in front of it.
    then: 'effective trust level is NONE'
        assert ncmpServiceCmHandle.currentTrustLevel == TrustLevel.NONE
}
/**
 * Removes one cached and one unknown cm handle id and expects neither id to be a key
 * of the cm handle cache afterwards, and no exception to have been thrown.
 */
def 'Removing cm Handle ids from trust level cache.'() {
    given: 'a cm handle id with trustlevel in the cache'
        trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
    when: 'remove existing and non-existing cm handle ids'
        objectUnderTest.removeCmHandles(['non-existing-id','ch-1'])
    then: 'both cm handle ids are not included in the cache'
        // containsKey is the Map key lookup; contains(..) is not part of the Map contract,
        // so it cannot be relied upon to tell whether an id is still cached.
        assert !trustLevelPerCmHandleId.containsKey('non-existing-id')
        assert !trustLevelPerCmHandleId.containsKey('ch-1')
    and: 'removing the non-existing id did not cause an exception'
        // explicit Spock exception condition backing the label above
        noExceptionThrown()
}