2 * ============LICENSE_START=======================================================
3 * PNF-REGISTRATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.dcaegen2.services.prh.integration;
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.fasterxml.jackson.databind.JsonMappingException;
25 import com.github.tomakehurst.wiremock.client.WireMock;
26 import com.google.gson.Gson;
27 import com.google.gson.JsonObject;
28 import com.jayway.jsonpath.JsonPath;
29 import reactor.core.publisher.Flux;
30 import org.junit.jupiter.api.BeforeEach;
31 import org.junit.jupiter.api.Test;
32 import org.onap.dcaegen2.services.prh.MainApp;
33 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
34 import org.onap.dcaegen2.services.prh.adapter.kafka.ImmutableKafkaConfiguration;
35 import org.onap.dcaegen2.services.prh.adapter.kafka.KafkaConfiguration;
36 import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
37 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
38 import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
39 import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
40 import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksWithCommit;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Value;
43 import org.springframework.boot.configurationprocessor.json.JSONException;
44 import org.springframework.boot.test.context.SpringBootTest;
45 import org.springframework.boot.test.mock.mockito.MockBean;
46 import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
47 import org.springframework.context.annotation.Bean;
48 import org.springframework.context.annotation.Configuration;
49 import org.springframework.context.annotation.Import;
50 import org.springframework.test.context.ActiveProfiles;
51 import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
52 import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
53 import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
54 import static com.github.tomakehurst.wiremock.client.WireMock.ok;
55 import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
56 import static com.github.tomakehurst.wiremock.client.WireMock.get;
57 import static com.github.tomakehurst.wiremock.client.WireMock.post;
58 import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor;
59 import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
60 import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath;
61 import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
62 import static com.github.tomakehurst.wiremock.client.WireMock.verify;
63 import static com.github.tomakehurst.wiremock.client.WireMock.patch;
64 import java.nio.file.Files;
65 import java.nio.file.Paths;
66 import java.util.ArrayList;
67 import static java.lang.ClassLoader.getSystemResource;
68 import static org.mockito.Mockito.when;
71 * * @author <a href="mailto:PRANIT.KAPDULE@t-systems.com">Pranit Kapdule</a> on
76 @AutoConfigureWireMock(port = 0)
77 @ActiveProfiles(value = "autoCommitDisabled")
78 class PrhWorkflowIntegrationForAutoCommitDisabledTest {
81 private ScheduledTasksWithCommit scheduledTasksWithCommit;
84 private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit; // just to disable scheduling - some
85 // configurability in ScheduledTaskRunner not
86 // to start tasks at app startup would be
90 private KafkaConsumerTaskImpl kafkaConsumerTaskImpl;
93 private DmaapConsumerJsonParser dmaapConsumerJsonParser;
96 @Import(MainApp.class)
97 static class CbsConfigTestConfig {
99 @Value("http://localhost:${wiremock.server.port}")
100 private String wiremockServerAddress;
102 protected KafkaConfiguration kafkaConfiguration;
105 public CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode() {
107 JsonObject cbsConfigJson = new Gson()
108 .fromJson(getResourceContent("autoCommitDisabledConfigurationFromCbs2.json")
109 .replaceAll("https?://dmaap-mr[\\w.]*:\\d+", wiremockServerAddress)
110 .replaceAll("https?://aai[\\w.]*:\\d+", wiremockServerAddress), JsonObject.class);
112 CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode = new CbsConfigurationForAutoCommitDisabledMode();
115 cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJson);
116 } catch (Exception e) {
117 //Exception is expected as environment variable for JAAS_CONFIG is not available
118 if (e.getMessage() == "kafkaJaasConfig") {
119 kafkaConfiguration = new ImmutableKafkaConfiguration.Builder().kafkaBoostrapServerConfig("0.0.0.0")
120 .groupIdConfig("CG1").kafkaSaslMechanism("SASL_MECHANISM")
121 .kafkaSecurityProtocol("SEC-PROTOCOL").kafkaJaasConfig("JAAS_CONFIG").build();
122 cbsConfigurationForAutoCommitDisabledMode.setKafkaConfiguration(kafkaConfiguration);
127 return cbsConfigurationForAutoCommitDisabledMode;
133 void resetWireMock() {
138 void beforeCbsConfigurationForAutoCommitDisabledMode() throws Exception {
139 withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
140 this.whenThereAreNoEventsInDmaap_WorkflowShouldFinish();
144 void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() throws JSONException {
146 when(kafkaConsumerTaskImpl.execute()).thenReturn(Flux.empty());
148 scheduledTasksWithCommit.scheduleKafkaPrhEventTask();
150 verify(0, anyRequestedFor(urlPathMatching("/aai.*")));
151 verify(0, postRequestedFor(urlPathMatching("/events.*")));
155 void beforeWhenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() throws Exception {
156 withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
157 this.whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification();
161 void whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification()
162 throws JSONException, JsonMappingException, JsonProcessingException {
164 String event = getResourceContent("integration/event.json");
165 String pnfName = JsonPath.read(event, "$.event.commonEventHeader.sourceName");
167 java.util.List<String> eventList = new ArrayList<>();
168 eventList.add(event);
170 Flux<ConsumerDmaapModel> fluxList = dmaapConsumerJsonParser
171 .getConsumerDmaapModelFromKafkaConsumerRecord(eventList);
173 stubFor(get(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)).willReturn(ok().withBody("{}")));
174 stubFor(patch(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)));
175 stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY")));
177 when(kafkaConsumerTaskImpl.execute()).thenReturn(fluxList);
179 scheduledTasksWithCommit.scheduleKafkaPrhEventTask();
181 verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY"))
182 .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName))));
186 private static String getResourceContent(String resourceName) {
188 return new String(Files.readAllBytes(Paths.get(getSystemResource(resourceName).toURI())));
189 } catch (Exception e) {
190 throw new RuntimeException("failed loading content of '" + resourceName + "'", e);