2 * ============LICENSE_START=======================================================
3 * Copyright (c) 2021 Bell Canada.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.cps.temporal.controller.event.listener.kafka;
21 import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING;
22 import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED;
25 import java.net.URISyntaxException;
26 import lombok.extern.slf4j.Slf4j;
27 import org.onap.cps.event.model.CpsDataUpdatedEvent;
28 import org.onap.cps.temporal.controller.event.listener.exception.EventListenerException;
29 import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException;
30 import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper;
31 import org.onap.cps.temporal.service.NetworkDataService;
32 import org.springframework.kafka.annotation.KafkaListener;
33 import org.springframework.stereotype.Component;
34 import org.springframework.util.StringUtils;
37 * Listener for data updated events.
41 public class DataUpdatedEventListener {
43 private static final URI EVENT_SOURCE;
47 EVENT_SOURCE = new URI("urn:cps:org.onap.cps");
48 } catch (final URISyntaxException e) {
49 throw new EventListenerException("Invalid URI for event source.", e);
53 private static final String EVENT_TYPE = "org.onap.cps.data-updated-event";
55 private final NetworkDataService networkDataService;
56 private final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper;
61 public DataUpdatedEventListener(
62 final NetworkDataService networkDataService, final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper) {
63 this.networkDataService = networkDataService;
64 this.cpsDataUpdatedEventMapper = cpsDataUpdatedEventMapper;
68 * Consume the specified event.
70 * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
72 @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
73 public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
75 log.debug("Receiving {} ...", cpsDataUpdatedEvent);
77 // Validate event envelop
78 validateEventEnvelop(cpsDataUpdatedEvent);
80 // Map event to entity
81 final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
82 log.debug("Persisting {} ...", networkData);
85 final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
86 log.debug("Persisted {}", persistedNetworkData);
90 private void validateEventEnvelop(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
92 final var invalidEventEnvelopException =
93 new InvalidEventEnvelopException("Validation failure", cpsDataUpdatedEvent);
96 if (cpsDataUpdatedEvent.getSchema() == null) {
97 invalidEventEnvelopException.addInvalidField(
98 new InvalidEventEnvelopException.InvalidField(
99 MISSING, "schema", null,
100 CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
104 if (!StringUtils.hasText(cpsDataUpdatedEvent.getId())) {
105 invalidEventEnvelopException.addInvalidField(
106 new InvalidEventEnvelopException.InvalidField(
107 MISSING, "id", null, null));
110 if (!EVENT_SOURCE.equals(cpsDataUpdatedEvent.getSource())) {
111 invalidEventEnvelopException.addInvalidField(
112 new InvalidEventEnvelopException.InvalidField(
113 UNEXPECTED, "source",
114 cpsDataUpdatedEvent.getSource() != null
115 ? cpsDataUpdatedEvent.getSource().toString() : null, EVENT_SOURCE.toString()));
118 if (!EVENT_TYPE.equals(cpsDataUpdatedEvent.getType())) {
119 invalidEventEnvelopException.addInvalidField(
120 new InvalidEventEnvelopException.InvalidField(
121 UNEXPECTED, "type", cpsDataUpdatedEvent.getType(), EVENT_TYPE));
124 if (invalidEventEnvelopException.hasInvalidFields()) {
125 throw invalidEventEnvelopException;