Fixes several sonarqube reported issues
[sdc.git] / common / onap-common-configuration-management / onap-configuration-management-core / src / main / java / org / onap / config / impl / ConfigurationChangeNotifier.java
1 package org.onap.config.impl;
2
3 import org.onap.config.ConfigurationUtils;
4 import org.onap.config.Constants;
5 import org.onap.config.api.ConfigurationChangeListener;
6 import org.onap.config.api.ConfigurationManager;
7 import org.onap.config.api.Hint;
8
9 import java.io.File;
10 import java.io.IOException;
11 import java.lang.management.ManagementFactory;
12 import java.lang.reflect.Method;
13 import java.nio.file.ClosedWatchServiceException;
14 import java.nio.file.FileSystems;
15 import java.nio.file.Path;
16 import java.nio.file.StandardWatchEventKinds;
17 import java.nio.file.WatchEvent;
18 import java.nio.file.WatchKey;
19 import java.nio.file.WatchService;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Objects;
29 import java.util.Set;
30 import java.util.Vector;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.TimeUnit;
35 import javax.management.JMX;
36 import javax.management.MBeanServerConnection;
37 import javax.management.ObjectName;
38
39
40 /**
41  * The type Configuration change notifier.
42  */
43 public final class ConfigurationChangeNotifier {
44
45   private final HashMap<String, List<NotificationData>> store = new HashMap<>();
46   private final ScheduledExecutorService executor =
47       Executors.newScheduledThreadPool(5, ConfigurationUtils.getThreadFactory());
48   private final ExecutorService notificationExecutor =
49       Executors.newCachedThreadPool(ConfigurationUtils.getThreadFactory());
50   private final Map<String, WatchService> watchServiceCollection =
51       Collections.synchronizedMap(new HashMap<>());
52
53   static {
54     if (!Thread.currentThread().getStackTrace()[2].getClassName()
55         .equals(ConfigurationImpl.class.getName())) {
56       throw new RuntimeException("Illegal access.");
57     }
58   }
59
60   /**
61    * Instantiates a new Configuration change notifier.
62    *
63    * @param inMemoryConfig the in memory config
64    */
65   public ConfigurationChangeNotifier(Map<String, AggregateConfiguration> inMemoryConfig) {
66     executor.scheduleWithFixedDelay(() -> this
67         .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
68             System.getProperty("config.location"), false), 1, 1, TimeUnit.MILLISECONDS);
69     executor.scheduleWithFixedDelay(() -> this
70         .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
71             System.getProperty("tenant.config.location"), true), 1, 1, TimeUnit.MILLISECONDS);
72     executor.scheduleWithFixedDelay(() -> this
73         .pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(
74             System.getProperty("node.config.location")), 1, 1, TimeUnit.MILLISECONDS);
75   }
76
77   /**
78    * Shutdown.
79    */
80   public void shutdown() {
81     for (WatchService watch : watchServiceCollection.values()) {
82       try {
83         watch.close();
84       } catch (IOException exception) {
85         //do nothing
86       }
87     }
88     executor.shutdownNow();
89   }
90
91   /**
92    * Poll filesystem and update configuration if required.
93    *
94    * @param inMemoryConfig   the in memory config
95    * @param location         the location
96    * @param isTenantLocation the is tenant location
97    */
98   public void pollFilesystemAndUpdateConfigurationIfRequired(
99       Map<String, AggregateConfiguration> inMemoryConfig, String location,
100       boolean isTenantLocation) {
101     try {
102       Set<Path> paths = watchForChange(location);
103       if (paths != null) {
104         for (Path path : paths) {
105           File file = path.toAbsolutePath().toFile();
106           String repositoryKey = null;
107           if (ConfigurationUtils.isConfig(file) && file.isFile()) {
108             if (isTenantLocation) {
109               Collection<File> tenantsRoot =
110                   ConfigurationUtils.getAllFiles(new File(location), false, true);
111               for (File tenantRoot : tenantsRoot) {
112                 if (file.getAbsolutePath().startsWith(tenantRoot.getAbsolutePath())) {
113                   repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(
114                       (tenantRoot.getName() + Constants.TENANT_NAMESPACE_SAPERATOR
115                           + ConfigurationUtils.getNamespace(file))
116                           .split(Constants.TENANT_NAMESPACE_SAPERATOR));
117                 }
118               }
119             } else {
120               repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
121             }
122             AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
123             if (config != null) {
124               LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
125               config.addConfig(file);
126               LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
127               Map map = ConfigurationUtils.diff(origConfig, latestConfig);
128               String[] tenantNamespaceArray =
129                   repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
130               updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
131             }
132           } else {
133             for (String configKey : inMemoryConfig.keySet()) {
134               repositoryKey = configKey;
135               AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
136               if (config.containsConfig(file)) {
137                 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
138                 config.removeConfig(file);
139                 LinkedHashMap latestConfig =
140                         ConfigurationUtils.toMap(config.getFinalConfiguration());
141                 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
142                 String[] tenantNamespaceArray =
143                         repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
144                 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1],
145                         map);
146               }
147             }
148           }
149         }
150       }
151     } catch (ClosedWatchServiceException exception) {
152       // do nothing.
153     } catch (Exception exception) {
154       exception.printStackTrace();
155     }
156   }
157
158   private void updateConfigurationValues(String tenant, String namespace, Map map)
159       throws Exception {
160     MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
161     ObjectName mbeanName = new ObjectName(Constants.MBEAN_NAME);
162     ConfigurationManager conf =
163         JMX.newMBeanProxy(mbsc, mbeanName, ConfigurationManager.class,
164             true);
165     conf.updateConfigurationValues(tenant, namespace, map);
166   }
167
168   /**
169    * Poll filesystem and update node specific configuration if required.
170    *
171    * @param location the location
172    */
173   public void pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(String location) {
174     try {
175       Set<Path> paths = watchForChange(location);
176       if (paths != null) {
177         for (Path path : paths) {
178           File file = path.toAbsolutePath().toFile();
179
180           if (ConfigurationUtils.isConfig(file)) {
181             String repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
182             ConfigurationRepository.lookup().populateOverrideConfigurtaion(repositoryKey, file);
183           } else {
184             ConfigurationRepository.lookup().removeOverrideConfigurtaion(file);
185           }
186         }
187       }
188     } catch (Exception exception) {
189       exception.printStackTrace();
190     }
191   }
192
193   /**
194    * Notify changes towards.
195    *
196    * @param tenant    the tenant
197    * @param component the component
198    * @param key       the key
199    * @param myself    the myself
200    * @throws Exception the exception
201    */
202   public void notifyChangesTowards(String tenant, String component, String key,
203                                    ConfigurationChangeListener myself) throws Exception {
204     List<NotificationData> notificationList =
205         store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
206     if (notificationList == null) {
207       notificationList = Collections.synchronizedList(new ArrayList<>());
208       store.put(tenant + Constants.KEY_ELEMENTS_DELEMETER + component, notificationList);
209       executor.scheduleWithFixedDelay(
210           () -> triggerScanning(tenant + Constants.KEY_ELEMENTS_DELEMETER + component), 1, 30000,
211           TimeUnit.MILLISECONDS);
212     }
213     notificationList.add(new NotificationData(tenant, component, key, myself));
214   }
215
216   /**
217    * Stop notification towards.
218    *
219    * @param tenant    the tenant
220    * @param component the component
221    * @param key       the key
222    * @param myself    the myself
223    * @throws Exception the exception
224    */
225   public void stopNotificationTowards(String tenant, String component, String key,
226                                       ConfigurationChangeListener myself) throws Exception {
227     List<NotificationData> notificationList =
228         store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
229     if (notificationList != null) {
230       boolean removed =
231           notificationList.remove(new NotificationData(tenant, component, key, myself));
232       if (removed && notificationList.isEmpty()) {
233         store.remove(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
234       }
235     }
236
237   }
238
239   private void triggerScanning(String key) {
240     if (store.get(key) != null) {
241       notificationExecutor.submit(() -> scanForChanges(key));
242     } else {
243       throw new IllegalArgumentException("Notification service for " + key + " is suspended.");
244     }
245   }
246
247   private void scanForChanges(String key) {
248     List<NotificationData> list = store.get(key);
249     if (list != null) {
250       list.stream()
251               .filter(NotificationData::isChanged)
252               .forEach(notificationData -> notificationExecutor.submit(() -> sendNotification(notificationData)));
253     }
254   }
255
256   private void sendNotification(NotificationData notificationData) {
257     try {
258       notificationData.dispatchNotification();
259     } catch (Exception exception) {
260       exception.printStackTrace();
261     }
262   }
263
264   private Set<Path> watchForChange(String location) throws Exception {
265     if (location == null || location.trim().length() == 0) {
266       return Collections.emptySet();
267     }
268     File file = new File(location);
269     if (!file.exists()) {
270       return Collections.emptySet();
271     }
272     Path path = file.toPath();
273     Set<Path> toReturn = new HashSet<>();
274     try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
275       watchServiceCollection.put(location, watchService);
276       path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
277           StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
278       for (File dir : ConfigurationUtils.getAllFiles(file, true, true)) {
279         dir.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
280             StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
281       }
282       while (true) {
283         final WatchKey wk = watchService.take();
284         Thread.sleep(ConfigurationRepository.lookup()
285             .getConfigurationFor(Constants.DEFAULT_TENANT, Constants.DB_NAMESPACE)
286             .getLong("event.fetch.delay"));
287         for (WatchEvent<?> event : wk.pollEvents()) {
288           Object context = event.context();
289           if (context instanceof Path) {
290             File newFile = new File(((Path) wk.watchable()).toFile(), context.toString());
291             if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
292               if (newFile.isDirectory()) {
293                 newFile.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
294                     StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
295                 continue;
296               }
297             } else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
298               if (newFile.isDirectory()) {
299                 continue;
300               }
301             }
302             toReturn.add(newFile.toPath());
303           }
304         }
305         if (toReturn.isEmpty()) {
306           continue;
307         }
308         break;
309       }
310     }
311     return toReturn;
312   }
313
314   /**
315    * The type Notification data.
316    */
317   static class NotificationData {
318
319     /**
320      * The Tenant.
321      */
322     final String tenant;
323     /**
324      * The Namespace.
325      */
326     final String namespace;
327     /**
328      * The Key.
329      */
330     final String key;
331     /**
332      * The Myself.
333      */
334     final ConfigurationChangeListener myself;
335     /**
336      * The Current value.
337      */
338     Object currentValue;
339     /**
340      * The Is array.
341      */
342     boolean isArray;
343
344     /**
345      * Instantiates a new Notification data.
346      *
347      * @param tenant    the tenant
348      * @param component the component
349      * @param key       the key
350      * @param myself    the myself
351      * @throws Exception the exception
352      */
353     public NotificationData(String tenant, String component, String key,
354                             ConfigurationChangeListener myself) throws Exception {
355       this.tenant = tenant;
356       this.namespace = component;
357       this.key = key;
358       this.myself = myself;
359       if (!ConfigurationRepository.lookup().getConfigurationFor(tenant, component)
360           .containsKey(key)) {
361         throw new RuntimeException("Key[" + key + "] not found.");
362       }
363       isArray = ConfigurationUtils.isArray(tenant, component, key, Hint.DEFAULT.value());
364       if (isArray) {
365         currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, component, key);
366       } else {
367         currentValue = ConfigurationManager.lookup().getAsString(tenant, component, key);
368       }
369     }
370
371     @Override
372     public boolean equals(Object obj) {
373       if (!(obj instanceof NotificationData)) {
374         return false;
375       }
376       NotificationData nd = (NotificationData) obj;
377       return Objects.equals(tenant, nd.tenant)
378               && Objects.equals(namespace, nd.namespace)
379               && Objects.equals(key, nd.key)
380               && Objects.equals(myself, nd.myself)
381               && Objects.equals(currentValue, nd.currentValue) // it's either String or List<String>
382               && isArray == nd.isArray;
383     }
384
385     @Override
386     public int hashCode() {
387       return Objects.hash(tenant, namespace, key, myself, currentValue, isArray);
388     }
389
390     /**
391      * Is changed boolean.
392      *
393      * @return the boolean
394      */
395     public boolean isChanged() {
396       Object latestValue;
397       try {
398         if (isArray) {
399           latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
400         } else {
401           latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
402         }
403         if (!isArray) {
404           return !currentValue.equals(latestValue);
405         } else {
406           Collection<String> oldCollection = (Collection<String>) currentValue;
407           Collection<String> newCollection = (Collection<String>) latestValue;
408           for (String val : oldCollection) {
409             if (!newCollection.remove(val)) {
410               return true;
411             }
412           }
413           return !newCollection.isEmpty();
414         }
415       } catch (Exception exception) {
416         return false;
417       }
418     }
419
420     /**
421      * Dispatch notification.
422      *
423      * @throws Exception the exception
424      */
425     public void dispatchNotification() throws Exception {
426       Method method = null;
427       Vector<Object> parameters = null;
428       try {
429         Object latestValue;
430         if (isArray) {
431           latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
432         } else {
433           latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
434         }
435         Method[] methods = myself.getClass().getDeclaredMethods();
436         if (methods != null && methods.length > 0) {
437           method = methods[0];
438           int paramCount = method.getParameterCount();
439           parameters = new Vector<>();
440           if (paramCount > 4) {
441             if (tenant.equals(Constants.DEFAULT_TENANT)) {
442               parameters.add(null);
443             } else {
444               parameters.add(tenant);
445             }
446           }
447           if (paramCount > 3) {
448             if (namespace.equals(Constants.DEFAULT_NAMESPACE)) {
449               parameters.add(null);
450             } else {
451               parameters.add(namespace);
452             }
453           }
454           parameters.add(key);
455           parameters.add(currentValue);
456           parameters.add(latestValue);
457           method.setAccessible(true);
458         }
459       } catch (Exception exception) {
460         exception.printStackTrace();
461       } finally {
462         isArray = ConfigurationUtils.isArray(tenant, namespace, key, Hint.DEFAULT.value());
463         if (isArray) {
464           currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
465         } else {
466           currentValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
467         }
468         if (method != null && parameters != null) {
469           method.invoke(myself, parameters.toArray());
470         }
471       }
472     }
473   }
474 }