5974639c71e1ebc72695a4c2c424dca166f80aff
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24
25 import com.google.gson.JsonObject;
26 import io.vavr.control.Either;
27 import java.io.IOException;
28 import org.junit.jupiter.api.Test;
29 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
30 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
31 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
32 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
33 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
34 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
37
38 /**
39  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
40  * @since March 2019
41  */
42 class KafkaSinkParserTest {
43
44     private final StreamFromGsonParser<KafkaSink> cut = StreamFromGsonParsers.kafkaSinkParser();
45
46     @Test
47     void precondition_assureInstanceOf() {
48         assertThat(cut).isInstanceOf(KafkaSinkParser.class);
49     }
50
51     @Test
52     void shouldParseMinimalKafkaSinkDefinition() throws IOException {
53         // given
54         RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_minimal.json");
55
56         // when
57         final KafkaSink result = cut.unsafeParse(input);
58
59         // then
60         assertThat(result.aafCredentials()).isNull();
61         assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
62         assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
63         assertThat(result.clientId()).isNull();
64         assertThat(result.clientRole()).isNull();
65     }
66
67     @Test
68     void shouldParseFullKafkaSinkDefinition() throws IOException {
69         // given
70         RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink.json");
71
72         // when
73         final KafkaSink result = cut.unsafeParse(input);
74
75         // then
76         final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder()
77                 .username("the user")
78                 .password("the passwd")
79                 .build();
80         assertThat(result.aafCredentials()).isEqualTo(expectedCredentials);
81         assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
82         assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
83         assertThat(result.clientId()).isEqualTo("1500462518108");
84         assertThat(result.clientRole()).isEqualTo("com.dcae.member");
85     }
86
87     @Test
88     void shouldReturnErrorWhenStructureIsWrong() throws IOException {
89         // given
90         RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_missing_child.json");
91
92         // when
93         final Either<StreamParserError, KafkaSink> result = cut.parse(input);
94
95         // then
96         assertThat(result.isRight()).describedAs("should not be right").isFalse();
97         result.peekLeft(error -> {
98             assertThat(error.message()).contains("kafka_info");
99         });
100     }
101
102     @Test
103     void shouldReturnErrorWhenTypeIsWrong() throws IOException {
104         // given
105         RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_invalid_type.json");
106
107         // when
108         final Either<StreamParserError, KafkaSink> result = cut.parse(input);
109
110         // then
111         assertThat(result.isRight()).describedAs("should not be right").isFalse();
112         result.peekLeft(error -> {
113             assertThat(error.message()).containsIgnoringCase("invalid stream type");
114             assertThat(error.message()).containsIgnoringCase("kafka");
115             assertThat(error.message()).containsIgnoringCase("message_router");
116         });
117     }
118
119     @Test
120     void shouldReturnErrorWhenDirectionIsWrong() throws IOException {
121         // given
122         RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_sink.json");
123
124         // when
125         final Either<StreamParserError, KafkaSink> result = cut.parse(input);
126
127         // then
128         assertThat(result.isRight()).describedAs("should not be right").isFalse();
129         result.peekLeft(error -> {
130             assertThat(error.message()).containsIgnoringCase("invalid stream direction");
131         });
132     }
133 }