b10c1ad8fdf708f1de1493cdf8ddaf19405ae664
[dcaegen2/services/prh.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * PNF-REGISTRATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcaegen2.services.prh.integration;
22
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.fasterxml.jackson.databind.JsonMappingException;
25 import com.github.tomakehurst.wiremock.client.WireMock;
26 import com.google.gson.Gson;
27 import com.google.gson.JsonObject;
28 import com.jayway.jsonpath.JsonPath;
29 import reactor.core.publisher.Flux;
30 import org.junit.jupiter.api.BeforeEach;
31 import org.junit.jupiter.api.Test;
32 import org.onap.dcaegen2.services.prh.MainApp;
33 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
34 import org.onap.dcaegen2.services.prh.adapter.kafka.ImmutableKafkaConfiguration;
35 import org.onap.dcaegen2.services.prh.adapter.kafka.KafkaConfiguration;
36 import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
37 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
38 import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
39 import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
40 import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksWithCommit;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Value;
43 import org.springframework.boot.configurationprocessor.json.JSONException;
44 import org.springframework.boot.test.context.SpringBootTest;
45 import org.springframework.boot.test.mock.mockito.MockBean;
46 import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
47 import org.springframework.context.annotation.Bean;
48 import org.springframework.context.annotation.Configuration;
49 import org.springframework.context.annotation.Import;
50 import org.springframework.test.context.ActiveProfiles;
51 import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
52 import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
53 import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
54 import static com.github.tomakehurst.wiremock.client.WireMock.ok;
55 import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
56 import static com.github.tomakehurst.wiremock.client.WireMock.get;
57 import static com.github.tomakehurst.wiremock.client.WireMock.post;
58 import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor;
59 import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
60 import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath;
61 import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
62 import static com.github.tomakehurst.wiremock.client.WireMock.verify;
63 import static com.github.tomakehurst.wiremock.client.WireMock.patch;
64 import java.nio.file.Files;
65 import java.nio.file.Paths;
66 import java.util.ArrayList;
67 import static java.lang.ClassLoader.getSystemResource;
68 import static org.mockito.Mockito.when;
69
70 /**
71  *  * @author <a href="mailto:PRANIT.KAPDULE@t-systems.com">Pranit Kapdule</a> on
72  *   *        24/08/23
73  *    */
74
75 @SpringBootTest
76 @AutoConfigureWireMock(port = 0)
77 @ActiveProfiles(value = "autoCommitDisabled")
78 class PrhWorkflowIntegrationForAutoCommitDisabledTest {
79
80     @Autowired
81     private ScheduledTasksWithCommit scheduledTasksWithCommit;
82
83     @MockBean
84     private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit; // just to disable scheduling - some
85                                                                            // configurability in ScheduledTaskRunner not
86                                                                            // to start tasks at app startup would be
87                                                                            // welcome
88
89     @MockBean
90     private KafkaConsumerTaskImpl kafkaConsumerTaskImpl;
91
92     @Autowired
93     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
94
95     @Configuration
96     @Import(MainApp.class)
97     static class CbsConfigTestConfig {
98
99         @Value("http://localhost:${wiremock.server.port}")
100         private String wiremockServerAddress;
101
102         protected KafkaConfiguration kafkaConfiguration;
103
104         @Bean
105         public CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode() {
106
107             JsonObject cbsConfigJson = new Gson()
108                     .fromJson(getResourceContent("autoCommitDisabledConfigurationFromCbs2.json")
109                             .replaceAll("https?://dmaap-mr[\\w.]*:\\d+", wiremockServerAddress)
110                             .replaceAll("https?://aai[\\w.]*:\\d+", wiremockServerAddress), JsonObject.class);
111
112             CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode = new CbsConfigurationForAutoCommitDisabledMode();
113
114             try {
115                 cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJson);
116             } catch (Exception e) {
117                //Exception is expected as environment variable for JAAS_CONFIG is not available
118                 if (e.getMessage() == "kafkaJaasConfig") {
119                     kafkaConfiguration = new ImmutableKafkaConfiguration.Builder().kafkaBoostrapServerConfig("0.0.0.0")
120                             .groupIdConfig("CG1").kafkaSaslMechanism("SASL_MECHANISM")
121                             .kafkaSecurityProtocol("SEC-PROTOCOL").kafkaJaasConfig("JAAS_CONFIG").build();
122                     cbsConfigurationForAutoCommitDisabledMode.setKafkaConfiguration(kafkaConfiguration);
123
124                 }
125
126             }
127             return cbsConfigurationForAutoCommitDisabledMode;
128         };
129
130     }
131
132     @BeforeEach
133     void resetWireMock() {
134         WireMock.reset();
135     }
136
137     @Test
138     void beforeCbsConfigurationForAutoCommitDisabledMode() throws Exception {
139         withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
140             this.whenThereAreNoEventsInDmaap_WorkflowShouldFinish();
141         });
142     }
143
144     void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() throws JSONException {
145
146         when(kafkaConsumerTaskImpl.execute()).thenReturn(Flux.empty());
147        
148         scheduledTasksWithCommit.scheduleKafkaPrhEventTask();
149
150         verify(0, anyRequestedFor(urlPathMatching("/aai.*")));
151         verify(0, postRequestedFor(urlPathMatching("/events.*")));
152     }
153
154     @Test
155     void beforeWhenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() throws Exception {
156         withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
157             this.whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification();
158         });
159     }
160
161     void whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification()
162             throws JSONException, JsonMappingException, JsonProcessingException {
163
164         String event = getResourceContent("integration/event.json");
165         String pnfName = JsonPath.read(event, "$.event.commonEventHeader.sourceName");
166
167         java.util.List<String> eventList = new ArrayList<>();
168         eventList.add(event);
169
170         Flux<ConsumerDmaapModel> fluxList = dmaapConsumerJsonParser
171                 .getConsumerDmaapModelFromKafkaConsumerRecord(eventList);
172
173         stubFor(get(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)).willReturn(ok().withBody("{}")));
174         stubFor(patch(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)));
175         stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY")));
176
177         when(kafkaConsumerTaskImpl.execute()).thenReturn(fluxList);
178
179         scheduledTasksWithCommit.scheduleKafkaPrhEventTask();
180
181         verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY"))
182                 .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName))));
183
184     }
185
186     private static String getResourceContent(String resourceName) {
187         try {
188             return new String(Files.readAllBytes(Paths.get(getSystemResource(resourceName).toURI())));
189         } catch (Exception e) {
190             throw new RuntimeException("failed loading content of '" + resourceName + "'", e);
191         }
192     }
193 }