d255d99abd72593df8308fd54b00693f1f753fc3
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24
25 import com.google.gson.JsonObject;
26 import io.vavr.collection.List;
27 import io.vavr.control.Either;
28 import java.io.IOException;
29 import org.junit.jupiter.api.Test;
30 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
31 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
32 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
33 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
34 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
38
39 /**
40  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
41  * @since March 2019
42  */
43 class KafkaSourceParserTest {
44
45     private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser();
46
47     @Test
48     void precondition_assureInstanceOf() {
49         assertThat(cut).isInstanceOf(KafkaSourceParser.class);
50     }
51
52     @Test
53     void shouldParseMinimalKafkaSourceDefinition() throws IOException {
54         // given
55         RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source_minimal.json");
56
57         // when
58         final KafkaSource result = cut.unsafeParse(input);
59
60         // then
61         assertThat(result.aafCredentials()).isNull();
62         assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
63         assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
64         assertThat(result.clientId()).isNull();
65         assertThat(result.clientRole()).isNull();
66     }
67
68     @Test
69     void shouldParseFullKafkaSourceDefinition() throws IOException {
70         // given
71         RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source.json");
72
73         // when
74         final KafkaSource result = cut.unsafeParse(input);
75
76         // then
77         final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder()
78                 .username("the user")
79                 .password("the passwd")
80                 .build();
81         assertThat(result.aafCredentials()).isEqualTo(expectedCredentials);
82         assertThat(result.bootstrapServerList()).isEqualTo(List.of("dmaap-mr-kafka-0:6060", "dmaap-mr-kafka-1:6060"));
83         assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
84         assertThat(result.consumerGroupId()).isEqualTo("nokia-perf3gpp-processor");
85         assertThat(result.clientId()).isEqualTo("1500462518108");
86         assertThat(result.clientRole()).isEqualTo("com.dcae.member");
87     }
88
89     @Test
90     void shouldReturnErrorWhenTypeIsWrong() throws IOException {
91         // given
92         RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_invalid_type.json");
93
94         // when
95         final Either<StreamParserError, KafkaSource> result = cut.parse(input);
96
97         // then
98         assertThat(result.isRight()).describedAs("should not be right").isFalse();
99         result.peekLeft(error -> {
100             assertThat(error.message()).containsIgnoringCase("invalid stream type");
101             assertThat(error.message()).containsIgnoringCase("kafka");
102             assertThat(error.message()).containsIgnoringCase("message_router");
103         });
104     }
105
106     @Test
107     void shouldReturnErrorWhenDirectionIsWrong() throws IOException {
108         // given
109         RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_source.json");
110
111         // when
112         final Either<StreamParserError, KafkaSource> result = cut.parse(input);
113
114         // then
115         assertThat(result.isRight()).describedAs("should not be right").isFalse();
116         result.peekLeft(error -> {
117             assertThat(error.message()).containsIgnoringCase("invalid stream direction");
118         });
119     }
120 }