2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * Modifications to the original nifi code for the ONAP project are made
18 * available under the Apache License, Version 2.0
20 package org.apache.nifi;
22 import org.apache.nifi.bundle.Bundle;
23 import org.apache.nifi.nar.ExtensionMapping;
24 import org.apache.nifi.nar.NarClassLoaders;
25 import org.apache.nifi.nar.NarClassLoadersHolder;
26 import org.apache.nifi.nar.NarUnpacker;
27 import org.apache.nifi.nar.SystemBundle;
28 import org.apache.nifi.util.FileUtils;
29 import org.apache.nifi.util.NiFiProperties;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.slf4j.bridge.SLF4JBridgeHandler;
35 import java.io.FileWriter;
36 import java.io.IOException;
37 import java.lang.Thread.UncaughtExceptionHandler;
38 import java.lang.reflect.Constructor;
39 import java.lang.reflect.InvocationTargetException;
40 import java.lang.reflect.Method;
41 import java.net.MalformedURLException;
43 import java.net.URLClassLoader;
44 import java.nio.charset.StandardCharsets;
45 import java.nio.file.Files;
46 import java.nio.file.Paths;
47 import java.util.ArrayList;
48 import java.util.Arrays;
49 import java.util.List;
51 import java.util.Random;
52 import java.util.Timer;
53 import java.util.TimerTask;
54 import java.util.concurrent.Executors;
55 import java.util.concurrent.ScheduledExecutorService;
56 import java.util.concurrent.ScheduledFuture;
57 import java.util.concurrent.ThreadFactory;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicInteger;
60 import java.util.concurrent.atomic.AtomicLong;
// Logger used by every method of this class.
private static final Logger LOGGER = LoggerFactory.getLogger(NiFi.class);
// Command-line flag searched for in the program arguments; the argument that
// follows it is read as the path of the key file.
private static final String KEY_FILE_FLAG = "-K";
// Server instance created reflectively from the framework class loader in the constructor.
private final NiFiServer nifiServer;
// Listener for the bootstrap process; null when no bootstrap port system property is set.
private final BootstrapListener bootstrapListener;

// Name of the system property holding the bootstrap listen port.
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
// Set to true by shutdown(); volatile so the write is visible across threads.
private volatile boolean shutdown = false;
/**
 * Creates the application using the system class loader as the root class loader.
 * Delegates all work to {@link #NiFi(NiFiProperties, ClassLoader)}.
 *
 * @param properties the properties to start with
 * @throws ClassNotFoundException if the server class cannot be found
 * @throws IOException if an I/O error occurs during start up
 * @throws NoSuchMethodException if the server class lacks the expected constructor
 * @throws InstantiationException if the server class cannot be instantiated
 * @throws IllegalAccessException if the server constructor is not accessible
 * @throws IllegalArgumentException if the server constructor rejects its arguments
 * @throws InvocationTargetException if the server constructor itself throws
 */
public NiFi(final NiFiProperties properties)
        throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
    this(properties, ClassLoader.getSystemClassLoader());
}
/**
 * Creates the application and, unless a shutdown was requested in the meantime, starts the server.
 *
 * <p>In order, this constructor: publishes the Kerberos configuration file (if any) as the
 * {@code java.security.krb5.conf} system property, installs the default uncaught-exception
 * handler and the shutdown hook, starts the bootstrap listener when
 * {@link #BOOTSTRAP_PORT_PROPERTY} is set, clears the web working directory, schedules the
 * timing check, redirects JUL logging, unpacks the NARs, initializes the NAR class loaders and
 * finally instantiates {@code org.apache.nifi.web.server.JettyServer} reflectively through the
 * framework class loader.</p>
 *
 * @param properties the properties to start with
 * @param rootClassLoader the class loader handed to the NAR class loaders as their root
 * @throws ClassNotFoundException if the server class cannot be found
 * @throws IOException if an I/O error occurs during start up
 * @throws NoSuchMethodException if the server class lacks the expected constructor
 * @throws InstantiationException if the server class cannot be instantiated
 * @throws IllegalAccessException if the server constructor is not accessible
 * @throws IllegalArgumentException if the server constructor rejects its arguments
 * @throws InvocationTargetException if the server constructor itself throws
 * @throws RuntimeException if the bootstrap port property is not an integer in the range 1 - 65535
 * @throws IllegalStateException if the framework NAR class loader is missing
 */
