1 package org.onap.config.impl;
3 import org.onap.config.ConfigurationUtils;
4 import org.onap.config.Constants;
5 import org.onap.config.api.ConfigurationChangeListener;
6 import org.onap.config.api.ConfigurationManager;
7 import org.onap.config.api.Hint;
10 import java.io.IOException;
11 import java.lang.management.ManagementFactory;
12 import java.lang.reflect.Method;
13 import java.nio.file.ClosedWatchServiceException;
14 import java.nio.file.FileSystems;
15 import java.nio.file.Path;
16 import java.nio.file.StandardWatchEventKinds;
17 import java.nio.file.WatchEvent;
18 import java.nio.file.WatchKey;
19 import java.nio.file.WatchService;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.LinkedHashMap;
26 import java.util.List;
28 import java.util.Objects;
30 import java.util.Vector;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.TimeUnit;
35 import javax.management.JMX;
36 import javax.management.MBeanServerConnection;
37 import javax.management.ObjectName;
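/**
 * Watches configuration locations on the file system for changes and notifies
 * registered {@link ConfigurationChangeListener}s when a subscribed value changes.
 */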
43 public final class ConfigurationChangeNotifier {
45 private final HashMap<String, List<NotificationData>> store = new HashMap<>();
46 private final ScheduledExecutorService executor =
47 Executors.newScheduledThreadPool(5, ConfigurationUtils.getThreadFactory());
48 private final ExecutorService notificationExecutor =
49 Executors.newCachedThreadPool(ConfigurationUtils.getThreadFactory());
50 private final Map<String, WatchService> watchServiceCollection =
51 Collections.synchronizedMap(new HashMap<>());
static {
    // Access guard: the class name recorded at index 2 of the current thread's stack
    // trace must be ConfigurationImpl, otherwise initialisation is rejected.
    String callerClassName = Thread.currentThread().getStackTrace()[2].getClassName();
    if (!callerClassName.equals(ConfigurationImpl.class.getName())) {
        throw new RuntimeException("Illegal access.");
    }
}
61 * Instantiates a new Configuration change notifier.
63 * @param inMemoryConfig the in memory config
65 public ConfigurationChangeNotifier(Map<String, AggregateConfiguration> inMemoryConfig) {
66 executor.scheduleWithFixedDelay(() -> this
67 .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
68 System.getProperty("config.location"), false), 1, 1, TimeUnit.MILLISECONDS);
69 executor.scheduleWithFixedDelay(() -> this
70 .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
71 System.getProperty("tenant.config.location"), true), 1, 1, TimeUnit.MILLISECONDS);
72 executor.scheduleWithFixedDelay(() -> this
73 .pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(
74 System.getProperty("node.config.location")), 1, 1, TimeUnit.MILLISECONDS);
80 public void shutdown() {
81 for (WatchService watch : watchServiceCollection.values()) {
84 } catch (IOException exception) {
88 executor.shutdownNow();
92 * Poll filesystem and update configuration if required.
94 * @param inMemoryConfig the in memory config
95 * @param location the location
96 * @param isTenantLocation the is tenant location
98 public void pollFilesystemAndUpdateConfigurationIfRequired(
99 Map<String, AggregateConfiguration> inMemoryConfig, String location,
100 boolean isTenantLocation) {
102 Set<Path> paths = watchForChange(location);
104 for (Path path : paths) {
105 File file = path.toAbsolutePath().toFile();
106 String repositoryKey = null;
107 if (ConfigurationUtils.isConfig(file) && file.isFile()) {
108 if (isTenantLocation) {
109 Collection<File> tenantsRoot =
110 ConfigurationUtils.getAllFiles(new File(location), false, true);
111 for (File tenantRoot : tenantsRoot) {
112 if (file.getAbsolutePath().startsWith(tenantRoot.getAbsolutePath())) {
113 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(
114 (tenantRoot.getName() + Constants.TENANT_NAMESPACE_SAPERATOR
115 + ConfigurationUtils.getNamespace(file))
116 .split(Constants.TENANT_NAMESPACE_SAPERATOR));
120 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
122 AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
123 if (config != null) {
124 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
125 config.addConfig(file);
126 LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
127 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
128 String[] tenantNamespaceArray =
129 repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
130 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
133 for (String configKey : inMemoryConfig.keySet()) {
134 repositoryKey = configKey;
135 AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
136 if (config.containsConfig(file)) {
137 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
138 config.removeConfig(file);
139 LinkedHashMap latestConfig =
140 ConfigurationUtils.toMap(config.getFinalConfiguration());
141 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
142 String[] tenantNamespaceArray =
143 repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
144 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1],
151 } catch (ClosedWatchServiceException exception) {
153 } catch (Exception exception) {
154 exception.printStackTrace();
158 private void updateConfigurationValues(String tenant, String namespace, Map map)
160 MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
161 ObjectName mbeanName = new ObjectName(Constants.MBEAN_NAME);
162 ConfigurationManager conf =
163 JMX.newMBeanProxy(mbsc, mbeanName, ConfigurationManager.class,
165 conf.updateConfigurationValues(tenant, namespace, map);
169 * Poll filesystem and update node specific configuration if required.
171 * @param location the location
173 public void pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(String location) {
175 Set<Path> paths = watchForChange(location);
177 for (Path path : paths) {
178 File file = path.toAbsolutePath().toFile();
180 if (ConfigurationUtils.isConfig(file)) {
181 String repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
182 ConfigurationRepository.lookup().populateOverrideConfigurtaion(repositoryKey, file);
184 ConfigurationRepository.lookup().removeOverrideConfigurtaion(file);
188 } catch (Exception exception) {
189 exception.printStackTrace();
194 * Notify changes towards.
196 * @param tenant the tenant
197 * @param component the component
199 * @param myself the myself
200 * @throws Exception the exception
202 public void notifyChangesTowards(String tenant, String component, String key,
203 ConfigurationChangeListener myself) throws Exception {
204 List<NotificationData> notificationList =
205 store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
206 if (notificationList == null) {
207 notificationList = Collections.synchronizedList(new ArrayList<>());
208 store.put(tenant + Constants.KEY_ELEMENTS_DELEMETER + component, notificationList);
209 executor.scheduleWithFixedDelay(
210 () -> triggerScanning(tenant + Constants.KEY_ELEMENTS_DELEMETER + component), 1, 30000,
211 TimeUnit.MILLISECONDS);
213 notificationList.add(new NotificationData(tenant, component, key, myself));
217 * Stop notification towards.
219 * @param tenant the tenant
220 * @param component the component
222 * @param myself the myself
223 * @throws Exception the exception
225 public void stopNotificationTowards(String tenant, String component, String key,
226 ConfigurationChangeListener myself) throws Exception {
227 List<NotificationData> notificationList =
228 store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
229 if (notificationList != null) {
231 notificationList.remove(new NotificationData(tenant, component, key, myself));
232 if (removed && notificationList.isEmpty()) {
233 store.remove(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
239 private void triggerScanning(String key) {
240 if (store.get(key) != null) {
241 notificationExecutor.submit(() -> scanForChanges(key));
243 throw new IllegalArgumentException("Notification service for " + key + " is suspended.");
247 private void scanForChanges(String key) {
248 List<NotificationData> list = store.get(key);
251 .filter(NotificationData::isChanged)
252 .forEach(notificationData -> notificationExecutor.submit(() -> sendNotification(notificationData)));
256 private void sendNotification(NotificationData notificationData) {
258 notificationData.dispatchNotification();
259 } catch (Exception exception) {
260 exception.printStackTrace();
264 private Set<Path> watchForChange(String location) throws Exception {
265 if (location == null || location.trim().length() == 0) {
266 return Collections.emptySet();
268 File file = new File(location);
269 if (!file.exists()) {
270 return Collections.emptySet();
272 Path path = file.toPath();
273 Set<Path> toReturn = new HashSet<>();
274 try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
275 watchServiceCollection.put(location, watchService);
276 path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
277 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
278 for (File dir : ConfigurationUtils.getAllFiles(file, true, true)) {
279 dir.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
280 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
283 final WatchKey wk = watchService.take();
284 Thread.sleep(ConfigurationRepository.lookup()
285 .getConfigurationFor(Constants.DEFAULT_TENANT, Constants.DB_NAMESPACE)
286 .getLong("event.fetch.delay"));
287 for (WatchEvent<?> event : wk.pollEvents()) {
288 Object context = event.context();
289 if (context instanceof Path) {
290 File newFile = new File(((Path) wk.watchable()).toFile(), context.toString());
291 if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
292 if (newFile.isDirectory()) {
293 newFile.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
294 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
297 } else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
298 if (newFile.isDirectory()) {
302 toReturn.add(newFile.toPath());
305 if (toReturn.isEmpty()) {
315 * The type Notification data.
317 static class NotificationData {
326 final String namespace;
334 final ConfigurationChangeListener myself;
345 * Instantiates a new Notification data.
347 * @param tenant the tenant
348 * @param component the component
350 * @param myself the myself
351 * @throws Exception the exception
353 public NotificationData(String tenant, String component, String key,
354 ConfigurationChangeListener myself) throws Exception {
355 this.tenant = tenant;
356 this.namespace = component;
358 this.myself = myself;
359 if (!ConfigurationRepository.lookup().getConfigurationFor(tenant, component)
361 throw new RuntimeException("Key[" + key + "] not found.");
363 isArray = ConfigurationUtils.isArray(tenant, component, key, Hint.DEFAULT.value());
365 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, component, key);
367 currentValue = ConfigurationManager.lookup().getAsString(tenant, component, key);
372 public boolean equals(Object obj) {
373 if (!(obj instanceof NotificationData)) {
376 NotificationData nd = (NotificationData) obj;
377 return Objects.equals(tenant, nd.tenant)
378 && Objects.equals(namespace, nd.namespace)
379 && Objects.equals(key, nd.key)
380 && Objects.equals(myself, nd.myself)
381 && Objects.equals(currentValue, nd.currentValue) // it's either String or List<String>
382 && isArray == nd.isArray;
386 public int hashCode() {
387 return Objects.hash(tenant, namespace, key, myself, currentValue, isArray);
391 * Is changed boolean.
393 * @return the boolean
395 public boolean isChanged() {
399 latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
401 latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
404 return !currentValue.equals(latestValue);
406 Collection<String> oldCollection = (Collection<String>) currentValue;
407 Collection<String> newCollection = (Collection<String>) latestValue;
408 for (String val : oldCollection) {
409 if (!newCollection.remove(val)) {
413 return !newCollection.isEmpty();
415 } catch (Exception exception) {
421 * Dispatch notification.
423 * @throws Exception the exception
425 public void dispatchNotification() throws Exception {
426 Method method = null;
427 Vector<Object> parameters = null;
431 latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
433 latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
435 Method[] methods = myself.getClass().getDeclaredMethods();
436 if (methods != null && methods.length > 0) {
438 int paramCount = method.getParameterCount();
439 parameters = new Vector<>();
440 if (paramCount > 4) {
441 if (tenant.equals(Constants.DEFAULT_TENANT)) {
442 parameters.add(null);
444 parameters.add(tenant);
447 if (paramCount > 3) {
448 if (namespace.equals(Constants.DEFAULT_NAMESPACE)) {
449 parameters.add(null);
451 parameters.add(namespace);
455 parameters.add(currentValue);
456 parameters.add(latestValue);
457 method.setAccessible(true);
459 } catch (Exception exception) {
460 exception.printStackTrace();
462 isArray = ConfigurationUtils.isArray(tenant, namespace, key, Hint.DEFAULT.value());
464 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
466 currentValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
468 if (method != null && parameters != null) {
469 method.invoke(myself, parameters.toArray());