2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.utilities;
 
  23 import static org.onap.bbs.event.processor.utilities.CommonEventFields.COMMON_FORMAT;
 
  24 import static org.onap.bbs.event.processor.utilities.CommonEventFields.CORRELATION_ID;
 
  25 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.ADDITIONAL_FIELDS;
 
  26 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.ATTACHMENT_POINT;
 
  27 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.CVLAN;
 
  28 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.REMOTE_ID;
 
  29 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.SVLAN;
 
  31 import com.google.gson.JsonElement;
 
  32 import com.google.gson.JsonObject;
 
  33 import com.google.gson.JsonParser;
 
  34 import com.google.gson.JsonSyntaxException;
 
  36 import java.util.Optional;
 
  37 import java.util.stream.StreamSupport;
 
  39 import org.onap.bbs.event.processor.exceptions.DmaapException;
 
  40 import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
 
  41 import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
 
  42 import org.slf4j.Logger;
 
  43 import org.slf4j.LoggerFactory;
 
  44 import org.springframework.util.StringUtils;
 
  46 import reactor.core.publisher.Flux;
 
  47 import reactor.core.publisher.Mono;
 
  49 public class ReRegistrationDmaapConsumerJsonParser {
 
  51     private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationDmaapConsumerJsonParser.class);
 
  53     private static final String RE_REGISTRATION_DUMPING_TEMPLATE = "%n{"
 
  54             + "\"" + CORRELATION_ID + COMMON_FORMAT + ","
 
  55             + "\"" + ATTACHMENT_POINT + COMMON_FORMAT + ","
 
  56             + "\"" + REMOTE_ID + COMMON_FORMAT + ","
 
  57             + "\"" + CVLAN + COMMON_FORMAT + ","
 
  58             + "\"" + SVLAN + COMMON_FORMAT
 
  61     private String pnfCorrelationId;
 
  63     private String attachmentPoint;
 
  64     private String remoteId;
 
  69      * Translates a response from DMaaP to a reactive {@link ReRegistrationConsumerDmaapModel} model.
 
  70      * @param dmaapResponse Response from DMaaP
 
  71      * @return Re-Registration Consumer DMaaP reactive model
 
  73     public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
 
  75                 .flatMapMany(this::parseToMono)
 
  76                 .flatMap(this::createTargetFlux);
 
  79     private Mono<JsonElement> parseToMono(String message) {
 
  80         if (StringUtils.isEmpty(message)) {
 
  81             LOGGER.warn("DMaaP response is empty");
 
  84         return Mono.fromCallable(() -> new JsonParser().parse(message))
 
  85                 .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
 
  86                 .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
 
  89     private Flux<ReRegistrationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
 
  90         if (jsonElement.isJsonObject()) {
 
  91             return doCreateTargetFlux(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())));
 
  93         return doCreateTargetFlux(
 
  94                 Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
 
  95                         .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
 
  96                                 .orElseGet(JsonObject::new)))));
 
  99     private Flux<ReRegistrationConsumerDmaapModel> doCreateTargetFlux(Flux<JsonObject> jsonObject) {
 
 101                 .flatMap(this::transform)
 
 102                 .onErrorResume(exception -> exception instanceof DmaapException, e -> Mono.empty());
 
 105     private Mono<ReRegistrationConsumerDmaapModel> transform(JsonObject dmaapResponseJsonObject) {
 
 107         if (!containsProperHeaders(dmaapResponseJsonObject)) {
 
 108             LOGGER.warn("Incorrect JsonObject - missing headers");
 
 112         JsonObject pnfReRegistrationFields =
 
 113                 dmaapResponseJsonObject.getAsJsonObject(ADDITIONAL_FIELDS);
 
 115         pnfCorrelationId = getValueFromJson(dmaapResponseJsonObject, CORRELATION_ID);
 
 117         attachmentPoint = getValueFromJson(pnfReRegistrationFields, ATTACHMENT_POINT);
 
 118         remoteId = getValueFromJson(pnfReRegistrationFields, REMOTE_ID);
 
 119         cvlan = getValueFromJson(pnfReRegistrationFields, CVLAN);
 
 120         svlan = getValueFromJson(pnfReRegistrationFields, SVLAN);
 
 122         if (StringUtils.isEmpty(pnfCorrelationId) || anyImportantPropertyMissing()) {
 
 123             String incorrectEvent = dumpJsonData();
 
 124             LOGGER.warn("Incorrect Re-Registration JSON event: {}", incorrectEvent);
 
 128         return Mono.just(ImmutableReRegistrationConsumerDmaapModel.builder()
 
 129                 .correlationId(pnfCorrelationId)
 
 130                 .attachmentPoint(attachmentPoint)
 
 137     private boolean anyImportantPropertyMissing() {
 
 138         return StringUtils.isEmpty(attachmentPoint)
 
 139                 || StringUtils.isEmpty(remoteId)
 
 140                 || StringUtils.isEmpty(cvlan)
 
 141                 || StringUtils.isEmpty(svlan);
 
 144     private boolean containsProperHeaders(JsonObject jsonObject) {
 
 145         return jsonObject.has(ADDITIONAL_FIELDS);
 
 148     private String dumpJsonData() {
 
 149         return String.format(RE_REGISTRATION_DUMPING_TEMPLATE,
 
 158     Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
 
 159         JsonParser jsonParser = new JsonParser();
 
 160         return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
 
 161                 : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
 
 164     private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
 
 165         return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";