import com.google.gson.Gson;
import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import io.vavr.collection.List;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
final ImmutableMessageRouterSubscribeResponse.Builder builder =
ImmutableMessageRouterSubscribeResponse.builder();
return httpResponse.successful()
- ? builder.items(httpResponse.bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class)).build()
+ ? builder.items(getAsJsonElements(httpResponse)).build()
: builder.failReason(extractFailReason(httpResponse)).build();
}
+ private List<JsonElement> getAsJsonElements(HttpResponse httpResponse){
+ JsonParser parser = new JsonParser();
+
+ JsonArray bodyAsJsonArray = httpResponse
+ .bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class);
+
+ return List.ofAll(bodyAsJsonArray).map(arrayElement -> parser.parse(arrayElement.getAsString()));
+ }
private String buildSubscribeUrl(MessageRouterSubscribeRequest request) {
return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(),
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.Gson;
-import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import io.vavr.collection.List;
import java.io.File;
import java.net.URL;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
@Testcontainers
class MessageRouterSubscriberCIT {
- private static final Gson gson = new Gson();
private static final JsonParser parser = new JsonParser();
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> singleJsonMessage = Arrays.asList("{\"message\":\"message1\"}");
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = singleJsonMessage.map(parser::parse);
final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
- final JsonArray expectedItems = getAsJsonArray(singleJsonMessage);
final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
.builder()
.items(expectedItems)
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedElements = twoJsonMessages.map(parser::parse);
final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
- final JsonArray expectedItems = getAsJsonArray(twoJsonMessages);
final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
.builder()
- .items(expectedItems)
+ .items(expectedElements)
.build();
//when
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
//when
registerTopic(publishRequest, subscribeRequest);
- final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
- .thenMany(subscriber.getElements(subscribeRequest).map(JsonElement::getAsString));
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.getElements(subscribeRequest));
//then
StepVerifier.create(result)
- .expectNext(twoJsonMessages.get(0))
- .expectNext(twoJsonMessages.get(1))
+ .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
+ .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
.expectComplete()
.verify(TIMEOUT);
}
final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> messages = twoJsonMessages.map(parser::parse);
final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
//when
registerTopic(publishRequest, subscribeRequest);
- final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
- .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1))
- .map(JsonElement::getAsString));
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
//then
StepVerifier.create(result.take(2))
- .expectNext(twoJsonMessages.get(0))
- .expectNext(twoJsonMessages.get(1))
+ .expectNext(messages.get(0))
+ .expectNext(messages.get(1))
.expectComplete()
.verify(TIMEOUT);
}
private void registerTopic(MessageRouterPublishRequest publishRequest,
MessageRouterSubscribeRequest subscribeRequest) {
- final List<String> sampleJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
- final Flux<JsonObject> jsonMessageBatch = Flux.fromIterable(sampleJsonMessages)
- .map(parser::parse).map(JsonElement::getAsJsonObject);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(sampleJsonMessages);
publisher.put(publishRequest, jsonMessageBatch).blockLast();
subscriber.get(subscribeRequest).block();
}
- private JsonArray getAsJsonArray(List<String> list) {
- String listsJsonString = gson.toJson(list);
- return new JsonParser().parse(listsJsonString).getAsJsonArray();
- }
-
private static Flux<JsonObject> jsonBatch(List<String> messages){
return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
}
+
+ private JsonObject getAsJsonObject(String item){
+ return new Gson().fromJson(item, JsonObject.class);
+ }
}