1 package org.openecomp.config.impl;
3 import static org.openecomp.config.ConfigurationUtils.isArray;
5 import org.openecomp.config.ConfigurationUtils;
6 import org.openecomp.config.Constants;
7 import org.openecomp.config.api.ConfigurationChangeListener;
8 import org.openecomp.config.api.ConfigurationManager;
9 import org.openecomp.config.api.Hint;
12 import java.io.IOException;
13 import java.lang.management.ManagementFactory;
14 import java.lang.reflect.Method;
15 import java.nio.file.ClosedWatchServiceException;
16 import java.nio.file.FileSystems;
17 import java.nio.file.Path;
18 import java.nio.file.StandardWatchEventKinds;
19 import java.nio.file.WatchEvent;
20 import java.nio.file.WatchKey;
21 import java.nio.file.WatchService;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.LinkedHashMap;
28 import java.util.List;
30 import java.util.Objects;
32 import java.util.Vector;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.TimeUnit;
37 import javax.management.JMX;
38 import javax.management.MBeanServerConnection;
39 import javax.management.ObjectName;
43 * The type Configuration change notifier.
45 public final class ConfigurationChangeNotifier {
47 private final HashMap<String, List<NotificationData>> store = new HashMap<>();
48 private final ScheduledExecutorService executor =
49 Executors.newScheduledThreadPool(5, ConfigurationUtils.getThreadFactory());
50 private final ExecutorService notificationExecutor =
51 Executors.newCachedThreadPool(ConfigurationUtils.getThreadFactory());
52 private final Map<String, WatchService> watchServiceCollection =
53 Collections.synchronizedMap(new HashMap<>());
56 if (!Thread.currentThread().getStackTrace()[2].getClassName()
57 .equals(ConfigurationImpl.class.getName())) {
58 throw new RuntimeException("Illegal access.");
63 * Instantiates a new Configuration change notifier.
65 * @param inMemoryConfig the in memory config
67 public ConfigurationChangeNotifier(Map<String, AggregateConfiguration> inMemoryConfig) {
68 executor.scheduleWithFixedDelay(() -> this
69 .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
70 System.getProperty("config.location"), false), 1, 1, TimeUnit.MILLISECONDS);
71 executor.scheduleWithFixedDelay(() -> this
72 .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
73 System.getProperty("tenant.config.location"), true), 1, 1, TimeUnit.MILLISECONDS);
74 executor.scheduleWithFixedDelay(() -> this
75 .pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(
76 System.getProperty("node.config.location")), 1, 1, TimeUnit.MILLISECONDS);
82 public void shutdown() {
83 for (WatchService watch : watchServiceCollection.values()) {
86 } catch (IOException exception) {
90 executor.shutdownNow();
94 * Poll filesystem and update configuration if required.
96 * @param inMemoryConfig the in memory config
97 * @param location the location
98 * @param isTenantLocation the is tenant location
100 public void pollFilesystemAndUpdateConfigurationIfRequired(
101 Map<String, AggregateConfiguration> inMemoryConfig, String location,
102 boolean isTenantLocation) {
104 Set<Path> paths = watchForChange(location);
106 for (Path path : paths) {
107 File file = path.toAbsolutePath().toFile();
108 String repositoryKey = null;
109 if (ConfigurationUtils.isConfig(file) && file.isFile()) {
110 if (isTenantLocation) {
111 Collection<File> tenantsRoot =
112 ConfigurationUtils.getAllFiles(new File(location), false, true);
113 for (File tenantRoot : tenantsRoot) {
114 if (file.getAbsolutePath().startsWith(tenantRoot.getAbsolutePath())) {
115 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(
116 (tenantRoot.getName() + Constants.TENANT_NAMESPACE_SAPERATOR
117 + ConfigurationUtils.getNamespace(file))
118 .split(Constants.TENANT_NAMESPACE_SAPERATOR));
122 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
124 AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
125 if (config != null) {
126 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
127 config.addConfig(file);
128 LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
129 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
130 String[] tenantNamespaceArray =
131 repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
132 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
135 for (String configKey : inMemoryConfig.keySet()) {
136 repositoryKey = configKey;
137 AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
138 if (config.containsConfig(file)) {
139 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
140 config.removeConfig(file);
141 LinkedHashMap latestConfig =
142 ConfigurationUtils.toMap(config.getFinalConfiguration());
143 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
144 String[] tenantNamespaceArray =
145 repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
146 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1],
153 } catch (ClosedWatchServiceException exception) {
155 } catch (Exception exception) {
156 exception.printStackTrace();
160 private void updateConfigurationValues(String tenant, String namespace, Map map)
162 MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
163 ObjectName mbeanName = new ObjectName(Constants.MBEAN_NAME);
164 ConfigurationManager conf =
165 JMX.newMBeanProxy(mbsc, mbeanName, org.openecomp.config.api.ConfigurationManager.class,
167 conf.updateConfigurationValues(tenant, namespace, map);
171 * Poll filesystem and update node specific configuration if required.
173 * @param location the location
175 public void pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(String location) {
177 Set<Path> paths = watchForChange(location);
179 for (Path path : paths) {
180 File file = path.toAbsolutePath().toFile();
182 if (ConfigurationUtils.isConfig(file)) {
183 String repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
184 ConfigurationRepository.lookup().populateOverrideConfigurtaion(repositoryKey, file);
186 ConfigurationRepository.lookup().removeOverrideConfigurtaion(file);
190 } catch (Exception exception) {
191 exception.printStackTrace();
196 * Notify changes towards.
198 * @param tenant the tenant
199 * @param component the component
201 * @param myself the myself
202 * @throws Exception the exception
204 public void notifyChangesTowards(String tenant, String component, String key,
205 ConfigurationChangeListener myself) throws Exception {
206 List<NotificationData> notificationList =
207 store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
208 if (notificationList == null) {
209 notificationList = Collections.synchronizedList(new ArrayList<NotificationData>());
210 store.put(tenant + Constants.KEY_ELEMENTS_DELEMETER + component, notificationList);
211 executor.scheduleWithFixedDelay(
212 () -> triggerScanning(tenant + Constants.KEY_ELEMENTS_DELEMETER + component), 1, 30000,
213 TimeUnit.MILLISECONDS);
215 notificationList.add(new NotificationData(tenant, component, key, myself));
219 * Stop notification towards.
221 * @param tenant the tenant
222 * @param component the component
224 * @param myself the myself
225 * @throws Exception the exception
227 public void stopNotificationTowards(String tenant, String component, String key,
228 ConfigurationChangeListener myself) throws Exception {
229 List<NotificationData> notificationList =
230 store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
231 if (notificationList != null) {
233 notificationList.remove(new NotificationData(tenant, component, key, myself));
234 if (removed && notificationList.isEmpty()) {
235 store.remove(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
241 private void triggerScanning(String key) {
242 if (store.get(key) != null) {
243 notificationExecutor.submit(() -> scanForChanges(key));
245 throw new IllegalArgumentException("Notification service for " + key + " is suspended.");
249 private void scanForChanges(String key) {
250 List<NotificationData> list = store.get(key);
253 .filter(NotificationData::isChanged)
254 .forEach(notificationData -> notificationExecutor.submit(() -> sendNotification(notificationData)));
258 private void sendNotification(NotificationData notificationData) {
260 notificationData.dispatchNotification();
261 } catch (Exception exception) {
262 exception.printStackTrace();
266 private Set<Path> watchForChange(String location) throws Exception {
267 if (location == null || location.trim().length() == 0) {
270 File file = new File(location);
271 if (!file.exists()) {
274 Path path = file.toPath();
275 Set<Path> toReturn = new HashSet<>();
276 try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
277 watchServiceCollection.put(location, watchService);
278 path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
279 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
280 for (File dir : ConfigurationUtils.getAllFiles(file, true, true)) {
281 dir.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
282 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
285 final WatchKey wk = watchService.take();
286 Thread.sleep(ConfigurationRepository.lookup()
287 .getConfigurationFor(Constants.DEFAULT_TENANT, Constants.DB_NAMESPACE)
288 .getLong("event.fetch.delay"));
289 for (WatchEvent<?> event : wk.pollEvents()) {
290 Object context = event.context();
291 if (context instanceof Path) {
292 File newFile = new File(((Path) wk.watchable()).toFile(), context.toString());
293 if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
294 if (newFile.isDirectory()) {
295 newFile.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
296 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
299 } else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
300 if (newFile.isDirectory()) {
304 toReturn.add(newFile.toPath());
307 if (toReturn.isEmpty()) {
317 * The type Notification data.
319 static class NotificationData {
328 final String namespace;
336 final ConfigurationChangeListener myself;
347 * Instantiates a new Notification data.
349 * @param tenant the tenant
350 * @param component the component
352 * @param myself the myself
353 * @throws Exception the exception
355 public NotificationData(String tenant, String component, String key,
356 ConfigurationChangeListener myself) throws Exception {
357 this.tenant = tenant;
358 this.namespace = component;
360 this.myself = myself;
361 if (!ConfigurationRepository.lookup().getConfigurationFor(tenant, component)
363 throw new RuntimeException("Key[" + key + "] not found.");
365 isArray = isArray(tenant, component, key, Hint.DEFAULT.value());
367 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, component, key);
369 currentValue = ConfigurationManager.lookup().getAsString(tenant, component, key);
374 public boolean equals(Object obj) {
375 if (!(obj instanceof NotificationData)) {
378 NotificationData nd = (NotificationData) obj;
379 return Objects.equals(tenant, nd.tenant)
380 && Objects.equals(namespace, nd.namespace)
381 && Objects.equals(key, nd.key)
382 && Objects.equals(myself, nd.myself)
383 && Objects.equals(currentValue, nd.currentValue) // it's either String or List<String>
384 && isArray == nd.isArray;
388 public int hashCode() {
389 return Objects.hash(tenant, namespace, key, myself, currentValue, isArray);
393 * Is changed boolean.
395 * @return the boolean
397 public boolean isChanged() {
401 latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
403 latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
406 return !currentValue.equals(latestValue);
408 Collection<String> oldCollection = (Collection<String>) currentValue;
409 Collection<String> newCollection = (Collection<String>) latestValue;
410 for (String val : oldCollection) {
411 if (!newCollection.remove(val)) {
415 return !newCollection.isEmpty();
417 } catch (Exception exception) {
423 * Dispatch notification.
425 * @throws Exception the exception
427 public void dispatchNotification() throws Exception {
428 Method method = null;
429 Vector<Object> parameters = null;
433 latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
435 latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
437 Method[] methods = myself.getClass().getDeclaredMethods();
438 if (methods != null && methods.length > 0) {
440 int paramCount = method.getParameterCount();
441 parameters = new Vector<>();
442 if (paramCount > 4) {
443 if (tenant.equals(Constants.DEFAULT_TENANT)) {
444 parameters.add(null);
446 parameters.add(tenant);
449 if (paramCount > 3) {
450 if (namespace.equals(Constants.DEFAULT_NAMESPACE)) {
451 parameters.add(null);
453 parameters.add(namespace);
457 parameters.add(currentValue);
458 parameters.add(latestValue);
459 method.setAccessible(true);
461 } catch (Exception exception) {
462 exception.printStackTrace();
464 isArray = isArray(tenant, namespace, key, Hint.DEFAULT.value());
466 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
468 currentValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
470 if (method != null && parameters != null) {
471 method.invoke(myself, parameters.toArray());