88c71c14e0512fef6c2dc550aa42e7b779c3cb3d
[sdc.git] /
1 package org.onap.config.impl;
2
3 import org.onap.config.ConfigurationUtils;
4 import org.onap.config.Constants;
5 import org.onap.config.api.ConfigurationChangeListener;
6 import org.onap.config.api.ConfigurationManager;
7 import org.onap.config.api.Hint;
8  
9 import java.io.File;
10 import java.io.IOException;
11 import java.lang.management.ManagementFactory;
12 import java.lang.reflect.Method;
13 import java.nio.file.ClosedWatchServiceException;
14 import java.nio.file.FileSystems;
15 import java.nio.file.Path;
16 import java.nio.file.StandardWatchEventKinds;
17 import java.nio.file.WatchEvent;
18 import java.nio.file.WatchKey;
19 import java.nio.file.WatchService;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Objects;
29 import java.util.Set;
30 import java.util.Vector;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.TimeUnit;
35 import javax.management.JMX;
36 import javax.management.MBeanServerConnection;
37 import javax.management.ObjectName;
38
39
40
41 /**
42  * The type Configuration change notifier.
43  */
44 public final class ConfigurationChangeNotifier {
45
46   private final HashMap<String, List<NotificationData>> store = new HashMap<>();
47   private final ScheduledExecutorService executor =
48       Executors.newScheduledThreadPool(5, ConfigurationUtils.getThreadFactory());
49   private final ExecutorService notificationExecutor =
50       Executors.newCachedThreadPool(ConfigurationUtils.getThreadFactory());
51   private final Map<String, WatchService> watchServiceCollection =
52       Collections.synchronizedMap(new HashMap<>());
53
54   static {
55     if (!Thread.currentThread().getStackTrace()[2].getClassName()
56         .equals(ConfigurationImpl.class.getName())) {
57       throw new RuntimeException("Illegal access.");
58     }
59   }
60
61   /**
62    * Instantiates a new Configuration change notifier.
63    *
64    * @param inMemoryConfig the in memory config
65    */
66   public ConfigurationChangeNotifier(Map<String, AggregateConfiguration> inMemoryConfig) {
67     executor.scheduleWithFixedDelay(() -> this
68         .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
69             System.getProperty("config.location"), false), 1, 1, TimeUnit.MILLISECONDS);
70     executor.scheduleWithFixedDelay(() -> this
71         .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
72             System.getProperty("tenant.config.location"), true), 1, 1, TimeUnit.MILLISECONDS);
73     executor.scheduleWithFixedDelay(() -> this
74         .pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(
75             System.getProperty("node.config.location")), 1, 1, TimeUnit.MILLISECONDS);
76   }
77
78   /**
79    * Shutdown.
80    */
81   public void shutdown() {
82     for (WatchService watch : watchServiceCollection.values()) {
83       try {
84         watch.close();
85       } catch (IOException exception) {
86         //do nothing
87       }
88     }
89     executor.shutdownNow();
90   }
91
92   /**
93    * Poll filesystem and update configuration if required.
94    *
95    * @param inMemoryConfig   the in memory config
96    * @param location         the location
97    * @param isTenantLocation the is tenant location
98    */
99   public void pollFilesystemAndUpdateConfigurationIfRequired(
100       Map<String, AggregateConfiguration> inMemoryConfig, String location,
101       boolean isTenantLocation) {
102     try {
103       Set<Path> paths = watchForChange(location);
104       if (paths != null) {
105         for (Path path : paths) {
106           File file = path.toAbsolutePath().toFile();
107           String repositoryKey = null;
108           if (ConfigurationUtils.isConfig(file) && file.isFile()) {
109             if (isTenantLocation) {
110               Collection<File> tenantsRoot =
111                   ConfigurationUtils.getAllFiles(new File(location), false, true);
112               for (File tenantRoot : tenantsRoot) {
113                 if (file.getAbsolutePath().startsWith(tenantRoot.getAbsolutePath())) {
114                   repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(
115                       (tenantRoot.getName() + Constants.TENANT_NAMESPACE_SAPERATOR
116                           + ConfigurationUtils.getNamespace(file))
117                           .split(Constants.TENANT_NAMESPACE_SAPERATOR));
118                 }
119               }
120             } else {
121               repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
122             }
123             AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
124             if (config != null) {
125               LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
126               config.addConfig(file);
127               LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
128               Map map = ConfigurationUtils.diff(origConfig, latestConfig);
129               String[] tenantNamespaceArray =
130                   repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
131               updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
132             }
133           } else {
134             for (String configKey : inMemoryConfig.keySet()) {
135               repositoryKey = configKey;
136               AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
137               if (config.containsConfig(file)) {
138                 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
139                 config.removeConfig(file);
140                 LinkedHashMap latestConfig =
141                         ConfigurationUtils.toMap(config.getFinalConfiguration());
142                 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
143                 String[] tenantNamespaceArray =
144                         repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
145                 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1],
146                         map);
147               }
148             }
149           }
150         }
151       }
152     } catch (ClosedWatchServiceException exception) {
153       // do nothing.
154     } catch (Exception exception) {
155       exception.printStackTrace();
156     }
157   }
158
159   private void updateConfigurationValues(String tenant, String namespace, Map map)
160       throws Exception {
161     MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
162     ObjectName mbeanName = new ObjectName(Constants.MBEAN_NAME);
163     ConfigurationManager conf =
164         JMX.newMBeanProxy(mbsc, mbeanName, ConfigurationManager.class,
165             true);
166     conf.updateConfigurationValues(tenant, namespace, map);
167   }
168
169   /**
170    * Poll filesystem and update node specific configuration if required.
171    *
172    * @param location the location
173    */
174   public void pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(String location) {
175     try {
176       Set<Path> paths = watchForChange(location);
177       if (paths != null) {
178         for (Path path : paths) {
179           File file = path.toAbsolutePath().toFile();
180
181           if (ConfigurationUtils.isConfig(file)) {
182             String repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
183             ConfigurationRepository.lookup().populateOverrideConfigurtaion(repositoryKey, file);
184           } else {
185             ConfigurationRepository.lookup().removeOverrideConfigurtaion(file);
186           }
187         }
188       }
189     } catch (Exception exception) {
190       exception.printStackTrace();
191     }
192   }
193
194   /**
195    * Notify changes towards.
196    *
197    * @param tenant    the tenant
198    * @param component the component
199    * @param key       the key
200    * @param myself    the myself
201    * @throws Exception the exception
202    */
203   public void notifyChangesTowards(String tenant, String component, String key,
204                                    ConfigurationChangeListener myself) throws Exception {
205     List<NotificationData> notificationList =
206         store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
207     if (notificationList == null) {
208       notificationList = Collections.synchronizedList(new ArrayList<>());
209       store.put(tenant + Constants.KEY_ELEMENTS_DELEMETER + component, notificationList);
210       executor.scheduleWithFixedDelay(
211           () -> triggerScanning(tenant + Constants.KEY_ELEMENTS_DELEMETER + component), 1, 30000,
212           TimeUnit.MILLISECONDS);
213     }
214     notificationList.add(new NotificationData(tenant, component, key, myself));
215   }
216
217   /**
218    * Stop notification towards.
219    *
220    * @param tenant    the tenant
221    * @param component the component
222    * @param key       the key
223    * @param myself    the myself
224    * @throws Exception the exception
225    */
226   public void stopNotificationTowards(String tenant, String component, String key,
227                                       ConfigurationChangeListener myself) throws Exception {
228     List<NotificationData> notificationList =
229         store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
230     if (notificationList != null) {
231       boolean removed =
232           notificationList.remove(new NotificationData(tenant, component, key, myself));
233       if (removed && notificationList.isEmpty()) {
234         store.remove(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
235       }
236     }
237
238   }
239
240   private void triggerScanning(String key) {
241     if (store.get(key) != null) {
242       notificationExecutor.submit(() -> scanForChanges(key));
243     } else {
244       throw new IllegalArgumentException("Notification service for " + key + " is suspended.");
245     }
246   }
247
248   private void scanForChanges(String key) {
249     List<NotificationData> list = store.get(key);
250     if (list != null) {
251       list.stream()
252               .filter(NotificationData::isChanged)
253               .forEach(notificationData -> notificationExecutor.submit(() -> sendNotification(notificationData)));
254     }
255   }
256
257   private void sendNotification(NotificationData notificationData) {
258     try {
259       notificationData.dispatchNotification();
260     } catch (Exception exception) {
261       exception.printStackTrace();
262     }
263   }
264
265   private Set<Path> watchForChange(String location) throws Exception {
266     if (location == null || location.trim().length() == 0) {
267       return Collections.emptySet();
268     }
269     File file = new File(location);
270     if (!file.exists()) {
271       return Collections.emptySet();
272     }
273     Path path = file.toPath();
274     Set<Path> toReturn = new HashSet<>();
275     try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
276       watchServiceCollection.put(location, watchService);
277       path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
278           StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
279       for (File dir : ConfigurationUtils.getAllFiles(file, true, true)) {
280         dir.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
281             StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
282       }
283       while (true) {
284         final WatchKey wk = watchService.take();
285         Thread.sleep(ConfigurationRepository.lookup()
286             .getConfigurationFor(Constants.DEFAULT_TENANT, Constants.DB_NAMESPACE)
287             .getLong("event.fetch.delay"));
288         for (WatchEvent<?> event : wk.pollEvents()) {
289           Object context = event.context();
290           if (context instanceof Path) {
291             File newFile = new File(((Path) wk.watchable()).toFile(), context.toString());
292             if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
293               if (newFile.isDirectory()) {
294                 newFile.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
295                     StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
296                 continue;
297               }
298             } else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
299               if (newFile.isDirectory()) {
300                 continue;
301               }
302             }
303             toReturn.add(newFile.toPath());
304           }
305         }
306         if (toReturn.isEmpty()) {
307           continue;
308         }
309         break;
310       }
311     }
312     return toReturn;
313   }
314
315   /**
316    * The type Notification data.
317    */
318   static class NotificationData {
319
320     /**
321      * The Tenant.
322      */
323     final String tenant;
324     /**
325      * The Namespace.
326      */
327     final String namespace;
328     /**
329      * The Key.
330      */
331     final String key;
332     /**
333      * The Myself.
334      */
335     final ConfigurationChangeListener myself;
336     /**
337      * The Current value.
338      */
339     Object currentValue;
340     /**
341      * The Is array.
342      */
343     boolean isArray;
344
345     /**
346      * Instantiates a new Notification data.
347      *
348      * @param tenant    the tenant
349      * @param component the component
350      * @param key       the key
351      * @param myself    the myself
352      * @throws Exception the exception
353      */
354     public NotificationData(String tenant, String component, String key,
355                             ConfigurationChangeListener myself) throws Exception {
356       this.tenant = tenant;
357       this.namespace = component;
358       this.key = key;
359       this.myself = myself;
360       if (!ConfigurationRepository.lookup().getConfigurationFor(tenant, component)
361           .containsKey(key)) {
362         throw new RuntimeException("Key[" + key + "] not found.");
363       }
364       isArray = ConfigurationUtils.isArray(tenant, component, key, Hint.DEFAULT.value());
365       if (isArray) {
366         currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, component, key);
367       } else {
368         currentValue = ConfigurationManager.lookup().getAsString(tenant, component, key);
369       }
370     }
371
372     @Override
373     public boolean equals(Object obj) {
374       if (!(obj instanceof NotificationData)) {
375         return false;
376       }
377       NotificationData nd = (NotificationData) obj;
378       return Objects.equals(tenant, nd.tenant)
379               && Objects.equals(namespace, nd.namespace)
380               && Objects.equals(key, nd.key)
381               && Objects.equals(myself, nd.myself)
382               && Objects.equals(currentValue, nd.currentValue) // it's either String or List<String>
383               && isArray == nd.isArray;
384     }
385
386     @Override
387     public int hashCode() {
388       return Objects.hash(tenant, namespace, key, myself, currentValue, isArray);
389     }
390
391     /**
392      * Is changed boolean.
393      *
394      * @return the boolean
395      */
396     public boolean isChanged() {
397       Object latestValue;
398       try {
399         if (isArray) {
400           latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
401         } else {
402           latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
403         }
404         if (!isArray) {
405           return !currentValue.equals(latestValue);
406         } else {
407           Collection<String> oldCollection = (Collection<String>) currentValue;
408           Collection<String> newCollection = (Collection<String>) latestValue;
409           for (String val : oldCollection) {
410             if (!newCollection.remove(val)) {
411               return true;
412             }
413           }
414           return !newCollection.isEmpty();
415         }
416       } catch (Exception exception) {
417         return false;
418       }
419     }
420
421     /**
422      * Dispatch notification.
423      *
424      * @throws Exception the exception
425      */
426     public void dispatchNotification() throws Exception {
427       Method method = null;
428       Vector<Object> parameters = null;
429       try {
430         Object latestValue;
431         if (isArray) {
432           latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
433         } else {
434           latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
435         }
436         Method[] methods = myself.getClass().getDeclaredMethods();
437         if (methods != null && methods.length > 0) {
438           method = methods[0];
439           int paramCount = method.getParameterCount();
440           parameters = new Vector<>();
441           if (paramCount > 4) {
442             if (tenant.equals(Constants.DEFAULT_TENANT)) {
443               parameters.add(null);
444             } else {
445               parameters.add(tenant);
446             }
447           }
448           if (paramCount > 3) {
449             if (namespace.equals(Constants.DEFAULT_NAMESPACE)) {
450               parameters.add(null);
451             } else {
452               parameters.add(namespace);
453             }
454           }
455           parameters.add(key);
456           parameters.add(currentValue);
457           parameters.add(latestValue);
458           method.setAccessible(true);
459         }
460       } catch (Exception exception) {
461         exception.printStackTrace();
462       } finally {
463         isArray = ConfigurationUtils.isArray(tenant, namespace, key, Hint.DEFAULT.value());
464         if (isArray) {
465           currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
466         } else {
467           currentValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
468         }
469         if (method != null && parameters != null) {
470           method.invoke(myself, parameters.toArray());
471         }
472       }
473     }
474   }
475 }