public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
        throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {

    // There can only be one krb5.conf for the overall Java process, so it is set globally
    // during start up; processors and the Kerberos authentication code need not set it.
    final File krb5Conf = properties.getKerberosConfigurationFile();
    if (krb5Conf != null) {
        final String krb5ConfPath = krb5Conf.getAbsolutePath();
        LOGGER.info("Setting java.security.krb5.conf to {}", new Object[]{krb5ConfPath});
        System.setProperty("java.security.krb5.conf", krb5ConfPath);
    }

    setDefaultUncaughtExceptionHandler();

    // register the shutdown hook
    addShutdownHook();

    final String configuredPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
    if (configuredPort == null) {
        LOGGER.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
        bootstrapListener = null;
    } else {
        try {
            final int port = Integer.parseInt(configuredPort);

            if (port < 1 || port > 65535) {
                throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
            }

            bootstrapListener = new BootstrapListener(this, port);
            bootstrapListener.start();
        } catch (final NumberFormatException nfe) {
            throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
        }
    }

    // Delete the web working dir: if the application did not start successfully the web app
    // directories might be in an invalid state, in which case jetty will not re-extract the
    // war. Removing the directory guarantees the war is extracted on every start.
    final File webWorkDir = properties.getWebWorkingDirectory();
    FileUtils.deleteFilesInDirectory(webWorkDir, null, LOGGER, true, true);
    FileUtils.deleteFile(webWorkDir, LOGGER, 3);

    detectTimingIssues();

    // redirect JUL log events
    initLogging();

    final Bundle systemBundle = SystemBundle.create(properties);

    // expand the nars
    final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);

    // load the extensions classloaders
    final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
    narClassLoaders.init(rootClassLoader,
            properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());

    // load the framework classloader
    final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
    if (frameworkClassLoader == null) {
        throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
    }

    final Set<Bundle> narBundles = narClassLoaders.getBundles();

    // load the server from the framework classloader
    Thread.currentThread().setContextClassLoader(frameworkClassLoader);
    final Class<?> serverClass = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader);
    final Constructor<?> serverConstructor = serverClass.getConstructor(NiFiProperties.class, Set.class);

    final long startTime = System.nanoTime();
    nifiServer = (NiFiServer) serverConstructor.newInstance(properties, narBundles);
    nifiServer.setExtensionMapping(extensionMapping);
    nifiServer.setBundles(systemBundle, narBundles);

    if (shutdown) {
        LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
        return;
    }

    nifiServer.start();

    if (bootstrapListener != null) {
        bootstrapListener.sendStartedStatus(true);
    }

    final long duration = System.nanoTime() - startTime;
    LOGGER.info("Controller initialization took " + duration + " nanoseconds "
            + "(" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + " seconds).");
}
173 protected void setDefaultUncaughtExceptionHandler() {
174 Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
176 public void uncaughtException(final Thread t, final Throwable e) {
177 LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString());
183 protected void addShutdownHook() {
184 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
187 // shutdown the jetty server
193 protected void initLogging() {
194 SLF4JBridgeHandler.removeHandlersForRootLogger();
195 SLF4JBridgeHandler.install();
198 private static ClassLoader createBootstrapClassLoader() {
199 //Get list of files in bootstrap folder
200 final List<URL> urls = new ArrayList<>();
202 Files.list(Paths.get("lib/bootstrap")).forEach(p -> {
204 urls.add(p.toUri().toURL());
205 } catch (final MalformedURLException mef) {
206 LOGGER.warn("Unable to load " + p.getFileName() + " due to " + mef, mef);
209 } catch (IOException ioe) {
210 LOGGER.warn("Unable to access lib/bootstrap to create bootstrap classloader", ioe);
212 //Create the bootstrap classloader
213 return new URLClassLoader(urls.toArray(new URL[0]), Thread.currentThread().getContextClassLoader());
216 protected void shutdownHook() {
219 } catch (final Throwable t) {
220 LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t);
224 protected void shutdown() {
225 this.shutdown = true;
227 LOGGER.info("Initiating shutdown of Jetty web server...");
228 if (nifiServer != null) {
231 if (bootstrapListener != null) {
232 bootstrapListener.stop();
234 LOGGER.info("Jetty web server shutdown completed (nicely or otherwise).");
238 * Determine if the machine we're running on has timing issues.
240 private void detectTimingIssues() {
241 final int minRequiredOccurrences = 25;
242 final int maxOccurrencesOutOfRange = 15;
243 final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis());
245 final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
246 private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
249 public Thread newThread(final Runnable r) {
250 final Thread t = defaultFactory.newThread(r);
252 t.setName("Detect Timing Issues");
257 final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0);
258 final AtomicInteger occurrences = new AtomicInteger(0);
259 final Runnable command = new Runnable() {
262 final long curMillis = System.currentTimeMillis();
263 final long difference = curMillis - lastTriggerMillis.get();
264 final long millisOff = Math.abs(difference - 2000L);
265 occurrences.incrementAndGet();
266 if (millisOff > 500L) {
267 occurrencesOutOfRange.incrementAndGet();
269 lastTriggerMillis.set(curMillis);
273 final ScheduledFuture<?> future = service.scheduleWithFixedDelay(command, 2000L, 2000L, TimeUnit.MILLISECONDS);
275 final TimerTask timerTask = new TimerTask() {
279 service.shutdownNow();
281 if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
282 LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
283 + "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
287 final Timer timer = new Timer(true);
288 timer.schedule(timerTask, 60000L);
292 * Main entry point of the application.
294 * @param args things which are ignored
296 public static void main(String[] args) {
297 LOGGER.info("Launching NiFi...");
299 NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args);
300 new NiFi(properties);
301 } catch (final Throwable t) {
302 LOGGER.error("Failure to launch NiFi due to " + t, t);
306 protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) {
307 final ClassLoader bootstrap = createBootstrapClassLoader();
308 NiFiProperties properties = initializeProperties(args, bootstrap);
309 properties.validate();
313 private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) {
315 // If key doesn't exist, instantiate without
317 // If properties are protected and key missing, throw RuntimeException
319 final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
322 key = loadFormattedKey(args);
323 // The key might be empty or null when it is passed to the loader
324 } catch (IllegalArgumentException e) {
325 final String msg = "The bootstrap process did not provide a valid key";
326 throw new IllegalArgumentException(msg, e);
328 Thread.currentThread().setContextClassLoader(boostrapLoader);
331 final Class<?> propsLoaderClass = Class.forName("org.apache.nifi.properties.NiFiPropertiesLoader", true, boostrapLoader);
332 final Method withKeyMethod = propsLoaderClass.getMethod("withKey", String.class);
333 final Object loaderInstance = withKeyMethod.invoke(null, key);
334 final Method getMethod = propsLoaderClass.getMethod("get");
335 final NiFiProperties properties = (NiFiProperties) getMethod.invoke(loaderInstance);
336 LOGGER.info("Loaded {} properties", properties.size());
338 } catch (InvocationTargetException wrappedException) {
339 final String msg = "There was an issue decrypting protected properties";
340 throw new IllegalArgumentException(msg, wrappedException.getCause() == null ? wrappedException : wrappedException.getCause());
341 } catch (final IllegalAccessException | NoSuchMethodException | ClassNotFoundException reex) {
342 final String msg = "Unable to access properties loader in the expected manner - apparent classpath or build issue";
343 throw new IllegalArgumentException(msg, reex);
344 } catch (final RuntimeException e) {
345 final String msg = "There was an issue decrypting protected properties";
346 throw new IllegalArgumentException(msg, e);
348 Thread.currentThread().setContextClassLoader(contextClassLoader);
352 private static String loadFormattedKey(String[] args) {
354 List<String> parsedArgs = parseArgs(args);
355 // Check if args contain protection key
356 if (parsedArgs.contains(KEY_FILE_FLAG)) {
357 key = getKeyFromKeyFileAndPrune(parsedArgs);
358 // Format the key (check hex validity and remove spaces)
359 key = formatHexKey(key);
365 } else if (!isHexKeyValid(key)) {
366 throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length");
372 private static String getKeyFromKeyFileAndPrune(List<String> parsedArgs) {
374 LOGGER.debug("The bootstrap process provided the " + KEY_FILE_FLAG + " flag");
375 int i = parsedArgs.indexOf(KEY_FILE_FLAG);
376 if (parsedArgs.size() <= i + 1) {
377 LOGGER.error("The bootstrap process passed the {} flag without a filename", KEY_FILE_FLAG);
378 throw new IllegalArgumentException("The bootstrap process provided the " + KEY_FILE_FLAG + " flag but no key");
381 String passwordfile_path = parsedArgs.get(i + 1);
382 // Slurp in the contents of the file:
383 byte[] encoded = Files.readAllBytes(Paths.get(passwordfile_path));
384 key = new String(encoded,StandardCharsets.UTF_8);
385 if (0 == key.length())
386 throw new IllegalArgumentException("Key in keyfile " + passwordfile_path + " yielded an empty key");
388 LOGGER.info("Now overwriting file in "+passwordfile_path);
390 // Overwrite the contents of the file (to avoid littering file system
391 // unlinked with key material):
392 File password_file = new File(passwordfile_path);
393 FileWriter overwriter = new FileWriter(password_file,false);
395 // Construe a random pad:
396 Random r = new Random();
397 StringBuffer sb = new StringBuffer();
398 // Note on correctness: this pad is longer, but equally sufficient.
399 while(sb.length() < encoded.length){
400 sb.append(Integer.toHexString(r.nextInt()));
402 String pad = sb.toString();
403 LOGGER.info("Overwriting key material with pad: "+pad);
404 overwriter.write(pad);
407 LOGGER.info("Removing/unlinking file: "+passwordfile_path);
408 password_file.delete();
410 } catch (IOException e) {
411 LOGGER.error("Caught IOException while retrieving the "+KEY_FILE_FLAG+"-passed keyfile; aborting: "+e.toString());
415 LOGGER.info("Read property protection key from key file provided by bootstrap process");
419 private static List<String> parseArgs(String[] args) {
420 List<String> parsedArgs = new ArrayList<>(Arrays.asList(args));
421 for (int i = 0; i < parsedArgs.size(); i++) {
422 if (parsedArgs.get(i).startsWith(KEY_FILE_FLAG + " ")) {
423 String[] split = parsedArgs.get(i).split(" ", 2);
424 parsedArgs.set(i, split[0]);
425 parsedArgs.add(i + 1, split[1]);
432 private static String formatHexKey(String input) {
433 if (input == null || input.trim().isEmpty()) {
436 return input.replaceAll("[^0-9a-fA-F]", "").toLowerCase();
439 private static boolean isHexKeyValid(String key) {
440 if (key == null || key.trim().isEmpty()) {
443 // Key length is in "nibbles" (i.e. one hex char = 4 bits)
444 return Arrays.asList(128, 196, 256).contains(key.length() * 4) && key.matches("^[0-9a-fA-F]*$");