2 * ============LICENSE_START=======================================================
3 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.bbs.event.processor.utilities;
23 import static org.onap.bbs.event.processor.utilities.CommonEventFields.COMMON_EVENT_HEADER;
24 import static org.onap.bbs.event.processor.utilities.CommonEventFields.COMMON_FORMAT;
25 import static org.onap.bbs.event.processor.utilities.CommonEventFields.CORRELATION_ID;
26 import static org.onap.bbs.event.processor.utilities.CommonEventFields.EVENT;
27 import static org.onap.bbs.event.processor.utilities.CommonEventFields.SOURCE_NAME;
28 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.ADDITIONAL_FIELDS;
29 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.NEW_AUTHENTICATION_STATE;
30 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.OLD_AUTHENTICATION_STATE;
31 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.RGW_MAC_ADDRESS;
32 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.STATE_CHANGE_FIELDS;
33 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.STATE_INTERFACE;
34 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.SW_VERSION;
36 import com.google.gson.Gson;
37 import com.google.gson.JsonElement;
38 import com.google.gson.JsonObject;
39 import com.google.gson.JsonParser;
40 import com.google.gson.JsonSyntaxException;
42 import java.util.Optional;
43 import java.util.stream.StreamSupport;
45 import org.onap.bbs.event.processor.exceptions.DmaapException;
46 import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
47 import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.springframework.util.StringUtils;
52 import reactor.core.publisher.Flux;
53 import reactor.core.publisher.Mono;
55 public class CpeAuthenticationDmaapConsumerJsonParser {
57 private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationDmaapConsumerJsonParser.class);
58 private static final Gson gson = new Gson();
60 private static final String CPE_AUTHENTICATION_DUMPING_TEMPLATE = "%n{"
61 + "\"" + CORRELATION_ID + COMMON_FORMAT + ","
62 + "\"" + OLD_AUTHENTICATION_STATE + COMMON_FORMAT + ","
63 + "\"" + NEW_AUTHENTICATION_STATE + COMMON_FORMAT + ","
64 + "\"" + STATE_INTERFACE + COMMON_FORMAT + ","
65 + "\"" + RGW_MAC_ADDRESS + COMMON_FORMAT + ","
66 + "\"" + SW_VERSION + COMMON_FORMAT
69 private String pnfSourceName;
71 private String oldAuthenticationStatus;
72 private String newAuthenticationStatus;
73 private String stateInterface;
74 private String rgwMacAddress;
75 private String swVersion;
78 * Translates a response from DMaaP to a reactive {@link CpeAuthenticationConsumerDmaapModel} model.
79 * @param dmaapResponse Response from DMaaP
80 * @return CPE Authentication Consumer DMaaP reactive model
82 public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
84 .flatMapMany(this::parseToMono)
85 .flatMap(this::createTargetFlux);
88 private Mono<JsonElement> parseToMono(String message) {
89 if (StringUtils.isEmpty(message)) {
90 LOGGER.warn("DMaaP response is empty");
93 return Mono.fromCallable(() -> new JsonParser().parse(message))
94 .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
95 .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
98 private Flux<CpeAuthenticationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
99 if (jsonElement.isJsonObject()) {
100 return doCreateTargetFlux(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())));
102 return doCreateTargetFlux(
103 Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
104 .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
105 .orElseGet(JsonObject::new)))));
108 private Flux<CpeAuthenticationConsumerDmaapModel> doCreateTargetFlux(Flux<JsonObject> jsonObject) {
110 .flatMap(this::transform)
111 .onErrorResume(exception -> exception instanceof DmaapException, e -> Mono.empty());
114 private Mono<CpeAuthenticationConsumerDmaapModel> transform(JsonObject dmaapResponseJsonObject) {
116 LOGGER.trace("Event from DMaaP to be parsed: \n{}", gson.toJson(dmaapResponseJsonObject));
118 if (!containsProperHeaders(dmaapResponseJsonObject)) {
119 LOGGER.warn("Incorrect CPE Authentication JSON event - missing headers");
123 JsonObject commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT)
124 .getAsJsonObject(COMMON_EVENT_HEADER);
125 JsonObject stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT)
126 .getAsJsonObject(STATE_CHANGE_FIELDS);
128 pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
130 oldAuthenticationStatus = getValueFromJson(stateChangeFields, OLD_AUTHENTICATION_STATE);
131 newAuthenticationStatus = getValueFromJson(stateChangeFields, NEW_AUTHENTICATION_STATE);
132 stateInterface = getValueFromJson(stateChangeFields, STATE_INTERFACE);
134 if (stateChangeFields.has(ADDITIONAL_FIELDS)) {
135 JsonObject additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS);
136 rgwMacAddress = getValueFromJson(additionalFields, RGW_MAC_ADDRESS);
137 swVersion = getValueFromJson(additionalFields, SW_VERSION);
140 if (StringUtils.isEmpty(pnfSourceName)
141 || authenticationStatusMissing(oldAuthenticationStatus)
142 || authenticationStatusMissing(newAuthenticationStatus)) {
143 String incorrectEvent = dumpJsonData();
144 LOGGER.warn("Incorrect CPE Authentication JSON event: {}", incorrectEvent);
148 return Mono.just(ImmutableCpeAuthenticationConsumerDmaapModel.builder()
149 .correlationId(pnfSourceName)
150 .oldAuthenticationState(oldAuthenticationStatus)
151 .newAuthenticationState(newAuthenticationStatus)
152 .stateInterface(stateInterface)
153 .rgwMacAddress(rgwMacAddress)
154 .swVersion(swVersion)
158 private boolean authenticationStatusMissing(String authenticationStatus) {
159 return StringUtils.isEmpty(authenticationStatus);
162 private boolean containsProperHeaders(JsonObject jsonObject) {
163 return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(COMMON_EVENT_HEADER)
164 && jsonObject.getAsJsonObject(EVENT).has(STATE_CHANGE_FIELDS);
167 private String dumpJsonData() {
168 return String.format(CPE_AUTHENTICATION_DUMPING_TEMPLATE,
170 oldAuthenticationStatus,
171 newAuthenticationStatus,
178 Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
179 JsonParser jsonParser = new JsonParser();
180 return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
181 : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
184 private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
185 return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";