/*
 * ============LICENSE_START=======================================================
 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
18 package org.apache.nifi.nar;
import org.apache.nifi.bundle.Bundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
36 * Uses the Java executor service scheduler to continuously load new DCAE jars
38 public class DCAEAutoLoader {
40 private static final Logger LOGGER = LoggerFactory.getLogger(DCAEAutoLoader.class);
42 private static final long POLL_INTERVAL_MS = 5000;
45 * Runnable task that grabs list of remotely stored jars, identifies ones that haven't
46 * been processed, builds Nifi bundles for those unprocessed ones and loads them into
47 * the global extension manager.
49 private static class LoaderTask implements Runnable {
51 private static final Logger LOGGER = LoggerFactory.getLogger(LoaderTask.class);
53 private final URI indexJsonDcaeJars;
54 private final ExtensionDiscoveringManager extensionManager;
55 private final Set<URL> processed = new LinkedHashSet();
57 private LoaderTask(URI indexJsonDcaeJars, ExtensionDiscoveringManager extensionManager) {
58 this.indexJsonDcaeJars = indexJsonDcaeJars;
59 this.extensionManager = extensionManager;
65 List<URL> toProcess = DCAEClassLoaders.getDCAEJarsURLs(this.indexJsonDcaeJars);
66 toProcess.removeAll(processed);
68 if (!toProcess.isEmpty()) {
69 Set<Bundle> bundles = DCAEClassLoaders.createDCAEBundles(toProcess);
70 this.extensionManager.discoverExtensions(bundles);
71 processed.addAll(toProcess);
73 LOGGER.info(String.format("#Added DCAE bundles: %d, #Total DCAE bundles: %d ",
74 bundles.size(), processed.size()));
76 } catch (final Exception e) {
77 LOGGER.error("Error loading DCAE jars due to: " + e.getMessage(), e);
82 private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
83 private ScheduledFuture taskFuture;
85 public synchronized void start(URI indexJsonDcaeJars, final ExtensionDiscoveringManager extensionManager) {
86 // Restricting to a single thread
87 if (taskFuture != null && !taskFuture.isCancelled()) {
91 LOGGER.info("Starting DCAE Auto-Loader: {}", new Object[]{indexJsonDcaeJars});
93 LoaderTask task = new LoaderTask(indexJsonDcaeJars, extensionManager);
94 this.taskFuture = executor.scheduleAtFixedRate(task, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
95 LOGGER.info("DCAE Auto-Loader started");
98 public synchronized void stop() {
99 if (this.taskFuture != null) {
100 this.taskFuture.cancel(true);
101 LOGGER.info("DCAE Auto-Loader stopped");