62ab02d5987dc06cb73787c6de12eab0464be82a
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.utilities;
22
23 import static org.onap.bbs.event.processor.utilities.CommonEventFields.COMMON_EVENT_HEADER;
24 import static org.onap.bbs.event.processor.utilities.CommonEventFields.COMMON_FORMAT;
25 import static org.onap.bbs.event.processor.utilities.CommonEventFields.CORRELATION_ID;
26 import static org.onap.bbs.event.processor.utilities.CommonEventFields.EVENT;
27 import static org.onap.bbs.event.processor.utilities.CommonEventFields.SOURCE_NAME;
28 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.ADDITIONAL_FIELDS;
29 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.NEW_AUTHENTICATION_STATE;
30 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.OLD_AUTHENTICATION_STATE;
31 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.RGW_MAC_ADDRESS;
32 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.STATE_CHANGE_FIELDS;
33 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.STATE_INTERFACE;
34 import static org.onap.bbs.event.processor.utilities.CpeAuthenticationEventFields.SW_VERSION;
35
36 import com.google.gson.JsonElement;
37 import com.google.gson.JsonObject;
38 import com.google.gson.JsonParser;
39 import com.google.gson.JsonSyntaxException;
40
41 import java.util.Optional;
42 import java.util.stream.StreamSupport;
43
44 import org.onap.bbs.event.processor.exceptions.DmaapException;
45 import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
46 import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import org.springframework.util.StringUtils;
50
51 import reactor.core.publisher.Flux;
52 import reactor.core.publisher.Mono;
53
54 public class CpeAuthenticationDmaapConsumerJsonParser {
55
56     private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationDmaapConsumerJsonParser.class);
57
58     private static final String CPE_AUTHENTICATION_DUMPING_TEMPLATE = "%n{"
59             + "\"" + CORRELATION_ID + COMMON_FORMAT + ","
60             + "\"" + OLD_AUTHENTICATION_STATE + COMMON_FORMAT + ","
61             + "\"" + NEW_AUTHENTICATION_STATE + COMMON_FORMAT + ","
62             + "\"" + STATE_INTERFACE + COMMON_FORMAT + ","
63             + "\"" + RGW_MAC_ADDRESS + COMMON_FORMAT + ","
64             + "\"" + SW_VERSION + COMMON_FORMAT
65             + "}";
66
67     private String pnfSourceName;
68
69     private String oldAuthenticationStatus;
70     private String newAuthenticationStatus;
71     private String stateInterface;
72     private String rgwMacAddress;
73     private String swVersion;
74
75     /**
76      * Translates a response from DMaaP to a reactive {@link CpeAuthenticationConsumerDmaapModel} model.
77      * @param dmaapResponse Response from DMaaP
78      * @return CPE Authentication Consumer DMaaP reactive model
79      */
80     public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
81         return dmaapResponse
82                 .flatMapMany(this::parseToMono)
83                 .flatMap(this::createTargetFlux);
84     }
85
86     private Mono<JsonElement> parseToMono(String message) {
87         if (StringUtils.isEmpty(message)) {
88             LOGGER.warn("DMaaP response is empty");
89             return Mono.empty();
90         }
91         return Mono.fromCallable(() -> new JsonParser().parse(message))
92                 .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
93                 .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
94     }
95
96     private Flux<CpeAuthenticationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
97         if (jsonElement.isJsonObject()) {
98             return doCreateTargetFlux(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())));
99         }
100         return doCreateTargetFlux(
101                 Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
102                         .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
103                                 .orElseGet(JsonObject::new)))));
104     }
105
106     private Flux<CpeAuthenticationConsumerDmaapModel> doCreateTargetFlux(Flux<JsonObject> jsonObject) {
107         return jsonObject
108                 .flatMap(this::transform)
109                 .onErrorResume(exception -> exception instanceof DmaapException, e -> Mono.empty());
110     }
111
112     private Mono<CpeAuthenticationConsumerDmaapModel> transform(JsonObject dmaapResponseJsonObject) {
113
114         if (!containsProperHeaders(dmaapResponseJsonObject)) {
115             LOGGER.warn("Incorrect CPE Authentication JSON event - missing headers");
116             return Mono.empty();
117         }
118
119         JsonObject commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT)
120                 .getAsJsonObject(COMMON_EVENT_HEADER);
121         JsonObject stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT)
122                 .getAsJsonObject(STATE_CHANGE_FIELDS);
123
124         pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
125
126         oldAuthenticationStatus = getValueFromJson(stateChangeFields, OLD_AUTHENTICATION_STATE);
127         newAuthenticationStatus = getValueFromJson(stateChangeFields, NEW_AUTHENTICATION_STATE);
128         stateInterface = getValueFromJson(stateChangeFields, STATE_INTERFACE);
129
130         if (stateChangeFields.has(ADDITIONAL_FIELDS)) {
131             JsonObject additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS);
132             rgwMacAddress = getValueFromJson(additionalFields, RGW_MAC_ADDRESS);
133             swVersion = getValueFromJson(additionalFields, SW_VERSION);
134         }
135
136         if (StringUtils.isEmpty(pnfSourceName)
137                 || authenticationStatusMissing(oldAuthenticationStatus)
138                 || authenticationStatusMissing(newAuthenticationStatus)) {
139             String incorrectEvent = dumpJsonData();
140             LOGGER.warn("Incorrect CPE Authentication JSON event: {}", incorrectEvent);
141             return Mono.empty();
142         }
143
144         return Mono.just(ImmutableCpeAuthenticationConsumerDmaapModel.builder()
145                 .correlationId(pnfSourceName)
146                 .oldAuthenticationState(oldAuthenticationStatus)
147                 .newAuthenticationState(newAuthenticationStatus)
148                 .stateInterface(stateInterface)
149                 .rgwMacAddress(rgwMacAddress)
150                 .swVersion(swVersion)
151                 .build());
152     }
153
154     private boolean authenticationStatusMissing(String authenticationStatus) {
155         return StringUtils.isEmpty(authenticationStatus);
156     }
157
158     private boolean containsProperHeaders(JsonObject jsonObject) {
159         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(COMMON_EVENT_HEADER)
160                 && jsonObject.getAsJsonObject(EVENT).has(STATE_CHANGE_FIELDS);
161     }
162
163     private String dumpJsonData() {
164         return String.format(CPE_AUTHENTICATION_DUMPING_TEMPLATE,
165                 pnfSourceName,
166                 oldAuthenticationStatus,
167                 newAuthenticationStatus,
168                 stateInterface,
169                 rgwMacAddress,
170                 swVersion
171         );
172     }
173
174     Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
175         JsonParser jsonParser = new JsonParser();
176         return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
177                 : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
178     }
179
180     private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
181         return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";
182     }
183 }