947d7a7c74cdccd5a4c96a5b8102b60da7b5087b
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.utilities;
22
23 import static org.onap.bbs.event.processor.utilities.CommonEventFields.COMMON_FORMAT;
24 import static org.onap.bbs.event.processor.utilities.CommonEventFields.CORRELATION_ID;
25 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.ADDITIONAL_FIELDS;
26 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.ATTACHMENT_POINT;
27 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.CVLAN;
28 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.REMOTE_ID;
29 import static org.onap.bbs.event.processor.utilities.ReRegistrationEventFields.SVLAN;
30
31 import com.google.gson.Gson;
32 import com.google.gson.JsonElement;
33 import com.google.gson.JsonObject;
34 import com.google.gson.JsonParser;
35 import com.google.gson.JsonSyntaxException;
36
37 import java.util.Optional;
38 import java.util.stream.StreamSupport;
39
40 import org.onap.bbs.event.processor.exceptions.DmaapException;
41 import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
42 import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.springframework.util.StringUtils;
46
47 import reactor.core.publisher.Flux;
48 import reactor.core.publisher.Mono;
49
50 public class ReRegistrationDmaapConsumerJsonParser {
51
52     private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationDmaapConsumerJsonParser.class);
53     private static final Gson gson = new Gson();
54
55     private static final String RE_REGISTRATION_DUMPING_TEMPLATE = "%n{"
56             + "\"" + CORRELATION_ID + COMMON_FORMAT + ","
57             + "\"" + ATTACHMENT_POINT + COMMON_FORMAT + ","
58             + "\"" + REMOTE_ID + COMMON_FORMAT + ","
59             + "\"" + CVLAN + COMMON_FORMAT + ","
60             + "\"" + SVLAN + COMMON_FORMAT
61             + "}";
62
63     private String pnfCorrelationId;
64
65     private String attachmentPoint;
66     private String remoteId;
67     private String cvlan;
68     private String svlan;
69
70     /**
71      * Translates a response from DMaaP to a reactive {@link ReRegistrationConsumerDmaapModel} model.
72      * @param dmaapResponse Response from DMaaP
73      * @return Re-Registration Consumer DMaaP reactive model
74      */
75     public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
76         return dmaapResponse
77                 .flatMapMany(this::parseToMono)
78                 .flatMap(this::createTargetFlux);
79     }
80
81     private Mono<JsonElement> parseToMono(String message) {
82         if (StringUtils.isEmpty(message)) {
83             LOGGER.warn("DMaaP response is empty");
84             return Mono.empty();
85         }
86         return Mono.fromCallable(() -> new JsonParser().parse(message))
87                 .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
88                 .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
89     }
90
91     private Flux<ReRegistrationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
92         if (jsonElement.isJsonObject()) {
93             return doCreateTargetFlux(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())));
94         }
95         return doCreateTargetFlux(
96                 Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
97                         .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
98                                 .orElseGet(JsonObject::new)))));
99     }
100
101     private Flux<ReRegistrationConsumerDmaapModel> doCreateTargetFlux(Flux<JsonObject> jsonObject) {
102         return jsonObject
103                 .flatMap(this::transform)
104                 .onErrorResume(exception -> exception instanceof DmaapException, e -> Mono.empty());
105     }
106
107     private Mono<ReRegistrationConsumerDmaapModel> transform(JsonObject dmaapResponseJsonObject) {
108
109         LOGGER.trace("Event from DMaaP to be parsed: \n{}", gson.toJson(dmaapResponseJsonObject));
110
111         if (!containsProperHeaders(dmaapResponseJsonObject)) {
112             LOGGER.warn("Incorrect JsonObject - missing headers");
113             return Mono.empty();
114         }
115
116         JsonObject pnfReRegistrationFields =
117                 dmaapResponseJsonObject.getAsJsonObject(ADDITIONAL_FIELDS);
118
119         pnfCorrelationId = getValueFromJson(dmaapResponseJsonObject, CORRELATION_ID);
120
121         attachmentPoint = getValueFromJson(pnfReRegistrationFields, ATTACHMENT_POINT);
122         remoteId = getValueFromJson(pnfReRegistrationFields, REMOTE_ID);
123         cvlan = getValueFromJson(pnfReRegistrationFields, CVLAN);
124         svlan = getValueFromJson(pnfReRegistrationFields, SVLAN);
125
126         if (StringUtils.isEmpty(pnfCorrelationId) || anyImportantPropertyMissing()) {
127             String incorrectEvent = dumpJsonData();
128             LOGGER.warn("Incorrect Re-Registration JSON event: {}", incorrectEvent);
129             return Mono.empty();
130         }
131
132         return Mono.just(ImmutableReRegistrationConsumerDmaapModel.builder()
133                 .correlationId(pnfCorrelationId)
134                 .attachmentPoint(attachmentPoint)
135                 .remoteId(remoteId)
136                 .cVlan(cvlan)
137                 .sVlan(svlan)
138                 .build());
139     }
140
141     private boolean anyImportantPropertyMissing() {
142         return StringUtils.isEmpty(attachmentPoint)
143                 || StringUtils.isEmpty(remoteId)
144                 || StringUtils.isEmpty(cvlan)
145                 || StringUtils.isEmpty(svlan);
146     }
147
148     private boolean containsProperHeaders(JsonObject jsonObject) {
149         return jsonObject.has(ADDITIONAL_FIELDS);
150     }
151
152     private String dumpJsonData() {
153         return String.format(RE_REGISTRATION_DUMPING_TEMPLATE,
154                 pnfCorrelationId,
155                 attachmentPoint,
156                 remoteId,
157                 cvlan,
158                 svlan
159         );
160     }
161
162     Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
163         JsonParser jsonParser = new JsonParser();
164         return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
165                 : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
166     }
167
168     private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
169         return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";
170     }
171 }