b548120348eae7f95212e90da030508ca392da49
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.in;
25 import static org.junit.jupiter.api.Assertions.*;
26
27 import com.google.gson.JsonObject;
28 import com.google.gson.JsonParser;
29 import io.vavr.Function1;
30 import io.vavr.control.Either;
31 import java.io.IOException;
32 import java.io.InputStreamReader;
33 import org.junit.jupiter.api.Test;
34 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
38
39 /**
40  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
41  * @since March 2019
42  */
43 class KafkaSinkParserTest {
44
45     private final StreamFromGsonParser<KafkaSink> cut = StreamFromGsonParsers.kafkaSinkParser();
46
47     @Test
48     void precondition_assureInstanceOf() {
49         assertThat(cut).isInstanceOf(KafkaSinkParser.class);
50     }
51
52     @Test
53     void shouldParseMinimalKafkaSinkDefinition() throws IOException {
54         // given
55         JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json");
56
57         // when
58         final KafkaSink result = cut.unsafeParse(input);
59
60         // then
61         assertThat(result.aafCredentials()).isNull();
62         assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
63         assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
64         assertThat(result.clientId()).isNull();
65         assertThat(result.clientRole()).isNull();
66     }
67
68     @Test
69     void shouldParseBasicKafkaSinkDefinition() throws IOException {
70         // given
71         JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json");
72
73         // when
74         final KafkaSink result = cut.unsafeParse(input);
75
76         // then
77         assertThat(result.aafCredentials()).isNull();
78         assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
79         assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
80         assertThat(result.clientId()).isEqualTo("1500462518108");
81         assertThat(result.clientRole()).isEqualTo("com.dcae.member");
82     }
83
84     @Test
85     void shouldReturnErrorWhenStructureIsWrong() throws IOException {
86         // given
87         JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json");
88
89         // when
90         final Either<StreamParserError, KafkaSink> result = cut.parse(input);
91
92         // then
93         assertThat(result.isRight()).describedAs("should not be right").isFalse();
94         result.peekLeft(error -> {
95             assertThat(error.message()).contains("kafka_info");
96         });
97     }
98
99     @Test
100     void shouldReturnErrorWhenTypeIsWrong() throws IOException {
101         // given
102         JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json");
103
104         // when
105         final Either<StreamParserError, KafkaSink> result = cut.parse(input);
106
107         // then
108         assertThat(result.isRight()).describedAs("should not be right").isFalse();
109         result.peekLeft(error -> {
110             assertThat(error.message()).containsIgnoringCase("invalid stream type");
111             assertThat(error.message()).containsIgnoringCase("kafka");
112             assertThat(error.message()).containsIgnoringCase("message_router");
113         });
114     }
115 }