2 * ============LICENSE_START=======================================================
3 * Copyright (c) 2021 Bell Canada.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.cps.temporal.controller.event.listener.kafka;
21 import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING;
22 import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED;
25 import java.net.URISyntaxException;
26 import lombok.extern.slf4j.Slf4j;
27 import org.onap.cps.event.model.CpsDataUpdatedEvent;
28 import org.onap.cps.temporal.controller.event.listener.exception.EventListenerException;
29 import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException;
30 import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper;
31 import org.onap.cps.temporal.service.NetworkDataService;
32 import org.springframework.kafka.annotation.KafkaListener;
33 import org.springframework.stereotype.Component;
34 import org.springframework.util.StringUtils;
37 * Listener for data updated events.
41 public class DataUpdatedEventListener {
43 private static final URI EVENT_SOURCE;
47 EVENT_SOURCE = new URI("urn:cps:org.onap.cps");
48 } catch (final URISyntaxException e) {
49 throw new EventListenerException("Invalid URI for event source.", e);
53 private static final String EVENT_TYPE = "org.onap.cps.data-updated-event";
55 private final NetworkDataService networkDataService;
56 private final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper;
61 public DataUpdatedEventListener(
62 final NetworkDataService networkDataService, final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper) {
63 this.networkDataService = networkDataService;
64 this.cpsDataUpdatedEventMapper = cpsDataUpdatedEventMapper;
68 * Consume the specified event.
70 * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
72 @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
73 public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
75 log.debug("Receiving {} ...", cpsDataUpdatedEvent);
77 // Validate event envelop
78 validateEventEnvelop(cpsDataUpdatedEvent);
80 // Map event to entity
81 final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
82 log.debug("Persisting {} ...", networkData);
85 final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
86 log.debug("Persisted {}", persistedNetworkData);
90 private void validateEventEnvelop(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
92 final var invalidEventEnvelopException = new InvalidEventEnvelopException("Validation failure");
95 if (cpsDataUpdatedEvent.getSchema() == null) {
96 invalidEventEnvelopException.addInvalidField(
97 new InvalidEventEnvelopException.InvalidField(
98 MISSING, "schema", null,
99 CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
103 if (!StringUtils.hasText(cpsDataUpdatedEvent.getId())) {
104 invalidEventEnvelopException.addInvalidField(
105 new InvalidEventEnvelopException.InvalidField(
106 MISSING, "id", null, null));
109 if (cpsDataUpdatedEvent.getSource() == null || !cpsDataUpdatedEvent.getSource().equals(EVENT_SOURCE)) {
110 invalidEventEnvelopException.addInvalidField(
111 new InvalidEventEnvelopException.InvalidField(
112 UNEXPECTED, "source",
113 cpsDataUpdatedEvent.getSource() != null
114 ? cpsDataUpdatedEvent.getSource().toString() : null, EVENT_SOURCE.toString()));
117 if (cpsDataUpdatedEvent.getType() == null || !cpsDataUpdatedEvent.getType().equals(EVENT_TYPE)) {
118 invalidEventEnvelopException.addInvalidField(
119 new InvalidEventEnvelopException.InvalidField(
120 UNEXPECTED, "type", cpsDataUpdatedEvent.getType(), EVENT_TYPE));
123 if (invalidEventEnvelopException.hasInvalidFields()) {
124 throw invalidEventEnvelopException;