2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2020 Nordix Foundation.
4 * Copyright (C) 2021 Nokia.
5 * Copyright (C) 2021 Samsung Electronics.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.dcaegen2.services.pmmapper;
25 import ch.qos.logback.classic.util.ContextInitializer;
26 import io.undertow.Undertow;
27 import io.undertow.server.RoutingHandler;
28 import io.undertow.util.StatusCodes;
30 import java.util.Arrays;
32 import lombok.NonNull;
33 import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
34 import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
35 import org.onap.dcaegen2.services.pmmapper.config.EnvironmentReader;
36 import org.onap.dcaegen2.services.pmmapper.config.FilesProcessingConfig;
37 import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler;
38 import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
39 import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
40 import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
41 import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
42 import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
43 import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
44 import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
45 import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
46 import org.onap.dcaegen2.services.pmmapper.model.Event;
47 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
48 import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
49 import org.onap.dcaegen2.services.pmmapper.model.ServerResource;
50 import org.onap.dcaegen2.services.pmmapper.ssl.SSLContextFactory;
51 import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils;
52 import org.onap.dcaegen2.services.pmmapper.utils.IncomingEventsCache;
53 import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter;
54 import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter;
55 import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
56 import org.onap.logging.ref.slf4j.ONAPLogAdapter;
57 import org.slf4j.LoggerFactory;
59 import reactor.core.publisher.Flux;
60 import reactor.core.publisher.FluxSink;
61 import reactor.core.scheduler.Scheduler;
62 import reactor.core.scheduler.Schedulers;
64 import javax.net.ssl.SSLContext;
65 import java.io.IOException;
66 import java.nio.file.Path;
67 import java.nio.file.Paths;
68 import java.util.List;
69 import java.util.concurrent.TimeUnit;
74 private static final int PREFETCH_ONE_PER_THREAD = 1;
77 System.setProperty(ContextInitializer.CONFIG_FILE_PROPERTY, "/opt/app/pm-mapper/etc/logback.xml");
80 private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(App.class));
81 private static final int HTTP_PORT = 8081;
82 private static final int HTTPS_PORT = 8443;
83 private static final int INITIAL_RECONFIGURATION_PERIOD = 60;
84 private static final int RECONFIGURATION_PERIOD = 60;
85 private static final IncomingEventsCache eventsCache = IncomingEventsCache.INSTANCE;
86 private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/");
87 private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/");
89 private final FilesProcessingConfig processingConfig;
91 private MapperConfig mapperConfig;
92 private MetadataFilter metadataFilter;
93 private MeasConverter measConverter;
94 private MeasFilterHandler filterHandler;
95 private Mapper mapper;
96 private MeasSplitter splitter;
97 private XMLValidator validator;
98 private VESPublisher vesPublisher;
99 private DeliveryHandler deliveryHandler;
100 private DynamicConfiguration dynamicConfiguration;
101 private HealthCheckHandler healthCheckHandler;
102 private int httpPort;
103 private int httpsPort;
105 private Undertow applicationServer;
106 private List<ServerResource> serverResources;
107 private Flux<Event> flux;
108 private FluxSink<Event> fluxSink;
109 private Scheduler configScheduler;
112 * Creates an instance of the application.
113 * @param templatesDirectory path to directory containing templates used for mapping.
114 * @param schemasDirectory path to directory containing schemas used to verify incoming XML will work with templates.
115 * @param httpPort http port to start http server on.
116 * @param httpsPort https port to start https server on.
117 * @param configHandler instance of the ConfigurationHandler used to acquire config.
119 public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler, FilesProcessingConfig filesProcessingConfig)
120 throws EnvironmentConfigException {
122 this.mapperConfig = configHandler.getMapperConfig();
123 } catch (EnvironmentConfigException | CBSServerError | MapperConfigException e) {
124 logger.unwrap().error("Failed to acquire initial configuration, Application cannot start", e);
125 throw new IllegalStateException("Config acquisition failed");
127 this.processingConfig = filesProcessingConfig;
129 this.httpPort = httpPort;
130 this.httpsPort = httpsPort;
131 this.metadataFilter = new MetadataFilter(mapperConfig);
132 this.measConverter = new MeasConverter();
133 this.filterHandler = new MeasFilterHandler(measConverter);
134 this.mapper = new Mapper(templatesDirectory, this.measConverter);
135 this.splitter = new MeasSplitter(measConverter);
136 this.validator = new XMLValidator(schemasDirectory);
137 this.vesPublisher = new VESPublisher(mapperConfig);
138 this.flux = Flux.create(eventFluxSink -> this.fluxSink = eventFluxSink);
139 this.configScheduler = Schedulers.newSingle("Config");
141 int processingThreads = processingConfig.getThreadsCount();
143 this.flux.onBackpressureDrop(App::handleBackPressure)
144 .doOnNext(App::receiveRequest)
145 .filter(event -> !isCached(event.getPublishIdentity()))
146 .doOnNext(App::addToCache)
147 .limitRate(processingConfig.getLimitRate())
148 .parallel(processingThreads)
149 .runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD)
150 .doOnNext(event -> MDC.setContextMap(event.getMdc()))
151 .filter(this.metadataFilter::filter)
152 .filter(event -> App.filterByFileType(this.filterHandler, event, this.mapperConfig))
153 .filter(event -> App.validate(this.validator, event, this.mapperConfig))
154 .concatMap(event -> App.split(this.splitter,event, this.mapperConfig))
155 .filter(events -> App.filter(this.filterHandler, events, this.mapperConfig))
156 .concatMap(events -> App.map(this.mapper, events, this.mapperConfig))
157 .concatMap(this.vesPublisher::publish)
158 .subscribe(event -> App.sendEventProcessed(this.mapperConfig, event));
160 this.configScheduler.schedulePeriodically(this::reconfigure, INITIAL_RECONFIGURATION_PERIOD, RECONFIGURATION_PERIOD, TimeUnit.SECONDS);
161 this.healthCheckHandler = new HealthCheckHandler();
162 this.deliveryHandler = new DeliveryHandler(fluxSink::next);
163 this.dynamicConfiguration = new DynamicConfiguration(Arrays.asList(mapperConfig), mapperConfig);
164 this.serverResources = Arrays.asList(healthCheckHandler, deliveryHandler, dynamicConfiguration);
166 this.applicationServer = server(this.mapperConfig, this.serverResources);
167 } catch (IOException e) {
168 logger.unwrap().error("Failed to create server instance.", e);
169 throw new IllegalStateException("Server instantiation failed");
174 * Starts the application server.
176 public void start() {
177 this.applicationServer.start();
178 this.configScheduler.start();
182 * Stops the application server.
185 this.applicationServer.stop();
186 this.configScheduler.dispose();
189 private Undertow server(MapperConfig config, List<ServerResource> serverResources) throws IOException {
190 SSLContextFactory sslContextFactory = new SSLContextFactory(config);
191 SSLContext sslContext = sslContextFactory.createSSLContext(config);
192 SSLContext.setDefault(sslContext);
193 Undertow.Builder builder = Undertow.builder();
194 if (config.getEnableHttp()) {
195 builder.addHttpListener(this.httpPort, "0.0.0.0");
197 RoutingHandler routes = new RoutingHandler();
198 serverResources.forEach(handler -> routes.add(handler.getHTTPMethod(), handler.getEndpointTemplate(), handler.getHandler()));
199 return builder.addHttpsListener(this.httpsPort, "0.0.0.0", sslContext)
204 private void reconfigure() {
206 this.dynamicConfiguration.reconfigure();
207 } catch (Exception e) {
208 logger.unwrap().error("Failed to reconfigure service.", e);
212 private boolean isCached(String id) {
213 boolean isPresent = eventsCache.contains(id);
215 logger.unwrap().info("Skipping. This event is already waiting in cache to be processed: {}", id);
220 public static void main(String[] args) throws EnvironmentConfigException {
221 FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader());
222 new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start();
225 public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) {
226 boolean hasValidFileName = false;
228 hasValidFileName = filterHandler.filterByFileType(event);
229 if(!hasValidFileName) {
230 sendEventProcessed(config,event);
232 } catch (Exception exception) {
233 logger.unwrap().error("Unable to filter by file type", exception);
234 sendEventProcessed(config,event);
236 return hasValidFileName;
239 public static boolean validate(XMLValidator validator, Event event, MapperConfig config) {
240 boolean isValidXML = false;
242 isValidXML = validator.validate(event);
244 sendEventProcessed(config,event);
246 } catch (Exception exception) {
247 logger.unwrap().error("Unable to validate XML",exception);
248 sendEventProcessed(config,event);
253 public static boolean filter(MeasFilterHandler filterHandler, List<Event> events, MapperConfig config) {
254 Event event = events.get(0);
255 boolean hasMatchingFilter = false;
257 hasMatchingFilter = filterHandler.filterByMeasType(events);
258 if(!hasMatchingFilter) {
259 logger.unwrap().info("No filter match from all measurement files.");
260 sendEventProcessed(config,event);
262 } catch (Exception exception) {
263 logger.unwrap().error("Unable to filter by Meas Types",exception);
264 sendEventProcessed(config,event);
266 return hasMatchingFilter;
269 public static Flux<List<Event>> map(Mapper mapper, List<Event> events, MapperConfig config) {
270 List<Event> mappedEvents;
272 mappedEvents = mapper.mapEvents(events);
273 } catch (Exception exception) {
274 logger.unwrap().error("Unable to map XML to VES",exception);
275 sendEventProcessed(config,events.get(0));
278 return Flux.just(mappedEvents);
281 public static Flux<List<Event>> split(MeasSplitter splitter, Event event, MapperConfig config) {
282 List<Event> splitEvents;
284 splitEvents = splitter.split(event);
285 } catch (Exception exception) {
286 logger.unwrap().error("Unable to split MeasCollecFile",exception);
287 sendEventProcessed(config,event);
290 return Flux.just(splitEvents);
293 public static void sendEventProcessed(MapperConfig config, Event event) {
295 DataRouterUtils.processEvent(config, event);
296 eventsCache.remove(event.getPublishIdentity());
297 } catch (ProcessEventException exception) {
298 logger.unwrap().error("Process event failure", exception);
303 * Takes the exchange from an event, responds with a 429 and un-dispatches the exchange.
304 * @param event to be ignored.
306 public static void handleBackPressure(@NonNull Event event) {
307 logger.unwrap().debug("Event will not be processed, responding with 429");
308 event.getHttpServerExchange()
309 .setStatusCode(StatusCodes.TOO_MANY_REQUESTS)
311 .send(StatusCodes.TOO_MANY_REQUESTS_STRING);
312 event.getHttpServerExchange()
317 * Adds received event to cache
318 * @param event to be cached.
320 public static void addToCache(@NonNull Event event) {
321 eventsCache.add(event.getPublishIdentity());
325 * Takes the exchange from an event, responds with a 200 and un-dispatches the exchange.
326 * @param event to be received.
328 public static void receiveRequest(@NonNull Event event) {
329 logger.unwrap().debug("Event will be processed, responding with 200");
330 event.getHttpServerExchange()
331 .setStatusCode(StatusCodes.OK)
333 .send(StatusCodes.OK_STRING);
334 event.getHttpServerExchange()