1 package org.openecomp.config.impl;
3 import static org.openecomp.config.ConfigurationUtils.isArray;
5 import org.openecomp.config.ConfigurationUtils;
6 import org.openecomp.config.Constants;
7 import org.openecomp.config.api.ConfigurationChangeListener;
8 import org.openecomp.config.api.ConfigurationManager;
9 import org.openecomp.config.api.Hint;
12 import java.io.IOException;
13 import java.lang.management.ManagementFactory;
14 import java.lang.reflect.Method;
15 import java.nio.file.ClosedWatchServiceException;
16 import java.nio.file.FileSystems;
17 import java.nio.file.Path;
18 import java.nio.file.StandardWatchEventKinds;
19 import java.nio.file.WatchEvent;
20 import java.nio.file.WatchKey;
21 import java.nio.file.WatchService;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.LinkedHashMap;
29 import java.util.List;
32 import java.util.Vector;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.TimeUnit;
37 import javax.management.JMX;
38 import javax.management.MBeanServerConnection;
39 import javax.management.ObjectName;
43 * The type Configuration change notifier.
45 public final class ConfigurationChangeNotifier {
// Registered listeners, keyed by tenant + Constants.KEY_ELEMENTS_DELEMETER + component.
// NOTE(review): this is a plain HashMap, yet it is read from executor tasks
// (triggerScanning/scanForChanges) and mutated by notifyChangesTowards/
// stopNotificationTowards - confirm that access is serialized elsewhere.
47 private HashMap<String, List<NotificationData>> store = new HashMap<>();
// Scheduler (pool of 5 threads) used for the recurring filesystem polls and listener scans.
48 private ScheduledExecutorService executor =
49 Executors.newScheduledThreadPool(5, ConfigurationUtils.getThreadFactory());
// Cached thread pool onto which change scans and listener callbacks are submitted.
50 private ExecutorService notificationExcecutor =
51 Executors.newCachedThreadPool(ConfigurationUtils.getThreadFactory());
// Watch services opened by watchForChange, keyed by the watched location string;
// shutdown() iterates over these values.
52 private Map<String, WatchService> watchServiceCollection =
53 Collections.synchronizedMap(new HashMap<>());
// Caller guard: inspects the third stack frame (index 2) and throws unless its class
// is ConfigurationImpl.
// NOTE(review): the construct enclosing this check is not visible here - presumably an
// initializer block; confirm before relying on when it runs.
56 if (!Thread.currentThread().getStackTrace()[2].getClassName()
57 .equals(ConfigurationImpl.class.getName())) {
58 throw new RuntimeException("Illegal access.");
63 * Instantiates a new Configuration change notifier.
65 * @param inMemoryConfig the in memory config
// Schedules three recurring polls on the shared scheduler. Each starts after 1 ms and is
// re-run 1 ms after the previous run finishes (scheduleWithFixedDelay):
//   1. the directory named by system property "config.location" (non-tenant),
//   2. the directory named by "tenant.config.location" (tenant layout),
//   3. the directory named by "node.config.location" (node-specific configuration).
// The system properties are re-read on every run because they are evaluated inside the lambdas.
67 public ConfigurationChangeNotifier(Map<String, AggregateConfiguration> inMemoryConfig) {
68 executor.scheduleWithFixedDelay(() -> this
69 .pollFilesystemAndUpdateConfigurationIfREquired(inMemoryConfig,
70 System.getProperty("config.location"), false), 1, 1, TimeUnit.MILLISECONDS);
71 executor.scheduleWithFixedDelay(() -> this
72 .pollFilesystemAndUpdateConfigurationIfREquired(inMemoryConfig,
73 System.getProperty("tenant.config.location"), true), 1, 1, TimeUnit.MILLISECONDS);
74 executor.scheduleWithFixedDelay(() -> this
75 .pollFilesystemAndUpdateNodeSpecificConfigurationIfREquired(
76 System.getProperty("node.config.location")), 1, 1, TimeUnit.MILLISECONDS);
// Walks every WatchService registered in watchServiceCollection (an IOException raised
// inside the loop is caught), then stops the scheduler with shutdownNow().
// NOTE(review): no shutdown of notificationExcecutor is visible in these lines - confirm
// it is stopped elsewhere, otherwise its threads may outlive this notifier.
82 public void shutdown() {
83 for (WatchService watch : watchServiceCollection.values()) {
86 } catch (IOException exception) {
90 executor.shutdownNow();
94 * Poll filesystem and update configuration if required.
96 * @param inMemoryConfig the in memory config
97 * @param location the location
98 * @param isTenantLocation whether the location holds per-tenant configuration directories
// Obtains the set of changed paths under location from watchForChange and, for each one,
// refreshes the matching AggregateConfiguration in inMemoryConfig and forwards the
// resulting before/after difference to updateConfigurationValues.
100 public void pollFilesystemAndUpdateConfigurationIfREquired(
101 Map<String, AggregateConfiguration> inMemoryConfig, String location,
102 boolean isTenantLocation) {
104 Set<Path> paths = watchForChange(location);
106 for (Path path : paths) {
107 File file = path.toAbsolutePath().toFile();
108 String repositoryKey = null;
// Existing configuration file: work out which repository it belongs to.
109 if (ConfigurationUtils.isConfig(file) && file.isFile()) {
110 if (isTenantLocation) {
// Tenant layout: find the tenant root whose absolute path prefixes the file's path and
// build the key from <tenant root name> + separator + <file namespace>.
111 Collection<File> tenantsRoot =
112 ConfigurationUtils.getAllFiles(new File(location), false, true);
113 for (File tenantRoot : tenantsRoot) {
114 if (file.getAbsolutePath().startsWith(tenantRoot.getAbsolutePath())) {
115 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(
116 (tenantRoot.getName() + Constants.TENANT_NAMESPACE_SAPERATOR
117 + ConfigurationUtils.getNamespace(file))
118 .split(Constants.TENANT_NAMESPACE_SAPERATOR));
// Key derived directly from the file (presumably the non-tenant branch; the branch
// line itself is not visible here).
122 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
124 AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
125 if (config != null) {
// Snapshot the final configuration before and after adding the file so that only the
// difference is pushed.
126 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
127 config.addConfig(file);
128 LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
129 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
// The repository key splits into tenant (index 0) and namespace (index 1).
130 String[] tenantNamespaceArray =
131 repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
132 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
// Scan every repository for one that contains the file, remove the file from it and push
// the difference (presumably the branch taken when the path is no longer an existing
// config file; the branch line is not visible here).
135 Iterator<String> repoKeys = inMemoryConfig.keySet().iterator();
136 while (repoKeys.hasNext()) {
137 repositoryKey = repoKeys.next();
138 AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
139 if (config.containsConfig(file)) {
140 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
141 config.removeConfig(file);
142 LinkedHashMap latestConfig =
143 ConfigurationUtils.toMap(config.getFinalConfiguration());
144 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
145 String[] tenantNamespaceArray =
146 repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
147 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1],
// A closed watch service is handled separately from other failures (its handler body is
// not visible here).
154 } catch (ClosedWatchServiceException exception) {
// Any other failure is only printed to stderr; the exception does not propagate.
156 } catch (Exception exception) {
157 exception.printStackTrace();
// Forwards a configuration difference to the ConfigurationManager MBean: looks up the
// platform MBean server, builds a proxy for the bean named Constants.MBEAN_NAME and calls
// updateConfigurationValues(tenant, namespace, map) on it.
161 private void updateConfigurationValues(String tenant, String namespace, Map map)
163 MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
164 ObjectName mbeanName = new ObjectName(Constants.MBEAN_NAME);
165 ConfigurationManager conf =
166 JMX.newMBeanProxy(mbsc, mbeanName, org.openecomp.config.api.ConfigurationManager.class,
168 conf.updateConfigurationValues(tenant, namespace, map);
172 * Poll filesystem and update node specific configuration if required.
174 * @param location the location
// Obtains the changed paths under location from watchForChange and updates the override
// configuration held by ConfigurationRepository for each of them.
176 public void pollFilesystemAndUpdateNodeSpecificConfigurationIfREquired(String location) {
178 Set<Path> paths = watchForChange(location);
180 for (Path path : paths) {
181 File file = path.toAbsolutePath().toFile();
182 String repositoryKey = null;
// A config file is (re)loaded as an override under the key derived from the file.
183 if (ConfigurationUtils.isConfig(file)) {
184 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
185 ConfigurationRepository.lookup().populateOverrideConfigurtaion(repositoryKey, file);
// Removes the override for the file (presumably the else-branch; the branch line is not
// visible here).
187 ConfigurationRepository.lookup().removeOverrideConfigurtaion(file);
// Every Throwable, including Errors, is caught and only printed to stderr.
191 } catch (Throwable exception) {
192 exception.printStackTrace();
197 * Notify changes towards.
199 * @param tenant the tenant
200 * @param component the component
202 * @param myself the myself
203 * @throws Exception the exception
// Registers myself for change notifications on key within tenant/component.
// On the first registration for a tenant/component pair this creates a synchronized
// listener list, stores it, and schedules triggerScanning for that pair (first run after
// 1 ms, then 30000 ms after each run completes). The listener is then appended as a
// NotificationData entry; constructing that entry may throw.
205 public void notifyChangesTowards(String tenant, String component, String key,
206 ConfigurationChangeListener myself) throws Exception {
207 List<NotificationData> notificationList =
208 store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
209 if (notificationList == null) {
210 notificationList = Collections.synchronizedList(new ArrayList<NotificationData>());
211 store.put(tenant + Constants.KEY_ELEMENTS_DELEMETER + component, notificationList);
212 executor.scheduleWithFixedDelay(
213 () -> triggerScanning(tenant + Constants.KEY_ELEMENTS_DELEMETER + component), 1, 30000,
214 TimeUnit.MILLISECONDS);
216 notificationList.add(new NotificationData(tenant, component, key, myself));
220 * Stop notification towards.
222 * @param tenant the tenant
223 * @param component the component
225 * @param myself the myself
226 * @throws Exception the exception
// Unregisters myself for key within tenant/component. The entry to remove is located via
// List.remove(Object), i.e. through NotificationData.equals; building that probe object
// runs the NotificationData constructor and may therefore throw. When the removal
// succeeded and the list became empty, the store entry for the pair is dropped.
228 public void stopNotificationTowards(String tenant, String component, String key,
229 ConfigurationChangeListener myself) throws Exception {
230 List<NotificationData> notificationList =
231 store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
232 if (notificationList != null) {
234 notificationList.remove(new NotificationData(tenant, component, key, myself));
235 if (removed && notificationList.isEmpty()) {
236 store.remove(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
// Scheduled entry point for one tenant/component key: while listeners are still stored
// under key, hands scanForChanges to the notification executor.
// The throw below is presumably the else-branch (the branch line is not visible here).
// Because this method runs as a scheduleWithFixedDelay task, an exception thrown from it
// suppresses all subsequent executions of that task.
242 private void triggerScanning(String key) {
243 if (store.get(key) != null) {
244 notificationExcecutor.submit(() -> scanForChanges(key));
246 throw new IllegalArgumentException("Notification service for " + key + " is suspended.");
// Walks the listeners stored under key and submits a notification task for each entry
// whose watched value has changed.
// NOTE(review): the size is read once before the loop; if another thread removes an entry
// meanwhile, list.get(i) could go out of bounds - confirm whether a guard exists in the
// lines not visible here.
250 private void scanForChanges(String key) {
251 List<NotificationData> list = store.get(key);
253 int size = list.size();
254 for (int i = 0; i < size; i++) {
255 NotificationData notificationData = list.get(i);
256 if (notificationData.isChanged()) {
257 notificationExcecutor.submit(() -> sendNotification(notificationData));
// Dispatches one notification; any exception from the dispatch is printed to stderr and
// not rethrown.
263 private void sendNotification(NotificationData notificationData) {
265 notificationData.dispatchNotification();
266 } catch (Exception exception) {
267 exception.printStackTrace();
// Blocks until something changes under location and returns the affected paths.
// A fresh WatchService is created on every call and closed by the try-with-resources
// when the method leaves that block.
271 private Set<Path> watchForChange(String location) throws Exception {
// Blank or missing location: handled up front (the result is not visible here).
272 if (location == null || location.trim().length() == 0) {
275 File file = new File(location);
// Non-existent location: handled up front as well (result not visible here).
276 if (!file.exists()) {
279 Path path = file.toPath();
280 Set<Path> toReturn = new HashSet<>();
281 try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
// Recorded under the location so that shutdown() can reach the service.
282 watchServiceCollection.put(location, watchService);
// Watch the root itself plus every entry returned by getAllFiles(file, true, true)
// for modify/create/delete events.
283 path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
284 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
285 for (File dir : ConfigurationUtils.getAllFiles(file, true, true)) {
286 dir.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
287 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
// take() blocks until a watch key is signalled.
290 final WatchKey wk = watchService.take();
// Sleeps for the "event.fetch.delay" value (read from the DEFAULT_TENANT / DB_NAMESPACE
// configuration) before draining events - presumably to let a burst of events accumulate.
291 Thread.sleep(ConfigurationRepository.lookup()
292 .getConfigurationFor(Constants.DEFAULT_TENANT, Constants.DB_NAMESPACE)
293 .getLong("event.fetch.delay"));
294 for (WatchEvent<?> event : wk.pollEvents()) {
295 Object context = event.context();
296 if (context instanceof Path) {
// The event context is relative; resolve it against the directory the key watches.
297 File newFile = new File(((Path) wk.watchable()).toFile(), context.toString());
298 if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
// A newly created directory is registered with the same watch service.
299 if (newFile.isDirectory()) {
300 newFile.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
301 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
304 } else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
// Directory modifications get special handling (body not visible here).
305 if (newFile.isDirectory()) {
309 toReturn.add(newFile.toPath());
// Empty result: handled explicitly (body not visible here).
312 if (toReturn.isEmpty()) {
322 * The type Notification data.
// One listener registration: ties a ConfigurationChangeListener to the tenant, namespace
// and key it watches (those are assigned in the constructor).
// Declared without "static", so every instance is bound to an enclosing notifier instance.
324 class NotificationData {
// The listener to call back when the watched value changes.
341 ConfigurationChangeListener myself;
352 * Instantiates a new Notification data.
354 * @param tenant the tenant
355 * @param component the component
357 * @param myself the myself
358 * @throws Exception the exception
// Stores the registration coordinates, fails with RuntimeException("Key[...] not found.")
// when the check against the tenant/component configuration does not pass (the condition
// is only partly visible here), and captures the current value of the key as the baseline
// for later change detection.
360 public NotificationData(String tenant, String component, String key,
361 ConfigurationChangeListener myself) throws Exception {
362 this.tenant = tenant;
// The component argument is stored as the namespace.
363 this.namespace = component;
365 this.myself = myself;
366 if (!ConfigurationRepository.lookup().getConfigurationFor(tenant, component)
368 throw new RuntimeException("Key[" + key + "] not found.");
// Calls the statically imported ConfigurationUtils.isArray and stores the result in isArray.
370 isArray = isArray(tenant, component, key, Hint.DEFAULT.value());
// Baseline is read as multiple string values or as a single string (presumably chosen by
// isArray; the branch lines are not visible here).
372 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, component, key);
374 currentValue = ConfigurationManager.lookup().getAsString(tenant, component, key);
// Two registrations are equal when tenant, namespace, key and listener are all equal.
// A non-NotificationData argument is handled by the instanceof guard (result not visible).
// NOTE(review): no matching hashCode() override is visible in this chunk - confirm one
// exists before using instances in hash-based collections.
379 public boolean equals(Object obj) {
380 if (!(obj instanceof NotificationData)) {
383 NotificationData nd = (NotificationData) obj;
384 return tenant.equals(nd.tenant) && namespace.equals(nd.namespace) && key.equals(nd.key)
385 && myself.equals(nd.myself);
389 * Is changed boolean.
391 * @return the boolean
// Reports whether the watched key's value differs from the stored baseline (currentValue).
// The latest value is fetched as multiple string values or as a single string (presumably
// depending on isArray; the branch lines are not visible here).
393 public boolean isChanged() {
397 latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
399 latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
// Direct equals comparison against the baseline.
402 return !currentValue.equals(latestValue);
// Collection comparison: every old element is removed from the fetched collection (which
// mutates that collection); a failed removal is handled in a line not visible here, and
// any element left over afterwards counts as a change.
404 Collection<String> oldCollection = (Collection<String>) currentValue;
405 Collection<String> newCollection = (Collection<String>) latestValue;
406 for (String val : oldCollection) {
407 if (!newCollection.remove(val)) {
411 return !newCollection.isEmpty();
// Exceptions raised while comparing are caught here (handler body not visible).
413 } catch (Exception exception) {
419 * Dispatch notification.
421 * @throws Exception the exception
423 public void dispatchNotification() throws Exception {
424 Method method = null;
425 Vector<Object> parameters = null;
427 Object latestValue = null;
429 latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
431 latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
433 Method[] methods = myself.getClass().getDeclaredMethods();
434 if (methods != null && methods.length > 0) {
436 int paramCount = method.getParameterCount();
437 parameters = new Vector<>();
438 if (paramCount > 4) {
439 if (tenant.equals(Constants.DEFAULT_TENANT)) {
440 parameters.add(null);
442 parameters.add(tenant);
445 if (paramCount > 3) {
446 if (namespace.equals(Constants.DEFAULT_NAMESPACE)) {
447 parameters.add(null);
449 parameters.add(namespace);
453 parameters.add(currentValue);
454 parameters.add(latestValue);
455 method.setAccessible(true);
457 } catch (Exception exception) {
458 exception.printStackTrace();
460 isArray = isArray(tenant, namespace, key, Hint.DEFAULT.value());
462 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
464 currentValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
466 if (method != null && parameters != null) {
467 method.invoke(myself, parameters.toArray());