Add Native Kafka streams support in bp-generator
[dcaegen2/platform.git] / mod / bpgenerator / onap / src / test / java / org / onap / blueprintgenerator / service / common / kafka / KafkaStreamTest.java
1 /*
2  *
3  *  * ============LICENSE_START=======================================================
4  *  *  org.onap.dcae
5  *  *  ================================================================================
6  *  *  Copyright (c) 2021 Nokia Intellectual Property. All rights reserved.
7  *  *  ================================================================================
8  *  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  *  you may not use this file except in compliance with the License.
10  *  *  You may obtain a copy of the License at
11  *  *
12  *  *       http://www.apache.org/licenses/LICENSE-2.0
13  *  *
14  *  *  Unless required by applicable law or agreed to in writing, software
15  *  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  *  See the License for the specific language governing permissions and
18  *  *  limitations under the License.
19  *  *  ============LICENSE_END=========================================================
20  *
21  *
22  */
23
24 package org.onap.blueprintgenerator.service.common.kafka;
25
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertNotNull;
28
29 import com.fasterxml.jackson.databind.JsonNode;
30 import com.fasterxml.jackson.databind.ObjectMapper;
31 import java.io.IOException;
32 import org.junit.Before;
33 import org.junit.Test;
34
35 public class KafkaStreamTest {
36
37     private final static String TEST_TOPIC_NAME = "test_topic";
38     private static final String GET_INPUT_KAFKA_USERNAME = "{\"get_input\":\"kafka_username\"}";
39     private static final String GET_INPUT_KAFKA_PASSWORD = "{\"get_input\":\"kafka_password\"}";
40     private static final String AAF_USERNAME = "username";
41     private static final String AAF_PASSWORD = "password";
42     private static final String AAF_CREDENTIAL_NODE = "aaf_credentials";
43     private static final String KAFKA_TYPE_NODE = "type";
44     private static final String EXPECTED_KAFKA_TYPE = "\"kafka\"";
45     private static final String KAFKA_BOOTSTRAP_SERVERS = "bootstrap_servers";
46     private static final String KAFKA_TOPIC_NAME = "topic_name";
47     private static final String EXPECTED_GET_INPUT_TOPIC = "{\"get_input\":\"" + TEST_TOPIC_NAME + "\"}";
48     private static final String EXPECTED_GET_INPUT_BOOTSTRAP_SERVERS = "{\"get_input\":\"kafka_bootstrap_servers\"}";
49     private static final String KAFKA_INFO_NODE = "kafka_info";
50
51     private KafkaStream kafkaStream;
52     private ObjectMapper mapper = new ObjectMapper();
53
54     @Before
55     public void setUp() {
56         kafkaStream = new KafkaStream(TEST_TOPIC_NAME);
57     }
58
59     @Test
60     public void kafkaStreamHasCorrectAafCredential() throws IOException {
61
62         String kafkaStreamJson = mapper.writeValueAsString(kafkaStream);
63
64         JsonNode kafkaStreamNode = mapper.readTree(kafkaStreamJson);
65         JsonNode aafCredential = kafkaStreamNode.get(AAF_CREDENTIAL_NODE);
66
67         assertNotNull(aafCredential);
68         assertEquals(GET_INPUT_KAFKA_USERNAME, aafCredential.get(AAF_USERNAME).toString());
69         assertEquals(GET_INPUT_KAFKA_PASSWORD, aafCredential.get(AAF_PASSWORD).toString());
70     }
71
72     @Test
73     public void kafkaStreamHasCorrectKafkaInfo() throws IOException {
74
75         String kafkaStreamJson = mapper.writeValueAsString(kafkaStream);
76
77         JsonNode kafkaStreamNode = mapper.readTree(kafkaStreamJson);
78         JsonNode kafkaInfo = kafkaStreamNode.get(KAFKA_INFO_NODE);
79
80         assertNotNull(kafkaInfo);
81         assertEquals(EXPECTED_GET_INPUT_BOOTSTRAP_SERVERS, kafkaInfo.get(KAFKA_BOOTSTRAP_SERVERS).toString());
82         assertEquals(EXPECTED_GET_INPUT_TOPIC, kafkaInfo.get(KAFKA_TOPIC_NAME).toString());
83
84     }
85
86     @Test
87     public void kafkaStreamHasCorrectType() throws IOException {
88
89         String kafkaStreamJson = mapper.writeValueAsString(kafkaStream);
90
91         JsonNode kafkaStreamNode = mapper.readTree(kafkaStreamJson);
92         JsonNode kafkaType = kafkaStreamNode.get(KAFKA_TYPE_NODE);
93
94         assertNotNull(kafkaType);
95         assertEquals(EXPECTED_KAFKA_TYPE, kafkaType.toString());
96     }
97
98 }