9cfddf0b0f0a1e7a654ee0ae3c419817c39ad87c
[dcaegen2/services/pm-mapper.git] / src / main / java / org / onap / dcaegen2 / services / pmmapper / App.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019-2020 Nordix Foundation.
4  *  Copyright (C) 2021 Nokia.
5  *  Copyright (C) 2021 Samsung Electronics.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.dcaegen2.services.pmmapper;
24
25 import ch.qos.logback.classic.util.ContextInitializer;
26 import io.undertow.Undertow;
27 import io.undertow.server.RoutingHandler;
28 import io.undertow.util.StatusCodes;
29
30 import java.util.Arrays;
31 import lombok.Data;
32 import lombok.NonNull;
33 import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
34 import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
35 import org.onap.dcaegen2.services.pmmapper.config.EnvironmentReader;
36 import org.onap.dcaegen2.services.pmmapper.config.FilesProcessingConfig;
37 import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler;
38 import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
39 import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
40 import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
41 import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
42 import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
43 import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
44 import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
45 import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
46 import org.onap.dcaegen2.services.pmmapper.model.Event;
47 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
48 import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
49 import org.onap.dcaegen2.services.pmmapper.model.ServerResource;
50 import org.onap.dcaegen2.services.pmmapper.ssl.SSLContextFactory;
51 import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils;
52 import org.onap.dcaegen2.services.pmmapper.utils.IncomingEventsCache;
53 import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter;
54 import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter;
55 import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
56 import org.onap.logging.ref.slf4j.ONAPLogAdapter;
57 import org.slf4j.LoggerFactory;
58 import org.slf4j.MDC;
59 import reactor.core.publisher.Flux;
60 import reactor.core.publisher.FluxSink;
61 import reactor.core.scheduler.Scheduler;
62 import reactor.core.scheduler.Schedulers;
63
64 import javax.net.ssl.SSLContext;
65 import java.io.IOException;
66 import java.nio.file.Path;
67 import java.nio.file.Paths;
68 import java.util.List;
69 import java.util.concurrent.TimeUnit;
70
71 @Data
72 public class App {
73
74     private static final int PREFETCH_ONE_PER_THREAD = 1;
75
76     static {
77         System.setProperty(ContextInitializer.CONFIG_FILE_PROPERTY, "/opt/app/pm-mapper/etc/logback.xml");
78     }
79
80     private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class));
81     private static final int HTTP_PORT = 8081;
82     private static final int HTTPS_PORT = 8443;
83     private static final int INITIAL_RECONFIGURATION_PERIOD = 60;
84     private static final int RECONFIGURATION_PERIOD = 60;
85     private static final IncomingEventsCache eventsCache = IncomingEventsCache.INSTANCE;
86     private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/");
87     private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/");
88
89     private final FilesProcessingConfig processingConfig;
90
91     private MapperConfig mapperConfig;
92     private MetadataFilter metadataFilter;
93     private MeasConverter measConverter;
94     private MeasFilterHandler filterHandler;
95     private Mapper mapper;
96     private MeasSplitter splitter;
97     private XMLValidator validator;
98     private VESPublisher vesPublisher;
99     private DeliveryHandler deliveryHandler;
100     private DynamicConfiguration dynamicConfiguration;
101     private HealthCheckHandler healthCheckHandler;
102     private int httpPort;
103     private int httpsPort;
104
105     private Undertow applicationServer;
106     private List<ServerResource> serverResources;
107     private Flux<Event> flux;
108     private FluxSink<Event> fluxSink;
109     private Scheduler configScheduler;
110
111     /**
112      * Creates an instance of the application.
113      * @param templatesDirectory path to directory containing templates used for mapping.
114      * @param schemasDirectory path to directory containing schemas used to verify incoming XML will work with templates.
115      * @param httpPort http port to start http server on.
116      * @param httpsPort https port to start https server on.
117      * @param configHandler instance of the ConfigurationHandler used to acquire config.
118      */
119     public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler, FilesProcessingConfig filesProcessingConfig)
120         throws EnvironmentConfigException {
121         try {
122             this.mapperConfig = configHandler.getMapperConfig();
123         } catch (EnvironmentConfigException | CBSServerError | MapperConfigException e) {
124             logger.unwrap().error("Failed to acquire initial configuration, Application cannot start", e);
125             throw new IllegalStateException("Config acquisition failed");
126         }
127         this.processingConfig = filesProcessingConfig;
128
129         this.httpPort = httpPort;
130         this.httpsPort = httpsPort;
131         this.metadataFilter = new MetadataFilter(mapperConfig);
132         this.measConverter = new MeasConverter();
133         this.filterHandler = new MeasFilterHandler(measConverter);
134         this.mapper = new Mapper(templatesDirectory, this.measConverter);
135         this.splitter =  new MeasSplitter(measConverter);
136         this.validator = new XMLValidator(schemasDirectory);
137         this.vesPublisher = new VESPublisher(mapperConfig);
138         this.flux = Flux.create(eventFluxSink -> this.fluxSink = eventFluxSink);
139         this.configScheduler = Schedulers.newSingle("Config");
140
141         int processingThreads = processingConfig.getThreadsCount();
142
143         this.flux.onBackpressureDrop(App::handleBackPressure)
144                 .doOnNext(App::receiveRequest)
145                 .filter(event -> !isCached(event.getPublishIdentity()))
146                 .doOnNext(App::addToCache)
147                 .limitRate(processingConfig.getLimitRate())
148                 .parallel(processingThreads)
149                 .runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD)
150                 .doOnNext(event -> MDC.setContextMap(event.getMdc()))
151                 .filter(this.metadataFilter::filter)
152                 .filter(event -> App.filterByFileType(this.filterHandler, event, this.mapperConfig))
153                 .filter(event -> App.validate(this.validator, event, this.mapperConfig))
154                 .concatMap(event -> App.split(this.splitter,event, this.mapperConfig))
155                 .filter(events -> App.filter(this.filterHandler, events, this.mapperConfig))
156                 .concatMap(events -> App.map(this.mapper, events, this.mapperConfig))
157                 .concatMap(this.vesPublisher::publish)
158                 .subscribe(event -> App.sendEventProcessed(this.mapperConfig, event));
159
160         this.configScheduler.schedulePeriodically(this::reconfigure, INITIAL_RECONFIGURATION_PERIOD, RECONFIGURATION_PERIOD, TimeUnit.SECONDS);
161         this.healthCheckHandler = new HealthCheckHandler();
162         this.deliveryHandler = new DeliveryHandler(fluxSink::next);
163         this.dynamicConfiguration = new DynamicConfiguration(Arrays.asList(mapperConfig), mapperConfig);
164         this.serverResources = Arrays.asList(healthCheckHandler, deliveryHandler, dynamicConfiguration);
165         try {
166             this.applicationServer = server(this.mapperConfig, this.serverResources);
167         } catch (IOException e) {
168             logger.unwrap().error("Failed to create server instance.", e);
169             throw new IllegalStateException("Server instantiation failed");
170         }
171     }
172
173     /**
174      * Starts the application server.
175      */
176     public void start() {
177         this.applicationServer.start();
178         this.configScheduler.start();
179     }
180
181     /**
182      * Stops the application server.
183      */
184     public void stop() {
185         this.applicationServer.stop();
186         this.configScheduler.dispose();
187     }
188
189     private Undertow server(MapperConfig config, List<ServerResource> serverResources) throws IOException {
190         SSLContextFactory sslContextFactory = new SSLContextFactory(config);
191         SSLContext sslContext = sslContextFactory.createSSLContext(config);
192         SSLContext.setDefault(sslContext);
193         Undertow.Builder builder = Undertow.builder();
194         if (config.getEnableHttp()) {
195             builder.addHttpListener(this.httpPort, "0.0.0.0");
196         }
197         RoutingHandler routes = new RoutingHandler();
198         serverResources.forEach(handler -> routes.add(handler.getHTTPMethod(), handler.getEndpointTemplate(), handler.getHandler()));
199         return builder.addHttpsListener(this.httpsPort, "0.0.0.0", sslContext)
200                 .setHandler(routes)
201                 .build();
202     }
203
204     private void reconfigure() {
205         try {
206             this.dynamicConfiguration.reconfigure();
207         } catch (Exception e) {
208             logger.unwrap().error("Failed to reconfigure service.", e);
209         }
210     }
211
212     private boolean isCached(String id) {
213         boolean isPresent = eventsCache.contains(id);
214         if(isPresent) {
215             logger.unwrap().info("Skipping. This event is already waiting in cache to be processed: {}", id);
216         }
217         return isPresent;
218     }
219
220     public static void main(String[] args) throws EnvironmentConfigException {
221         FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader());
222         new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start();
223     }
224
225     public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) {
226         boolean hasValidFileName = false;
227         try {
228             hasValidFileName = filterHandler.filterByFileType(event);
229             if(!hasValidFileName) {
230                 sendEventProcessed(config,event);
231             }
232         } catch (Exception exception) {
233             logger.unwrap().error("Unable to filter by file type", exception);
234             sendEventProcessed(config,event);
235         }
236         return hasValidFileName;
237     }
238
239     public static boolean validate(XMLValidator validator, Event event, MapperConfig config) {
240         boolean isValidXML = false;
241         try {
242             isValidXML = validator.validate(event);
243             if(!isValidXML) {
244                 sendEventProcessed(config,event);
245             }
246         } catch (Exception exception) {
247             logger.unwrap().error("Unable to validate XML",exception);
248             sendEventProcessed(config,event);
249         }
250         return isValidXML;
251     }
252
253     public static boolean filter(MeasFilterHandler filterHandler, List<Event> events, MapperConfig config) {
254         Event event = events.get(0);
255         boolean hasMatchingFilter = false;
256         try {
257             hasMatchingFilter = filterHandler.filterByMeasType(events);
258             if(!hasMatchingFilter) {
259                 logger.unwrap().info("No filter match from all measurement files.");
260                 sendEventProcessed(config,event);
261             }
262         } catch (Exception exception) {
263             logger.unwrap().error("Unable to filter by Meas Types",exception);
264             sendEventProcessed(config,event);
265         }
266         return hasMatchingFilter;
267     }
268
269     public static Flux<List<Event>> map(Mapper mapper, List<Event> events, MapperConfig config) {
270         List<Event> mappedEvents;
271         try {
272             mappedEvents = mapper.mapEvents(events);
273         } catch (Exception exception) {
274             logger.unwrap().error("Unable to map XML to VES",exception);
275             sendEventProcessed(config,events.get(0));
276             return Flux.empty();
277         }
278         return Flux.just(mappedEvents);
279     }
280
281     public static Flux<List<Event>> split(MeasSplitter splitter, Event event, MapperConfig config) {
282         List<Event> splitEvents;
283         try {
284             splitEvents = splitter.split(event);
285         } catch (Exception exception) {
286             logger.unwrap().error("Unable to split MeasCollecFile",exception);
287             sendEventProcessed(config,event);
288             return Flux.empty();
289         }
290         return Flux.just(splitEvents);
291     }
292
293     public static void sendEventProcessed(MapperConfig config, Event event) {
294       try {
295           DataRouterUtils.processEvent(config, event);
296           eventsCache.remove(event.getPublishIdentity());
297       } catch (ProcessEventException exception) {
298           logger.unwrap().error("Process event failure", exception);
299       }
300     }
301
302     /**
303      * Takes the exchange from an event, responds with a 429 and un-dispatches the exchange.
304      * @param event to be ignored.
305      */
306     public static void handleBackPressure(@NonNull Event event) {
307         logger.unwrap().debug("Event will not be processed, responding with 429");
308         event.getHttpServerExchange()
309                 .setStatusCode(StatusCodes.TOO_MANY_REQUESTS)
310                 .getResponseSender()
311                 .send(StatusCodes.TOO_MANY_REQUESTS_STRING);
312         event.getHttpServerExchange()
313                 .unDispatch();
314     }
315
316     /**
317      * Adds received event to cache
318      * @param event to be cached.
319      */
320     public static void addToCache(@NonNull Event event) {
321         eventsCache.add(event.getPublishIdentity());
322     }
323
324     /**
325      * Takes the exchange from an event, responds with a 200 and un-dispatches the exchange.
326      * @param event to be received.
327      */
328     public static void receiveRequest(@NonNull Event event) {
329         logger.unwrap().debug("Event will be processed, responding with 200");
330         event.getHttpServerExchange()
331                 .setStatusCode(StatusCodes.OK)
332                 .getResponseSender()
333                 .send(StatusCodes.OK_STRING);
334         event.getHttpServerExchange()
335                 .unDispatch();
336     }
337 }