1 package org.onap.config.impl;
3 import org.onap.config.ConfigurationUtils;
4 import org.onap.config.Constants;
5 import org.onap.config.api.ConfigurationChangeListener;
6 import org.onap.config.api.ConfigurationManager;
7 import org.onap.config.api.Hint;
10 import java.io.IOException;
11 import java.lang.management.ManagementFactory;
12 import java.lang.reflect.Method;
13 import java.nio.file.ClosedWatchServiceException;
14 import java.nio.file.FileSystems;
15 import java.nio.file.Path;
16 import java.nio.file.StandardWatchEventKinds;
17 import java.nio.file.WatchEvent;
18 import java.nio.file.WatchKey;
19 import java.nio.file.WatchService;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.LinkedHashMap;
26 import java.util.List;
28 import java.util.Objects;
30 import java.util.Vector;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.TimeUnit;
35 import javax.management.JMX;
36 import javax.management.MBeanServerConnection;
37 import javax.management.ObjectName;
42 * The type Configuration change notifier.
44 public final class ConfigurationChangeNotifier {
// Registered listeners, keyed by tenant + Constants.KEY_ELEMENTS_DELEMETER + component.
// NOTE(review): this is a plain HashMap, yet it is read from tasks submitted to the executors
// below while the register/unregister methods mutate it - confirm access is serialized.
46 private final HashMap<String, List<NotificationData>> store = new HashMap<>();
// Scheduled pool (5 threads) that runs the filesystem polling tasks and the periodic scan triggers.
47 private final ScheduledExecutorService executor =
48 Executors.newScheduledThreadPool(5, ConfigurationUtils.getThreadFactory());
// Cached pool on which change scans and listener notifications are submitted.
49 private final ExecutorService notificationExecutor =
50 Executors.newCachedThreadPool(ConfigurationUtils.getThreadFactory());
// Watch services created by watchForChange, keyed by the watched location string.
51 private final Map<String, WatchService> watchServiceCollection =
52 Collections.synchronizedMap(new HashMap<>());
// Caller guard: inspects the current thread's stack trace and throws unless the frame at
// index 2 belongs to ConfigurationImpl.
// NOTE(review): the header of the construct enclosing this check is not visible in this view.
55 if (!Thread.currentThread().getStackTrace()[2].getClassName()
56 .equals(ConfigurationImpl.class.getName())) {
57 throw new RuntimeException("Illegal access.");
62 * Instantiates a new Configuration change notifier.
64 * @param inMemoryConfig the in memory config
// Schedules three recurring tasks on the scheduled executor, each with a 1 ms initial delay
// and a 1 ms delay between the end of one run and the start of the next.
// The System.getProperty calls sit inside the lambdas, so each property is re-read on every run.
66 public ConfigurationChangeNotifier(Map<String, AggregateConfiguration> inMemoryConfig) {
// Task 1: poll the "config.location" directory (isTenantLocation = false).
67 executor.scheduleWithFixedDelay(() -> this
68 .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
69 System.getProperty("config.location"), false), 1, 1, TimeUnit.MILLISECONDS);
// Task 2: poll the "tenant.config.location" directory (isTenantLocation = true).
70 executor.scheduleWithFixedDelay(() -> this
71 .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
72 System.getProperty("tenant.config.location"), true), 1, 1, TimeUnit.MILLISECONDS);
// Task 3: poll the "node.config.location" directory for node-specific override files.
73 executor.scheduleWithFixedDelay(() -> this
74 .pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(
75 System.getProperty("node.config.location")), 1, 1, TimeUnit.MILLISECONDS);
// Iterates over every registered watch service (an IOException raised by the per-service
// work is caught) and then stops the scheduled executor with shutdownNow().
// NOTE(review): no shutdown of notificationExecutor is visible here - confirm it is stopped
// elsewhere, otherwise its threads may outlive this notifier.
81 public void shutdown() {
82 for (WatchService watch : watchServiceCollection.values()) {
85 } catch (IOException exception) {
89 executor.shutdownNow();
93 * Poll filesystem and update configuration if required.
95 * @param inMemoryConfig the in memory config
96 * @param location the location
97 * @param isTenantLocation whether the location is a tenant configuration location
// Waits for filesystem changes under `location`, folds each changed file into (or removes it
// from) the matching AggregateConfiguration in `inMemoryConfig`, and publishes the resulting
// diff through updateConfigurationValues.
99 public void pollFilesystemAndUpdateConfigurationIfRequired(
100 Map<String, AggregateConfiguration> inMemoryConfig, String location,
101 boolean isTenantLocation) {
// watchForChange returns an empty set for a blank or non-existent location.
103 Set<Path> paths = watchForChange(location);
105 for (Path path : paths) {
106 File file = path.toAbsolutePath().toFile();
107 String repositoryKey = null;
// Case 1: the path is a regular file that ConfigurationUtils recognises as configuration.
108 if (ConfigurationUtils.isConfig(file) && file.isFile()) {
109 if (isTenantLocation) {
// Tenant mode: find the entry returned by getAllFiles(location, false, true) whose absolute
// path prefixes the changed file, and build the key from that entry's name plus the file's namespace.
110 Collection<File> tenantsRoot =
111 ConfigurationUtils.getAllFiles(new File(location), false, true);
112 for (File tenantRoot : tenantsRoot) {
113 if (file.getAbsolutePath().startsWith(tenantRoot.getAbsolutePath())) {
114 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(
115 (tenantRoot.getName() + Constants.TENANT_NAMESPACE_SAPERATOR
116 + ConfigurationUtils.getNamespace(file))
117 .split(Constants.TENANT_NAMESPACE_SAPERATOR));
// Key derived directly from the file (presumably the non-tenant branch; the `else` line is not visible).
121 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
// Snapshot the final configuration, add the file, snapshot again, and publish the difference.
123 AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
124 if (config != null) {
125 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
126 config.addConfig(file);
127 LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
128 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
// The repository key is split into its first two elements, passed on as tenant and namespace.
129 String[] tenantNamespaceArray =
130 repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
131 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
// Case 2 (presumably the `else` of the isConfig/isFile check; that line is not visible):
// scan every aggregate configuration, remove the file from those that contain it, and
// publish the before/after difference for each.
134 for (String configKey : inMemoryConfig.keySet()) {
135 repositoryKey = configKey;
136 AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
137 if (config.containsConfig(file)) {
138 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
139 config.removeConfig(file);
140 LinkedHashMap latestConfig =
141 ConfigurationUtils.toMap(config.getFinalConfiguration());
142 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
143 String[] tenantNamespaceArray =
144 repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
145 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1],
// A closed watch service is handled separately from other failures (its handler body is
// not visible here); any other exception is only printed to stderr.
152 } catch (ClosedWatchServiceException exception) {
154 } catch (Exception exception) {
155 exception.printStackTrace();
// Forwards a configuration diff to the ConfigurationManager MBean: obtains the platform
// MBean server, builds a JMX proxy for the bean named Constants.MBEAN_NAME, and calls
// updateConfigurationValues(tenant, namespace, map) on that proxy.
159 private void updateConfigurationValues(String tenant, String namespace, Map map)
161 MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
162 ObjectName mbeanName = new ObjectName(Constants.MBEAN_NAME);
163 ConfigurationManager conf =
164 JMX.newMBeanProxy(mbsc, mbeanName, ConfigurationManager.class,
166 conf.updateConfigurationValues(tenant, namespace, map);
170 * Poll filesystem and update node specific configuration if required.
172 * @param location the location
// Waits for filesystem changes under `location` and applies each changed path to the
// override configuration held by ConfigurationRepository.
174 public void pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(String location) {
176 Set<Path> paths = watchForChange(location);
178 for (Path path : paths) {
179 File file = path.toAbsolutePath().toFile();
// A recognised configuration file is (re)loaded as an override under its repository key.
181 if (ConfigurationUtils.isConfig(file)) {
182 String repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
183 ConfigurationRepository.lookup().populateOverrideConfigurtaion(repositoryKey, file);
// Removes the override for this file (presumably the `else` branch; that line is not visible).
185 ConfigurationRepository.lookup().removeOverrideConfigurtaion(file);
// Failures are only printed to stderr.
189 } catch (Exception exception) {
190 exception.printStackTrace();
195 * Notify changes towards.
197 * @param tenant the tenant
198 * @param component the component
200 * @param myself the listener to notify of changes
201 * @throws Exception the exception
// Registers `myself` as a listener for `key` under the given tenant and component.
// Listeners are grouped per tenant + KEY_ELEMENTS_DELEMETER + component.
203 public void notifyChangesTowards(String tenant, String component, String key,
204 ConfigurationChangeListener myself) throws Exception {
205 List<NotificationData> notificationList =
206 store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
// First registration for this tenant/component: create a synchronized list, store it, and
// schedule triggerScanning for the group (1 ms initial delay, then 30000 ms between runs).
207 if (notificationList == null) {
208 notificationList = Collections.synchronizedList(new ArrayList<>());
209 store.put(tenant + Constants.KEY_ELEMENTS_DELEMETER + component, notificationList);
210 executor.scheduleWithFixedDelay(
211 () -> triggerScanning(tenant + Constants.KEY_ELEMENTS_DELEMETER + component), 1, 30000,
212 TimeUnit.MILLISECONDS);
// The NotificationData constructor can throw (it contains a "Key[...] not found." check),
// in which case nothing is added to the list.
214 notificationList.add(new NotificationData(tenant, component, key, myself));
218 * Stop notification towards.
220 * @param tenant the tenant
221 * @param component the component
223 * @param myself the listener to stop notifying
224 * @throws Exception the exception
// Unregisters `myself` for `key` under the given tenant and component, and drops the
// group from the store once its list is empty.
226 public void stopNotificationTowards(String tenant, String component, String key,
227 ConfigurationChangeListener myself) throws Exception {
228 List<NotificationData> notificationList =
229 store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
230 if (notificationList != null) {
// Removal relies on NotificationData.equals, which also compares currentValue and isArray.
// NOTE(review): a freshly built instance may not equal the stored one if the stored
// currentValue differs from the value read now - verify this is intended.
232 notificationList.remove(new NotificationData(tenant, component, key, myself));
// `removed` is declared on a line that is not visible in this view.
233 if (removed && notificationList.isEmpty()) {
234 store.remove(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
// Periodic entry point scheduled by notifyChangesTowards: while `key` is still present in
// the store, hands the actual scan to notificationExecutor.
240 private void triggerScanning(String key) {
241 if (store.get(key) != null) {
242 notificationExecutor.submit(() -> scanForChanges(key));
// Presumably the `else` branch (that line is not visible). An exception thrown from a
// scheduleWithFixedDelay task suppresses its subsequent executions, so this throw appears
// to act as the way the periodic scan for a removed key stops - confirm.
244 throw new IllegalArgumentException("Notification service for " + key + " is suspended.");
// Looks up the listener registrations for `key`, keeps those whose isChanged() is true, and
// submits one sendNotification task per changed registration to notificationExecutor.
// NOTE(review): the lines between the lookup and the filter are not visible here, so any
// null handling for `list` cannot be confirmed from this view.
248 private void scanForChanges(String key) {
249 List<NotificationData> list = store.get(key);
252 .filter(NotificationData::isChanged)
253 .forEach(notificationData -> notificationExecutor.submit(() -> sendNotification(notificationData)));
// Dispatches a single notification; any exception raised by the dispatch is caught and
// only printed to stderr.
257 private void sendNotification(NotificationData notificationData) {
259 notificationData.dispatchNotification();
260 } catch (Exception exception) {
261 exception.printStackTrace();
// Watches `location` for create/modify/delete events and collects the affected paths.
// Returns an empty set when the location is null, blank, or does not exist.
265 private Set<Path> watchForChange(String location) throws Exception {
266 if (location == null || location.trim().length() == 0) {
267 return Collections.emptySet();
269 File file = new File(location);
270 if (!file.exists()) {
271 return Collections.emptySet();
273 Path path = file.toPath();
274 Set<Path> toReturn = new HashSet<>();
// The watch service is opened with try-with-resources, so it is closed when this block
// exits and a new one is created on every call. It is also stored under `location`,
// replacing any earlier entry for that location.
275 try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
276 watchServiceCollection.put(location, watchService);
// Register the location itself and every entry returned by getAllFiles(file, true, true).
277 path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
278 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
279 for (File dir : ConfigurationUtils.getAllFiles(file, true, true)) {
280 dir.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
281 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
// take() blocks until a watch key is signalled.
284 final WatchKey wk = watchService.take();
// Sleep for the configured "event.fetch.delay" value before draining the key's events.
285 Thread.sleep(ConfigurationRepository.lookup()
286 .getConfigurationFor(Constants.DEFAULT_TENANT, Constants.DB_NAMESPACE)
287 .getLong("event.fetch.delay"));
288 for (WatchEvent<?> event : wk.pollEvents()) {
289 Object context = event.context();
290 if (context instanceof Path) {
// Resolve the event's relative context against the directory the key was registered for.
291 File newFile = new File(((Path) wk.watchable()).toFile(), context.toString());
292 if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
// A newly created directory is registered with the same watch service.
293 if (newFile.isDirectory()) {
294 newFile.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
295 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
// Directory modifications get special handling whose body is not visible in this view.
298 } else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
299 if (newFile.isDirectory()) {
303 toReturn.add(newFile.toPath());
// The action taken when nothing was collected is not visible in this view.
306 if (toReturn.isEmpty()) {
316 * The type Notification data.
// One listener registration: a tenant/namespace/key triple plus the listener to call.
// Other fields assigned by the constructor are declared on lines not visible in this view.
318 static class NotificationData {
// Holds the `component` argument passed to the constructor.
327 final String namespace;
// The listener the notification is dispatched to.
335 final ConfigurationChangeListener myself;
346 * Instantiates a new Notification data.
348 * @param tenant the tenant
349 * @param component the component
351 * @param myself the listener to notify of changes
352 * @throws Exception the exception
// Captures the registration and records the key's current value as the comparison baseline.
354 public NotificationData(String tenant, String component, String key,
355 ConfigurationChangeListener myself) throws Exception {
356 this.tenant = tenant;
357 this.namespace = component;
359 this.myself = myself;
// Rejects the registration with a RuntimeException when a check on the tenant/component
// configuration fails. The rest of the condition is on a line not visible here;
// presumably it tests whether `key` exists.
360 if (!ConfigurationRepository.lookup().getConfigurationFor(tenant, component)
362 throw new RuntimeException("Key[" + key + "] not found.");
364 isArray = ConfigurationUtils.isArray(tenant, component, key, Hint.DEFAULT.value());
// The baseline is read either as a list of strings or as a single string; the selecting
// condition is not visible here (presumably `isArray`).
366 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, component, key);
368 currentValue = ConfigurationManager.lookup().getAsString(tenant, component, key);
// Two registrations are equal when tenant, namespace, key, listener, recorded current
// value, and the array flag all match. Because currentValue takes part, equality between
// two registrations for the same key depends on the values they recorded.
373 public boolean equals(Object obj) {
374 if (!(obj instanceof NotificationData)) {
377 NotificationData nd = (NotificationData) obj;
378 return Objects.equals(tenant, nd.tenant)
379 && Objects.equals(namespace, nd.namespace)
380 && Objects.equals(key, nd.key)
381 && Objects.equals(myself, nd.myself)
382 && Objects.equals(currentValue, nd.currentValue) // it's either String or List<String>
383 && isArray == nd.isArray;
// Hashes the same six fields that equals compares.
// NOTE(review): currentValue is reassigned in dispatchNotification, so the hash of an
// instance can change over its lifetime - avoid using instances as hash keys.
387 public int hashCode() {
388 return Objects.hash(tenant, namespace, key, myself, currentValue, isArray);
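392 * Checks whether the configuration value has changed since it was last recorded.
394 * @return {@code true} if the latest value differs from the recorded value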
// Compares the recorded currentValue with the value currently held by ConfigurationManager.
396 public boolean isChanged() {
// The latest value is read as a list of strings or as a single string; the selecting
// condition is not visible here (presumably `isArray`).
400 latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
402 latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
// Single-value comparison: changed when the two values are not equal.
405 return !currentValue.equals(latestValue);
// Collection comparison: each old element is removed from the new collection. A failed
// removal means an old element is missing from the new values (the action taken is on a
// line not visible here). Elements left over afterwards mean new values were added.
// Note that remove() mutates the collection returned by the lookup.
407 Collection<String> oldCollection = (Collection<String>) currentValue;
408 Collection<String> newCollection = (Collection<String>) latestValue;
409 for (String val : oldCollection) {
410 if (!newCollection.remove(val)) {
414 return !newCollection.isEmpty();
// The handler body for lookup/comparison failures is not visible in this view.
416 } catch (Exception exception) {
422 * Dispatch notification.
424 * @throws Exception the exception
// Builds the argument list for the listener callback from the old and latest values,
// refreshes the recorded baseline, and then invokes the callback reflectively.
426 public void dispatchNotification() throws Exception {
427 Method method = null;
428 Vector<Object> parameters = null;
// The latest value is read as a list of strings or as a single string; the selecting
// condition is not visible here (presumably `isArray`).
432 latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
434 latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
// The callback is chosen from the methods declared on the listener's concrete class; the
// line that selects `method` is not visible in this view.
436 Method[] methods = myself.getClass().getDeclaredMethods();
437 if (methods != null && methods.length > 0) {
439 int paramCount = method.getParameterCount();
440 parameters = new Vector<>();
// With more than 4 parameters the tenant is passed first (null for the default tenant).
441 if (paramCount > 4) {
442 if (tenant.equals(Constants.DEFAULT_TENANT)) {
443 parameters.add(null);
445 parameters.add(tenant);
// With more than 3 parameters the namespace is passed next (null for the default namespace).
448 if (paramCount > 3) {
449 if (namespace.equals(Constants.DEFAULT_NAMESPACE)) {
450 parameters.add(null);
452 parameters.add(namespace);
// The old value and then the latest value are appended to the arguments.
456 parameters.add(currentValue);
457 parameters.add(latestValue);
// Allows invoking the callback even when it is not public.
458 method.setAccessible(true);
// Failures while preparing the call are only printed to stderr.
460 } catch (Exception exception) {
461 exception.printStackTrace();
// Refresh the array flag and the baseline so the next isChanged() compares against the
// value that is about to be reported.
463 isArray = ConfigurationUtils.isArray(tenant, namespace, key, Hint.DEFAULT.value());
465 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
467 currentValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
// Invoke the listener only when a callback and its arguments were prepared.
469 if (method != null && parameters != null) {
470 method.invoke(myself, parameters.toArray());