f33ff43e0acbc71c77f765228afdbf4675ce95f4
[dcaegen2/services/prh.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * PNF-REGISTRATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcaegen2.services.prh.tasks.commit;
22
23 import org.apache.kafka.clients.consumer.ConsumerRecord;
24 import org.apache.kafka.common.header.Headers;
25 import org.apache.kafka.common.header.internals.RecordHeaders;
26 import org.apache.kafka.common.record.TimestampType;
27 import org.junit.jupiter.api.BeforeEach;
28 import org.junit.jupiter.api.Test;
29 import org.junit.jupiter.api.extension.ExtendWith;
30 import org.mockito.Mock;
31 import org.mockito.junit.jupiter.MockitoExtension;
32 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
33 import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
34 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
35 import org.springframework.kafka.support.Acknowledgment;
36 import com.google.gson.Gson;
37 import com.google.gson.JsonObject;
38 import com.google.gson.JsonSyntaxException;
39 import reactor.core.publisher.Flux;
40 import java.io.IOException;
41 import java.net.URISyntaxException;
42 import java.nio.file.Files;
43 import java.nio.file.Paths;
44 import java.util.ArrayList;
45 import java.util.List;
46 import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
47 import static java.lang.ClassLoader.getSystemResource;
48 import static org.assertj.core.api.Assertions.assertThat;
49 import static org.junit.jupiter.api.Assertions.assertEquals;
50
51 @ExtendWith(MockitoExtension.class)
52 public class KafkaConsumerTaskImplTest {
53
54     @Mock
55     private Acknowledgment acknowledgment;
56
57     private KafkaConsumerTaskImpl kafkaConsumerTask;
58
59     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
60
61     private EpochDateTimeConversion epochDateTimeConversion;
62
63     private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabled;
64
65     private JsonObject cbsConfigJsonForAutoCommitDisabled;
66
67     @BeforeEach
68     void beforeEach() throws JsonSyntaxException, IOException, URISyntaxException {
69         cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
70                 new String(Files.readAllBytes(
71                         Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
72                 JsonObject.class);
73         cbsConfigurationForAutoCommitDisabled = new CbsConfigurationForAutoCommitDisabledMode();
74         dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
75         epochDateTimeConversion = new EpochDateTimeConversion();
76
77     }
78
79     @Test
80     void beforeOnMessageTest() throws Exception {
81         withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
82             cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
83             kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
84                     dmaapConsumerJsonParser, epochDateTimeConversion);
85             List<ConsumerRecord<String, String>> list = new ArrayList<>();
86             TimestampType timestampType = null;
87             Headers headers = new RecordHeaders();
88             epochDateTimeConversion.setDaysForRecords("3");
89             ConsumerRecord<String, String> records = new ConsumerRecord<>("test-topic", 1, 1l, 0l, timestampType, 1, 1,
90                     "test-key", "test-value", headers, null);
91             list.add(records);
92             kafkaConsumerTask.onMessage(list, acknowledgment);
93             String actualTopicInList = list.get(0).topic();
94             String expectedTopicInList = "test-topic";
95             assertEquals(expectedTopicInList, actualTopicInList, "topic is not matching");
96             assertThat(kafkaConsumerTask.getOffset().equals(acknowledgment));
97             assertThat(kafkaConsumerTask.getJsonEvent().contains("test-topic"));
98         });
99     }
100
101     @Test
102     void beforeCommitOffsetTest() throws Exception {
103         withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
104             cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
105             kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
106                     dmaapConsumerJsonParser, epochDateTimeConversion);
107             kafkaConsumerTask.commitOffset();
108         });
109     }
110
111     @Test
112     void beforeExecuteTest() throws Exception {
113         withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
114             cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
115             kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
116                     dmaapConsumerJsonParser, epochDateTimeConversion);
117             String event = getResourceContent("integration/event.json");
118             java.util.List<String> eventList = new ArrayList<>();
119             eventList.add(event);
120             kafkaConsumerTask.setJsonEvent(eventList);
121             Flux<ConsumerDmaapModel> flux = kafkaConsumerTask.execute();
122             String expectedSourceName = "NOK6061ZW8";
123             String actualSourceName = flux.blockFirst().getCorrelationId();
124
125             String expectedOamV4IpAddress = "val3";
126             String actualOamV4IpAddress = flux.blockFirst().getIpv4();
127
128             String expectedOamV6IpAddress = "val4";
129             String actualOamV6IpAddress = flux.blockFirst().getIpv6();
130
131             assertEquals(expectedSourceName, actualSourceName, "SourceName is not matching");
132             assertEquals(expectedOamV4IpAddress, actualOamV4IpAddress, "OamV4IpAddress is not matching");
133             assertEquals(expectedOamV6IpAddress, actualOamV6IpAddress, "OamV6IpAddress is not matching");
134         });
135     }
136
137     private static String getResourceContent(String resourceName) {
138         try {
139             return new String(Files.readAllBytes(Paths.get(getSystemResource(resourceName).toURI())));
140         } catch (Exception e) {
141             throw new RuntimeException("failed loading content of '" + resourceName + "'", e);
142         }
143     }
144
145 }