72ca190ff1c005b2d202e729e5a6fd4786bfe2e9
[cps.git] /
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.impl.inventory.trustlevel
22
23 import com.hazelcast.core.Hazelcast
24 import com.hazelcast.map.IMap
25 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
26 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
27 import org.onap.cps.ncmp.api.inventory.models.TrustLevel
28 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
29 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
30 import org.onap.cps.ncmp.utils.events.InventoryEventProducer
31 import spock.lang.Specification
32
33 class TrustLevelManagerSpec extends Specification {
34
35     TrustLevelManager objectUnderTest
36
37     def hazelcastInstance
38     IMap<String, TrustLevel> trustLevelPerCmHandleId
39     IMap<String, TrustLevel>  trustLevelPerDmiPlugin
40
41     def mockInventoryPersistence = Mock(InventoryPersistence)
42     def mockInventoryEventProducer = Mock(InventoryEventProducer)
43
44     def setup() {
45         hazelcastInstance = Hazelcast.newHazelcastInstance()
46         trustLevelPerCmHandleId = hazelcastInstance.getMap("trustLevelPerCmHandle")
47         trustLevelPerDmiPlugin = hazelcastInstance.getMap("trustLevelPerCmHandle")
48         objectUnderTest = new TrustLevelManager(trustLevelPerCmHandleId, trustLevelPerDmiPlugin, mockInventoryPersistence, mockInventoryEventProducer)
49     }
50
51     def cleanup() {
52         hazelcastInstance.shutdown()
53     }
54
55     def 'Initial dmi registration'() {
56         given: 'a dmi plugin'
57             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: dmiPlugin, dmiDataPlugin: dmiDataPlugin)
58         when: 'method to register to the cache is called'
59             objectUnderTest.registerDmiPlugin(dmiPluginRegistration)
60         then: 'dmi plugin in the cache and trusted'
61             assert trustLevelPerDmiPlugin.get(expectedDmiPlugin) == TrustLevel.COMPLETE
62         where: 'the following parameters are used'
63             dmiPlugin | dmiDataPlugin || expectedDmiPlugin
64             'dmi-1'   | ''            || 'dmi-1'
65             ''        | 'dmi-2'       || 'dmi-2'
66     }
67
68     def 'Initial cm handle registration'() {
69         given: 'two cm handles: one with no trust level and one trusted'
70             def cmHandleModelsToBeCreated = ['ch-1': null, 'ch-2': TrustLevel.COMPLETE]
71         when: 'method to register to the cache is called'
72             objectUnderTest.registerCmHandles(cmHandleModelsToBeCreated)
73         then: 'no notification sent'
74             0 * mockInventoryEventProducer.publishAvcEvent(*_)
75         and: 'both cm handles are in the cache and are trusted'
76             assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.COMPLETE
77             assert trustLevelPerCmHandleId.get('ch-2') == TrustLevel.COMPLETE
78     }
79
80     def 'Initial cm handle registration with a cm handle that is not trusted'() {
81         given: 'a not trusted cm handle'
82             def cmHandleModelsToBeCreated = ['ch-2': TrustLevel.NONE]
83         when: 'method to register to the cache is called'
84             objectUnderTest.registerCmHandles(cmHandleModelsToBeCreated)
85         then: 'notification is sent'
86             1 * mockInventoryEventProducer.publishAvcEvent(*_)
87     }
88
89     def 'Dmi trust level updated'() {
90         given: 'a trusted dmi plugin'
91             trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.COMPLETE)
92         and: 'a trusted cm handle'
93             trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
94         when: 'the update is handled'
95             objectUnderTest.updateDmi('my-dmi', ['ch-1'], TrustLevel.NONE)
96         then: 'notification is sent'
97             1 * mockInventoryEventProducer.publishAvcEvent('ch-1', 'trustLevel', 'COMPLETE', 'NONE')
98         and: 'the dmi in the cache is not trusted'
99             assert trustLevelPerDmiPlugin.get('my-dmi') == TrustLevel.NONE
100     }
101
102     def 'Dmi trust level updated with same value'() {
103         given: 'a trusted dmi plugin'
104             trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.COMPLETE)
105         and: 'a trusted cm handle'
106             trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
107         when: 'the update is handled'
108             objectUnderTest.updateDmi('my-dmi', ['ch-1'], TrustLevel.COMPLETE)
109         then: 'no notification is sent'
110             0 * mockInventoryEventProducer.publishAvcEvent(*_)
111         and: 'the dmi in the cache is trusted'
112             assert trustLevelPerDmiPlugin.get('my-dmi') == TrustLevel.COMPLETE
113     }
114
115     def 'CmHandle trust level updated'() {
116         given: 'a non trusted cm handle'
117             trustLevelPerCmHandleId.put('ch-1', TrustLevel.NONE)
118         and: 'a trusted dmi plugin'
119             trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.COMPLETE)
120         and: 'inventory persistence service returns yang model cm handle'
121             mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(id: 'ch-1', dmiDataServiceName: 'my-dmi')
122         when: 'update of CmHandle to COMPLETE trust level handled'
123             objectUnderTest.updateCmHandleTrustLevel('ch-1', TrustLevel.COMPLETE)
124         then: 'the cm handle in the cache is trusted'
125             assert trustLevelPerCmHandleId.get('ch-1', TrustLevel.COMPLETE)
126         and: 'notification is sent'
127             1 * mockInventoryEventProducer.publishAvcEvent('ch-1', 'trustLevel', 'NONE', 'COMPLETE')
128     }
129
130     def 'CmHandle trust level updated with same value'() {
131         given: 'a non trusted cm handle'
132             trustLevelPerCmHandleId.put('ch-1', TrustLevel.NONE)
133         and: 'a trusted dmi plugin'
134             trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.COMPLETE)
135         and: 'inventory persistence service returns yang model cm handle'
136             mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(id: 'ch-1', dmiDataServiceName: 'my-dmi')
137         when: 'update of CmHandle trust to the same level (NONE)'
138             objectUnderTest.updateCmHandleTrustLevel('ch-1', TrustLevel.NONE)
139         then: 'the cm handle in the cache is not trusted'
140             assert trustLevelPerCmHandleId.get('ch-1', TrustLevel.NONE)
141         and: 'no notification is sent'
142             0 * mockInventoryEventProducer.publishAvcEvent(*_)
143     }
144
145     def 'Dmi trust level restored to complete with non trusted CmHandle'() {
146         given: 'a non trusted dmi'
147             trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.NONE)
148         and: 'a non trusted CmHandle'
149             trustLevelPerCmHandleId.put('ch-1', TrustLevel.NONE)
150         when: 'restore the dmi trust level to COMPLETE'
151             objectUnderTest.updateDmi('my-dmi', ['ch-1'], TrustLevel.COMPLETE)
152         then: 'the cm handle in the cache is still NONE'
153             assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.NONE
154         and: 'no notification is sent'
155             0 * mockInventoryEventProducer.publishAvcEvent(*_)
156     }
157
158     def 'Apply effective trust level among CmHandle and dmi plugin'() {
159         given: 'a non trusted dmi'
160             trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.NONE)
161         and: 'a trusted CmHandle'
162             trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
163         and: 'a cm handle object'
164             def ncmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
165         when: 'effective trust level selected'
166             objectUnderTest.applyEffectiveTrustLevel(ncmpServiceCmHandle)
167         then: 'effective trust level is none'
168             assert ncmpServiceCmHandle.currentTrustLevel == TrustLevel.NONE
169     }
170
171     def 'Apply effective trust levels from CmHandle batch'() {
172         given: 'a trusted dmi'
173             trustLevelPerDmiPlugin.put('my-dmi', TrustLevel.COMPLETE)
174         and: 'a trusted CmHandle'
175             trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
176         and: 'a not trusted CmHandle'
177             trustLevelPerCmHandleId.put('ch-2', TrustLevel.NONE)
178         and: 'cm handle objects'
179             def ncmpServiceCmHandle1 = new NcmpServiceCmHandle(cmHandleId: 'ch-1', dmiServiceName: 'my-dmi')
180             def ncmpServiceCmHandle2 = new NcmpServiceCmHandle(cmHandleId: 'ch-2', dmiDataServiceName: 'my-dmi')
181         when: 'effective trust level selected'
182             objectUnderTest.applyEffectiveTrustLevels([ncmpServiceCmHandle1, ncmpServiceCmHandle2])
183         then: 'effective trust levels are correctly applied'
184             assert ncmpServiceCmHandle1.currentTrustLevel == TrustLevel.COMPLETE
185             assert ncmpServiceCmHandle2.currentTrustLevel == TrustLevel.NONE
186     }
187
188     def 'Apply effective trust level  when the trust level caches are empty (restart case)'() {
189         given: 'a cm-handle that is not in the cache'
190             def ncmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
191         when: 'effective trust level is applied'
192             objectUnderTest.applyEffectiveTrustLevel(ncmpServiceCmHandle)
193         then:
194             assert ncmpServiceCmHandle.currentTrustLevel == TrustLevel.NONE
195     }
196
197     def 'Removing cm Handle ids from trust level cache.'() {
198         given: 'a cm handle id with trustlevel in the cache'
199             trustLevelPerCmHandleId.put('ch-1', TrustLevel.COMPLETE)
200         when: 'remove existing and non-existing cm handle ids'
201             objectUnderTest.removeCmHandles(['non-existing-id','ch-1'])
202         then: 'both cm handle ids are not included in the cache'
203             assert !trustLevelPerCmHandleId.contains('non-existing-id')
204             assert !trustLevelPerCmHandleId.contains('ch-1')
205         and: 'removing the non-existing id did not cause an exception'
206             noExceptionThrown()
207     }
208
209 }