2 * ============LICENSE_START=======================================================
3 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.bbs.event.processor.utilities;
23 import static org.onap.bbs.event.processor.utilities.CommonEventFields.COMMON_FORMAT;
24 import static org.onap.bbs.event.processor.utilities.CommonEventFields.CORRELATION_ID;
25 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.ADDITIONAL_FIELDS;
26 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.ATTACHMENT_POINT;
27 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.CVLAN;
28 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.REMOTE_ID;
29 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.SVLAN;
31 import com.google.gson.JsonElement;
32 import com.google.gson.JsonObject;
33 import com.google.gson.JsonParser;
34 import com.google.gson.JsonSyntaxException;
36 import java.util.Optional;
37 import java.util.stream.StreamSupport;
39 import org.onap.bbs.event.processor.exceptions.DmaapException;
40 import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
41 import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.springframework.util.StringUtils;
46 import reactor.core.publisher.Flux;
47 import reactor.core.publisher.Mono;
49 public class ReRegistrationDmaapConsumerJsonParser {
51 private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationDmaapConsumerJsonParser.class);
53 private static final String RE_REGISTRATION_DUMPING_TEMPLATE = "%n{"
54 + "\"" + CORRELATION_ID + COMMON_FORMAT + ","
55 + "\"" + ATTACHMENT_POINT + COMMON_FORMAT + ","
56 + "\"" + REMOTE_ID + COMMON_FORMAT + ","
57 + "\"" + CVLAN + COMMON_FORMAT + ","
58 + "\"" + SVLAN + COMMON_FORMAT
61 private String pnfCorrelationId;
63 private String attachmentPoint;
64 private String remoteId;
69 * Translates a response from DMaaP to a reactive {@link ReRegistrationConsumerDmaapModel} model.
70 * @param dmaapResponse Response from DMaaP
71 * @return Re-Registration Consumer DMaaP reactive model
73 public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
75 .flatMapMany(this::parseToMono)
76 .flatMap(this::createTargetFlux);
79 private Mono<JsonElement> parseToMono(String message) {
80 if (StringUtils.isEmpty(message)) {
81 LOGGER.warn("DMaaP response is empty");
84 return Mono.fromCallable(() -> new JsonParser().parse(message))
85 .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
86 .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
89 private Flux<ReRegistrationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
90 if (jsonElement.isJsonObject()) {
91 return doCreateTargetFlux(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())));
93 return doCreateTargetFlux(
94 Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
95 .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
96 .orElseGet(JsonObject::new)))));
99 private Flux<ReRegistrationConsumerDmaapModel> doCreateTargetFlux(Flux<JsonObject> jsonObject) {
101 .flatMap(this::transform)
102 .onErrorResume(exception -> exception instanceof DmaapException, e -> Mono.empty());
105 private Mono<ReRegistrationConsumerDmaapModel> transform(JsonObject dmaapResponseJsonObject) {
107 if (!containsProperHeaders(dmaapResponseJsonObject)) {
108 LOGGER.warn("Incorrect JsonObject - missing headers");
112 JsonObject pnfReRegistrationFields =
113 dmaapResponseJsonObject.getAsJsonObject(ADDITIONAL_FIELDS);
115 pnfCorrelationId = getValueFromJson(dmaapResponseJsonObject, CORRELATION_ID);
117 attachmentPoint = getValueFromJson(pnfReRegistrationFields, ATTACHMENT_POINT);
118 remoteId = getValueFromJson(pnfReRegistrationFields, REMOTE_ID);
119 cvlan = getValueFromJson(pnfReRegistrationFields, CVLAN);
120 svlan = getValueFromJson(pnfReRegistrationFields, SVLAN);
122 if (StringUtils.isEmpty(pnfCorrelationId) || anyImportantPropertyMissing()) {
123 String incorrectEvent = dumpJsonData();
124 LOGGER.warn("Incorrect Re-Registration JSON event: {}", incorrectEvent);
128 return Mono.just(ImmutableReRegistrationConsumerDmaapModel.builder()
129 .correlationId(pnfCorrelationId)
130 .attachmentPoint(attachmentPoint)
137 private boolean anyImportantPropertyMissing() {
138 return StringUtils.isEmpty(attachmentPoint)
139 || StringUtils.isEmpty(remoteId)
140 || StringUtils.isEmpty(cvlan)
141 || StringUtils.isEmpty(svlan);
144 private boolean containsProperHeaders(JsonObject jsonObject) {
145 return jsonObject.has(ADDITIONAL_FIELDS);
148 private String dumpJsonData() {
149 return String.format(RE_REGISTRATION_DUMPING_TEMPLATE,
158 Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
159 JsonParser jsonParser = new JsonParser();
160 return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
161 : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
164 private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
165 return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";