Merge "Add DCAE MOD design tool project"
[dcaegen2/platform.git] / mod / designtool / designtool-web / src / main / java / org / apache / nifi / NiFi.java
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * Modifications to the original nifi code for the ONAP project are made
18  * available under the Apache License, Version 2.0
19  */
20 package org.apache.nifi;
21
22 import org.apache.nifi.bundle.Bundle;
23 import org.apache.nifi.nar.ExtensionMapping;
24 import org.apache.nifi.nar.NarClassLoaders;
25 import org.apache.nifi.nar.NarClassLoadersHolder;
26 import org.apache.nifi.nar.NarUnpacker;
27 import org.apache.nifi.nar.SystemBundle;
28 import org.apache.nifi.util.FileUtils;
29 import org.apache.nifi.util.NiFiProperties;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.slf4j.bridge.SLF4JBridgeHandler;
33
34 import java.io.File;
35 import java.io.FileWriter;
36 import java.io.IOException;
37 import java.lang.Thread.UncaughtExceptionHandler;
38 import java.lang.reflect.Constructor;
39 import java.lang.reflect.InvocationTargetException;
40 import java.lang.reflect.Method;
41 import java.net.MalformedURLException;
42 import java.net.URL;
43 import java.net.URLClassLoader;
44 import java.nio.charset.StandardCharsets;
45 import java.nio.file.Files;
46 import java.nio.file.Paths;
47 import java.util.ArrayList;
48 import java.util.Arrays;
49 import java.util.List;
50 import java.util.Set;
51 import java.util.Random;
52 import java.util.Timer;
53 import java.util.TimerTask;
54 import java.util.concurrent.Executors;
55 import java.util.concurrent.ScheduledExecutorService;
56 import java.util.concurrent.ScheduledFuture;
57 import java.util.concurrent.ThreadFactory;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicInteger;
60 import java.util.concurrent.atomic.AtomicLong;
61
62 public class NiFi {
63
64     private static final Logger LOGGER = LoggerFactory.getLogger(NiFi.class);
65     private static final String KEY_FILE_FLAG = "-K";
66     private final NiFiServer nifiServer;
67     private final BootstrapListener bootstrapListener;
68
69     public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
70     private volatile boolean shutdown = false;
71
72     public NiFi(final NiFiProperties properties)
73             throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
74
75         this(properties, ClassLoader.getSystemClassLoader());
76
77     }
78
79     public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
80             throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
81
82         // There can only be one krb5.conf for the overall Java process so set this globally during
83         // start up so that processors and our Kerberos authentication code don't have to set this
84         final File kerberosConfigFile = properties.getKerberosConfigurationFile();
85         if (kerberosConfigFile != null) {
86             final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath();
87             LOGGER.info("Setting java.security.krb5.conf to {}", new Object[]{kerberosConfigFilePath});
88             System.setProperty("java.security.krb5.conf", kerberosConfigFilePath);
89         }
90
91         setDefaultUncaughtExceptionHandler();
92
93         // register the shutdown hook
94         addShutdownHook();
95
96         final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
97         if (bootstrapPort != null) {
98             try {
99                 final int port = Integer.parseInt(bootstrapPort);
100
101                 if (port < 1 || port > 65535) {
102                     throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
103                 }
104
105                 bootstrapListener = new BootstrapListener(this, port);
106                 bootstrapListener.start();
107             } catch (final NumberFormatException nfe) {
108                 throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
109             }
110         } else {
111             LOGGER.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
112             bootstrapListener = null;
113         }
114
115         // delete the web working dir - if the application does not start successfully
116         // the web app directories might be in an invalid state. when this happens
117         // jetty will not attempt to re-extract the war into the directory. by removing
118         // the working directory, we can be assured that it will attempt to extract the
119         // war every time the application starts.
120         File webWorkingDir = properties.getWebWorkingDirectory();
121         FileUtils.deleteFilesInDirectory(webWorkingDir, null, LOGGER, true, true);
122         FileUtils.deleteFile(webWorkingDir, LOGGER, 3);
123
124         detectTimingIssues();
125
126         // redirect JUL log events
127         initLogging();
128
129         final Bundle systemBundle = SystemBundle.create(properties);
130
131         // expand the nars
132         final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);
133
134         // load the extensions classloaders
135         NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
136
137         narClassLoaders.init(rootClassLoader,
138                 properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
139
140         // load the framework classloader
141         final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
142         if (frameworkClassLoader == null) {
143             throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
144         }
145
146         final Set<Bundle> narBundles = narClassLoaders.getBundles();
147
148         // load the server from the framework classloader
149         Thread.currentThread().setContextClassLoader(frameworkClassLoader);
150         Class<?> jettyServer = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader);
151         Constructor<?> jettyConstructor = jettyServer.getConstructor(NiFiProperties.class, Set.class);
152
153         final long startTime = System.nanoTime();
154         nifiServer = (NiFiServer) jettyConstructor.newInstance(properties, narBundles);
155         nifiServer.setExtensionMapping(extensionMapping);
156         nifiServer.setBundles(systemBundle, narBundles);
157
158         if (shutdown) {
159             LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
160         } else {
161             nifiServer.start();
162
163             if (bootstrapListener != null) {
164                 bootstrapListener.sendStartedStatus(true);
165             }
166
167             final long duration = System.nanoTime() - startTime;
168             LOGGER.info("Controller initialization took " + duration + " nanoseconds "
169                     + "(" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + " seconds).");
170         }
171     }
172
173     protected void setDefaultUncaughtExceptionHandler() {
174         Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
175             @Override
176             public void uncaughtException(final Thread t, final Throwable e) {
177                 LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString());
178                 LOGGER.error("", e);
179             }
180         });
181     }
182
183     protected void addShutdownHook() {
184         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
185             @Override
186             public void run() {
187                 // shutdown the jetty server
188                 shutdownHook();
189             }
190         }));
191     }
192
193     protected void initLogging() {
194         SLF4JBridgeHandler.removeHandlersForRootLogger();
195         SLF4JBridgeHandler.install();
196     }
197
198     private static ClassLoader createBootstrapClassLoader() {
199         //Get list of files in bootstrap folder
200         final List<URL> urls = new ArrayList<>();
201         try {
202             Files.list(Paths.get("lib/bootstrap")).forEach(p -> {
203                 try {
204                     urls.add(p.toUri().toURL());
205                 } catch (final MalformedURLException mef) {
206                     LOGGER.warn("Unable to load " + p.getFileName() + " due to " + mef, mef);
207                 }
208             });
209         } catch (IOException ioe) {
210             LOGGER.warn("Unable to access lib/bootstrap to create bootstrap classloader", ioe);
211         }
212         //Create the bootstrap classloader
213         return new URLClassLoader(urls.toArray(new URL[0]), Thread.currentThread().getContextClassLoader());
214     }
215
216     protected void shutdownHook() {
217         try {
218             shutdown();
219         } catch (final Throwable t) {
220             LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t);
221         }
222     }
223
224     protected void shutdown() {
225         this.shutdown = true;
226
227         LOGGER.info("Initiating shutdown of Jetty web server...");
228         if (nifiServer != null) {
229             nifiServer.stop();
230         }
231         if (bootstrapListener != null) {
232             bootstrapListener.stop();
233         }
234         LOGGER.info("Jetty web server shutdown completed (nicely or otherwise).");
235     }
236
237     /**
238      * Determine if the machine we're running on has timing issues.
239      */
240     private void detectTimingIssues() {
241         final int minRequiredOccurrences = 25;
242         final int maxOccurrencesOutOfRange = 15;
243         final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis());
244
245         final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
246             private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
247
248             @Override
249             public Thread newThread(final Runnable r) {
250                 final Thread t = defaultFactory.newThread(r);
251                 t.setDaemon(true);
252                 t.setName("Detect Timing Issues");
253                 return t;
254             }
255         });
256
257         final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0);
258         final AtomicInteger occurrences = new AtomicInteger(0);
259         final Runnable command = new Runnable() {
260             @Override
261             public void run() {
262                 final long curMillis = System.currentTimeMillis();
263                 final long difference = curMillis - lastTriggerMillis.get();
264                 final long millisOff = Math.abs(difference - 2000L);
265                 occurrences.incrementAndGet();
266                 if (millisOff > 500L) {
267                     occurrencesOutOfRange.incrementAndGet();
268                 }
269                 lastTriggerMillis.set(curMillis);
270             }
271         };
272
273         final ScheduledFuture<?> future = service.scheduleWithFixedDelay(command, 2000L, 2000L, TimeUnit.MILLISECONDS);
274
275         final TimerTask timerTask = new TimerTask() {
276             @Override
277             public void run() {
278                 future.cancel(true);
279                 service.shutdownNow();
280
281                 if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
282                     LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
283                             + "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
284                 }
285             }
286         };
287         final Timer timer = new Timer(true);
288         timer.schedule(timerTask, 60000L);
289     }
290
291     /**
292      * Main entry point of the application.
293      *
294      * @param args things which are ignored
295      */
296     public static void main(String[] args) {
297         LOGGER.info("Launching NiFi...");
298         try {
299             NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args);
300             new NiFi(properties);
301         } catch (final Throwable t) {
302             LOGGER.error("Failure to launch NiFi due to " + t, t);
303         }
304     }
305
306     protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) {
307         final ClassLoader bootstrap = createBootstrapClassLoader();
308         NiFiProperties properties = initializeProperties(args, bootstrap);
309         properties.validate();
310         return properties;
311     }
312
313     private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) {
314         // Try to get key
315         // If key doesn't exist, instantiate without
316         // Load properties
317         // If properties are protected and key missing, throw RuntimeException
318
319         final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
320         final String key;
321         try {
322             key = loadFormattedKey(args);
323             // The key might be empty or null when it is passed to the loader
324         } catch (IllegalArgumentException e) {
325             final String msg = "The bootstrap process did not provide a valid key";
326             throw new IllegalArgumentException(msg, e);
327         }
328         Thread.currentThread().setContextClassLoader(boostrapLoader);
329
330         try {
331             final Class<?> propsLoaderClass = Class.forName("org.apache.nifi.properties.NiFiPropertiesLoader", true, boostrapLoader);
332             final Method withKeyMethod = propsLoaderClass.getMethod("withKey", String.class);
333             final Object loaderInstance = withKeyMethod.invoke(null, key);
334             final Method getMethod = propsLoaderClass.getMethod("get");
335             final NiFiProperties properties = (NiFiProperties) getMethod.invoke(loaderInstance);
336             LOGGER.info("Loaded {} properties", properties.size());
337             return properties;
338         } catch (InvocationTargetException wrappedException) {
339             final String msg = "There was an issue decrypting protected properties";
340             throw new IllegalArgumentException(msg, wrappedException.getCause() == null ? wrappedException : wrappedException.getCause());
341         } catch (final IllegalAccessException | NoSuchMethodException | ClassNotFoundException reex) {
342             final String msg = "Unable to access properties loader in the expected manner - apparent classpath or build issue";
343             throw new IllegalArgumentException(msg, reex);
344         } catch (final RuntimeException e) {
345             final String msg = "There was an issue decrypting protected properties";
346             throw new IllegalArgumentException(msg, e);
347         } finally {
348             Thread.currentThread().setContextClassLoader(contextClassLoader);
349         }
350     }
351
352     private static String loadFormattedKey(String[] args) {
353         String key = null;
354         List<String> parsedArgs = parseArgs(args);
355         // Check if args contain protection key
356         if (parsedArgs.contains(KEY_FILE_FLAG)) {
357             key = getKeyFromKeyFileAndPrune(parsedArgs);
358             // Format the key (check hex validity and remove spaces)
359             key = formatHexKey(key);
360
361         }
362
363         if (null == key) {
364             return "";
365         } else if (!isHexKeyValid(key)) {
366             throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length");
367         } else {
368             return key;
369         }
370     }
371
372     private static String getKeyFromKeyFileAndPrune(List<String> parsedArgs) {
373         String key = null;
374         LOGGER.debug("The bootstrap process provided the " + KEY_FILE_FLAG + " flag");
375         int i = parsedArgs.indexOf(KEY_FILE_FLAG);
376         if (parsedArgs.size() <= i + 1) {
377             LOGGER.error("The bootstrap process passed the {} flag without a filename", KEY_FILE_FLAG);
378             throw new IllegalArgumentException("The bootstrap process provided the " + KEY_FILE_FLAG + " flag but no key");
379         }
380         try {
381           String passwordfile_path = parsedArgs.get(i + 1);
382           // Slurp in the contents of the file:
383           byte[] encoded = Files.readAllBytes(Paths.get(passwordfile_path));
384           key = new String(encoded,StandardCharsets.UTF_8);
385           if (0 == key.length())
386             throw new IllegalArgumentException("Key in keyfile " + passwordfile_path + " yielded an empty key");
387
388           LOGGER.info("Now overwriting file in "+passwordfile_path);
389
390           // Overwrite the contents of the file (to avoid littering file system
391           // unlinked with key material):
392           File password_file = new File(passwordfile_path);
393           FileWriter overwriter = new FileWriter(password_file,false);
394
395           // Construe a random pad:
396           Random r = new Random();
397           StringBuffer sb = new StringBuffer();
398           // Note on correctness: this pad is longer, but equally sufficient.
399           while(sb.length() < encoded.length){
400             sb.append(Integer.toHexString(r.nextInt()));
401           }
402           String pad = sb.toString();
403           LOGGER.info("Overwriting key material with pad: "+pad);
404           overwriter.write(pad);
405           overwriter.close();
406
407           LOGGER.info("Removing/unlinking file: "+passwordfile_path);
408           password_file.delete();
409
410         } catch (IOException e) {
411           LOGGER.error("Caught IOException while retrieving the "+KEY_FILE_FLAG+"-passed keyfile; aborting: "+e.toString());
412           System.exit(1);
413         }
414
415         LOGGER.info("Read property protection key from key file provided by bootstrap process");
416         return key;
417     }
418
419     private static List<String> parseArgs(String[] args) {
420         List<String> parsedArgs = new ArrayList<>(Arrays.asList(args));
421         for (int i = 0; i < parsedArgs.size(); i++) {
422             if (parsedArgs.get(i).startsWith(KEY_FILE_FLAG + " ")) {
423                 String[] split = parsedArgs.get(i).split(" ", 2);
424                 parsedArgs.set(i, split[0]);
425                 parsedArgs.add(i + 1, split[1]);
426                 break;
427             }
428         }
429         return parsedArgs;
430     }
431
432     private static String formatHexKey(String input) {
433         if (input == null || input.trim().isEmpty()) {
434             return "";
435         }
436         return input.replaceAll("[^0-9a-fA-F]", "").toLowerCase();
437     }
438
439     private static boolean isHexKeyValid(String key) {
440         if (key == null || key.trim().isEmpty()) {
441             return false;
442         }
443         // Key length is in "nibbles" (i.e. one hex char = 4 bits)
444         return Arrays.asList(128, 196, 256).contains(key.length() * 4) && key.matches("^[0-9a-fA-F]*$");
445     }
446 }