2  *  ============LICENSE_START=======================================================
 
   3  *  Copyright (C) 2023-2024 Nordix Foundation
 
   4  *  ================================================================================
 
   5  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   6  *  you may not use this file except in compliance with the License.
 
   7  *  You may obtain a copy of the License at
 
   9  *        http://www.apache.org/licenses/LICENSE-2.0
 
  11  *  Unless required by applicable law or agreed to in writing, software
 
  12  *  distributed under the License is distributed on an "AS IS" BASIS,
 
  13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  *  See the License for the specific language governing permissions and
 
  15  *  limitations under the License.
 
  17  *  SPDX-License-Identifier: Apache-2.0
 
  18  *  ============LICENSE_END=========================================================
 
  21 package org.onap.cps.ncmp.impl.data.utils
 
  23 import com.fasterxml.jackson.databind.ObjectMapper
 
  24 import io.cloudevents.CloudEvent
 
  25 import io.cloudevents.kafka.CloudEventDeserializer
 
  26 import io.cloudevents.kafka.impl.KafkaHeaders
 
  27 import org.apache.kafka.clients.consumer.KafkaConsumer
 
  28 import org.onap.cps.events.EventsPublisher
 
  29 import org.onap.cps.ncmp.api.data.models.DataOperationRequest
 
  30 import org.onap.cps.ncmp.api.data.models.OperationType
 
  31 import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext
 
  32 import org.onap.cps.ncmp.api.inventory.models.CompositeStateBuilder
 
  33 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 
  34 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
 
  35 import org.onap.cps.ncmp.impl.data.models.DmiDataOperation
 
  36 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
 
  37 import org.onap.cps.ncmp.utils.TestUtils
 
  38 import org.onap.cps.utils.JsonObjectMapper
 
  39 import org.spockframework.spring.SpringBean
 
  40 import org.springframework.test.context.ContextConfiguration
 
  41 import org.springframework.util.LinkedMultiValueMap
 
  43 import java.time.Duration
 
  45 import static org.onap.cps.ncmp.impl.inventory.models.CmHandleState.ADVISED
 
  46 import static org.onap.cps.ncmp.impl.inventory.models.CmHandleState.READY
 
  47 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
  49 @ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext])
 
  50 class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
 
    // Topic name handed to the helper as the client topic; the event-publishing feature also subscribes its consumer to it.
    def static clientTopic = 'my-topic-name'
    // Value the 'ce_type' cloud event header is compared against in the event-publishing feature.
    def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'

    // NOTE(review): SpringBean is imported, but no @SpringBean annotation is visible on the two fields below — confirm against the original file.
    // Used to parse the JSON request fixture and to serialize results for comparison.
    JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())

    // Publisher built from the two Kafka templates; presumably both are inherited from MessagingBaseSpec — TODO confirm.
    EventsPublisher eventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
  61     def 'Process per data operation request with #serviceName.'() {
 
  62         given: 'data operation request with 3 operations'
 
  63             def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
 
  64             def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
 
  65         and: '4 known cm handles: ch1-dmi1, ch2-dmi1, ch3-dmi2, ch4-dmi2'
 
  66             def yangModelCmHandles = getYangModelCmHandles()
 
  67         when: 'data operation request is processed'
 
  68             def operationsOutPerDmiServiceName = DmiDataOperationsHelper.processPerDefinitionInDataOperationsRequest(clientTopic,'request-id', dataOperationRequest, yangModelCmHandles)
 
  69         and: 'converted to a json node'
 
  70             def dmiDataOperationRequestBody = jsonObjectMapper.asJsonString(operationsOutPerDmiServiceName.get(serviceName))
 
  71             def dmiDataOperationRequestBodyAsJsonNode = jsonObjectMapper.convertToJsonNode(dmiDataOperationRequestBody).get(operationIndex)
 
  72         then: 'it contains the correct operation details'
 
  73             assert dmiDataOperationRequestBodyAsJsonNode.get('operation').asText() == 'read'
 
  74             assert dmiDataOperationRequestBodyAsJsonNode.get('operationId').asText() == expectedOperationId
 
  75             assert dmiDataOperationRequestBodyAsJsonNode.get('datastore').asText() == expectedDatastore
 
  76         and: 'the correct cm handles (just for #serviceName)'
 
  77             assert dmiDataOperationRequestBodyAsJsonNode.get('cmHandles').size() == expectedCmHandleIds.size()
 
  78             expectedCmHandleIds.each {
 
  79                 dmiDataOperationRequestBodyAsJsonNode.get('cmHandles').toString().contains(it)
 
  81         where: 'the following dmi service and operations are checked'
 
  82             serviceName | operationIndex || expectedOperationId | expectedDatastore                        | expectedCmHandleIds
 
  83             'dmi1'      | 0              || 'operational-14'    | 'ncmp-datastore:passthrough-operational' | ['ch6-dmi1']
 
  84             'dmi1'      | 1              || 'running-12'        | 'ncmp-datastore:passthrough-running'     | ['ch1-dmi1', 'ch2-dmi1']
 
  85             'dmi1'      | 2              || 'operational-15'    | 'ncmp-datastore:passthrough-operational' | ['ch6-dmi1']
 
  86             'dmi2'      | 0              || 'operational-14'    | 'ncmp-datastore:passthrough-operational' | ['ch3-dmi2']
 
  87             'dmi2'      | 1              || 'running-12'        | 'ncmp-datastore:passthrough-running'     | ['ch7-dmi2']
 
  88             'dmi2'      | 2              || 'operational-15'    | 'ncmp-datastore:passthrough-operational' | ['ch4-dmi2']
 
  91     def 'Process one data operation request with #serviceName and Module Set Tag set.'() {
 
  92         given: 'data operation request'
 
  93             def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
 
  94             def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
 
  95         and: '1 known cm handles: ch1-dmi1'
 
  96             def yangModelCmHandles = getYangModelCmHandlesForOneCmHandle()
 
  97         when: 'data operation request is processed'
 
  98             def operationsOutPerDmiServiceName = DmiDataOperationsHelper.processPerDefinitionInDataOperationsRequest(clientTopic,'request-id', dataOperationRequest, yangModelCmHandles)
 
  99         and: 'converted to a json node'
 
 100             def dmiDataOperationRequestBody = operationsOutPerDmiServiceName['dmi1']
 
 101             def cmHandlesInRequestBody = dmiDataOperationRequestBody[0].cmHandles
 
 102         then: 'it contains the correct operation details'
 
 103             assert cmHandlesInRequestBody.size() == 1
 
 104             assert cmHandlesInRequestBody[0].id == 'ch1-dmi1'
 
 105             assert cmHandlesInRequestBody[0].moduleSetTag == 'module-set-tag1'
 
 108     def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() {
 
 109         given: 'consumer subscribing to client topic'
 
 110             def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer))
 
 111             cloudEventKafkaConsumer.subscribe([clientTopic])
 
 112         and: 'data operation request having non-ready and non-existing cm handle ids'
 
 113             def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
 
 114             def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
 
 115         when: 'data operation request is processed'
 
 116             DmiDataOperationsHelper.processPerDefinitionInDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, yangModelCmHandles)
 
 117         and: 'subscribed client specified topic is polled and first record is selected'
 
 118             def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
 
 119         then: 'verify cloud compliant headers'
 
 120             def consumerRecordOutHeaders = consumerRecordOut.headers()
 
 121             assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') != null
 
 122             assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_type') == dataOperationType
 
 123         and: 'verify that extension is included into header'
 
 124             assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id'
 
 125             assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
 
 126         and: 'map consumer record to expected event type'
 
 127             def dataOperationResponseEvent =
 
 128                 toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
 
 129         and: 'data operation response event response size is 3'
 
 130             dataOperationResponseEvent.data.responses.size() == 3
 
 131         and: 'verify published data operation response as json string'
 
 132             def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
 
 133             jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
 
 136     static def getYangModelCmHandles() {
 
 137         def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
 
 138         def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build()
 
 139         def advisedState = new CompositeStateBuilder().withCmHandleState(ADVISED).withLastUpdatedTimeNow().build()
 
 140         return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
 
 141                 new YangModelCmHandle(id: 'ch2-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
 
 142                 new YangModelCmHandle(id: 'ch6-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
 
 143                 new YangModelCmHandle(id: 'ch8-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
 
 144                 new YangModelCmHandle(id: 'ch3-dmi2', dmiServiceName: 'dmi2', dmiProperties: dmiProperties, compositeState: readyState),
 
 145                 new YangModelCmHandle(id: 'ch4-dmi2', dmiServiceName: 'dmi2', dmiProperties: dmiProperties, compositeState: readyState),
 
 146                 new YangModelCmHandle(id: 'ch7-dmi2', dmiServiceName: 'dmi2', dmiProperties: dmiProperties, compositeState: readyState),
 
 147                 new YangModelCmHandle(id: 'non-ready-cm-handle', dmiServiceName: 'dmi2', dmiProperties: dmiProperties, compositeState: advisedState)
 
 151     static def getYangModelCmHandlesForOneCmHandle() {
 
 152         def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
 
 153         def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build()
 
 154         return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', moduleSetTag: 'module-set-tag1', dmiProperties: dmiProperties, compositeState: readyState)]
 
 157     def mockAndPopulateErrorMap(errorReportedToClientTopic) {
 
 158         def dmiDataOperation = DmiDataOperation.builder().operation(OperationType.fromOperationName('read'))
 
 159                 .operationId('some-op-id').datastore('ncmp-datastore:passthrough-operational')
 
 160                 .options('some-option').resourceIdentifier('some-resource-identifier').build()
 
 161         def cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>()
 
 162         cmHandleIdsPerResponseCodesPerOperation.add(dmiDataOperation, Map.of(errorReportedToClientTopic, ['some-cm-handle-id']))
 
 163         return cmHandleIdsPerResponseCodesPerOperation