2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.utilities;
 
  23 import static org.onap.bbs.event.processor.utilities.CommonEventFields.COMMON_EVENT_HEADER;
 
  24 import static org.onap.bbs.event.processor.utilities.CommonEventFields.COMMON_FORMAT;
 
  25 import static org.onap.bbs.event.processor.utilities.CommonEventFields.CORRELATION_ID;
 
  26 import static org.onap.bbs.event.processor.utilities.CommonEventFields.EVENT;
 
  27 import static org.onap.bbs.event.processor.utilities.CommonEventFields.SOURCE_NAME;
 
  28 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.ADDITIONAL_FIELDS;
 
  29 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.NEW_AUTHENTICATION_STATE;
 
  30 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.OLD_AUTHENTICATION_STATE;
 
  31 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.RGW_MAC_ADDRESS;
 
  32 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.STATE_CHANGE_FIELDS;
 
  33 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.STATE_INTERFACE;
 
  34 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.SW_VERSION;
 
  36 import com.google.gson.JsonElement;
 
  37 import com.google.gson.JsonObject;
 
  38 import com.google.gson.JsonParser;
 
  39 import com.google.gson.JsonSyntaxException;
 
  41 import java.util.Optional;
 
  42 import java.util.stream.StreamSupport;
 
  44 import org.onap.bbs.event.processor.exceptions.DmaapException;
 
  45 import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
 
  46 import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
 
  47 import org.slf4j.Logger;
 
  48 import org.slf4j.LoggerFactory;
 
  49 import org.springframework.util.StringUtils;
 
  51 import reactor.core.publisher.Flux;
 
  52 import reactor.core.publisher.Mono;
 
  54 public class CpeAuthenticationDmaapConsumerJsonParser {
 
  56     private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationDmaapConsumerJsonParser.class);
 
  58     private static final String CPE_AUTHENTICATION_DUMPING_TEMPLATE = "%n{"
 
  59             + "\"" + CORRELATION_ID + COMMON_FORMAT + ","
 
  60             + "\"" + OLD_AUTHENTICATION_STATE + COMMON_FORMAT + ","
 
  61             + "\"" + NEW_AUTHENTICATION_STATE + COMMON_FORMAT + ","
 
  62             + "\"" + STATE_INTERFACE + COMMON_FORMAT + ","
 
  63             + "\"" + RGW_MAC_ADDRESS + COMMON_FORMAT + ","
 
  64             + "\"" + SW_VERSION + COMMON_FORMAT
 
  67     private String pnfSourceName;
 
  69     private String oldAuthenticationStatus;
 
  70     private String newAuthenticationStatus;
 
  71     private String stateInterface;
 
  72     private String rgwMacAddress;
 
  73     private String swVersion;
 
  76      * Translates a response from DMaaP to a reactive {@link CpeAuthenticationConsumerDmaapModel} model.
 
  77      * @param dmaapResponse Response from DMaaP
 
  78      * @return CPE Authentication Consumer DMaaP reactive model
 
  80     public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
 
  82                 .flatMapMany(this::parseToMono)
 
  83                 .flatMap(this::createTargetFlux);
 
  86     private Mono<JsonElement> parseToMono(String message) {
 
  87         if (StringUtils.isEmpty(message)) {
 
  88             LOGGER.warn("DMaaP response is empty");
 
  91         return Mono.fromCallable(() -> new JsonParser().parse(message))
 
  92                 .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
 
  93                 .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
 
  96     private Flux<CpeAuthenticationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
 
  97         if (jsonElement.isJsonObject()) {
 
  98             return doCreateTargetFlux(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())));
 
 100         return doCreateTargetFlux(
 
 101                 Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
 
 102                         .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
 
 103                                 .orElseGet(JsonObject::new)))));
 
 106     private Flux<CpeAuthenticationConsumerDmaapModel> doCreateTargetFlux(Flux<JsonObject> jsonObject) {
 
 108                 .flatMap(this::transform)
 
 109                 .onErrorResume(exception -> exception instanceof DmaapException, e -> Mono.empty());
 
 112     private Mono<CpeAuthenticationConsumerDmaapModel> transform(JsonObject dmaapResponseJsonObject) {
 
 114         if (!containsProperHeaders(dmaapResponseJsonObject)) {
 
 115             LOGGER.warn("Incorrect CPE Authentication JSON event - missing headers");
 
 119         JsonObject commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT)
 
 120                 .getAsJsonObject(COMMON_EVENT_HEADER);
 
 121         JsonObject stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT)
 
 122                 .getAsJsonObject(STATE_CHANGE_FIELDS);
 
 124         pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
 
 126         oldAuthenticationStatus = getValueFromJson(stateChangeFields, OLD_AUTHENTICATION_STATE);
 
 127         newAuthenticationStatus = getValueFromJson(stateChangeFields, NEW_AUTHENTICATION_STATE);
 
 128         stateInterface = getValueFromJson(stateChangeFields, STATE_INTERFACE);
 
 130         if (stateChangeFields.has(ADDITIONAL_FIELDS)) {
 
 131             JsonObject additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS);
 
 132             rgwMacAddress = getValueFromJson(additionalFields, RGW_MAC_ADDRESS);
 
 133             swVersion = getValueFromJson(additionalFields, SW_VERSION);
 
 136         if (StringUtils.isEmpty(pnfSourceName)
 
 137                 || authenticationStatusMissing(oldAuthenticationStatus)
 
 138                 || authenticationStatusMissing(newAuthenticationStatus)) {
 
 139             String incorrectEvent = dumpJsonData();
 
 140             LOGGER.warn("Incorrect CPE Authentication JSON event: {}", incorrectEvent);
 
 144         return Mono.just(ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 145                 .correlationId(pnfSourceName)
 
 146                 .oldAuthenticationState(oldAuthenticationStatus)
 
 147                 .newAuthenticationState(newAuthenticationStatus)
 
 148                 .stateInterface(stateInterface)
 
 149                 .rgwMacAddress(rgwMacAddress)
 
 150                 .swVersion(swVersion)
 
 154     private boolean authenticationStatusMissing(String authenticationStatus) {
 
 155         return StringUtils.isEmpty(authenticationStatus);
 
 158     private boolean containsProperHeaders(JsonObject jsonObject) {
 
 159         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(COMMON_EVENT_HEADER)
 
 160                 && jsonObject.getAsJsonObject(EVENT).has(STATE_CHANGE_FIELDS);
 
 163     private String dumpJsonData() {
 
 164         return String.format(CPE_AUTHENTICATION_DUMPING_TEMPLATE,
 
 166                 oldAuthenticationStatus,
 
 167                 newAuthenticationStatus,
 
 174     Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
 
 175         JsonParser jsonParser = new JsonParser();
 
 176         return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
 
 177                 : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
 
 180     private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
 
 181         return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";