79c9d92f626cfd6d210fe5f969ac7dafddefedc6
[cps/cps-temporal.git] / src / main / java / org / onap / cps / temporal / controller / event / listener / kafka / DataUpdatedEventListener.java
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (c) 2021 Bell Canada.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.cps.temporal.controller.event.listener.kafka;
20
21 import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING;
22 import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED;
23
24 import java.net.URI;
25 import java.net.URISyntaxException;
26 import lombok.extern.slf4j.Slf4j;
27 import org.onap.cps.event.model.CpsDataUpdatedEvent;
28 import org.onap.cps.temporal.controller.event.listener.exception.EventListenerException;
29 import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException;
30 import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper;
31 import org.onap.cps.temporal.service.NetworkDataService;
32 import org.springframework.kafka.annotation.KafkaListener;
33 import org.springframework.stereotype.Component;
34 import org.springframework.util.StringUtils;
35
36 /**
37  * Listener for data updated events.
38  */
39 @Component
40 @Slf4j
41 public class DataUpdatedEventListener {
42
43     private static final URI EVENT_SOURCE;
44
45     static {
46         try {
47             EVENT_SOURCE = new URI("urn:cps:org.onap.cps");
48         } catch (final URISyntaxException e) {
49             throw new EventListenerException("Invalid URI for event source.", e);
50         }
51     }
52
53     private static final String EVENT_TYPE = "org.onap.cps.data-updated-event";
54
55     private final NetworkDataService networkDataService;
56     private final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper;
57
58     /**
59      * Constructor.
60      */
61     public DataUpdatedEventListener(
62             final NetworkDataService networkDataService, final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper) {
63         this.networkDataService = networkDataService;
64         this.cpsDataUpdatedEventMapper = cpsDataUpdatedEventMapper;
65     }
66
67     /**
68      * Consume the specified event.
69      *
70      * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
71      */
72     @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
73     public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
74
75         log.debug("Receiving {} ...", cpsDataUpdatedEvent);
76
77         // Validate event envelop
78         validateEventEnvelop(cpsDataUpdatedEvent);
79
80         // Map event to entity
81         final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
82         log.debug("Persisting {} ...", networkData);
83
84         // Persist entity
85         final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
86         log.debug("Persisted {}", persistedNetworkData);
87
88     }
89
90     private void validateEventEnvelop(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
91
92         final var invalidEventEnvelopException = new InvalidEventEnvelopException("Validation failure");
93
94         // Schema
95         if (cpsDataUpdatedEvent.getSchema() == null) {
96             invalidEventEnvelopException.addInvalidField(
97                     new InvalidEventEnvelopException.InvalidField(
98                             MISSING, "schema", null,
99                             CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
100                                     .value()));
101         }
102         // Id
103         if (!StringUtils.hasText(cpsDataUpdatedEvent.getId())) {
104             invalidEventEnvelopException.addInvalidField(
105                     new InvalidEventEnvelopException.InvalidField(
106                             MISSING, "id", null, null));
107         }
108         // Source
109         if (cpsDataUpdatedEvent.getSource() == null || !cpsDataUpdatedEvent.getSource().equals(EVENT_SOURCE)) {
110             invalidEventEnvelopException.addInvalidField(
111                     new InvalidEventEnvelopException.InvalidField(
112                             UNEXPECTED, "source",
113                             cpsDataUpdatedEvent.getSource() != null
114                                     ? cpsDataUpdatedEvent.getSource().toString() : null, EVENT_SOURCE.toString()));
115         }
116         // Type
117         if (cpsDataUpdatedEvent.getType() == null || !cpsDataUpdatedEvent.getType().equals(EVENT_TYPE)) {
118             invalidEventEnvelopException.addInvalidField(
119                     new InvalidEventEnvelopException.InvalidField(
120                             UNEXPECTED, "type", cpsDataUpdatedEvent.getType(), EVENT_TYPE));
121         }
122
123         if (invalidEventEnvelopException.hasInvalidFields()) {
124             throw invalidEventEnvelopException;
125         }
126
127     }
128
129 }