3922720d48c872218918548cfff445cbbb7d2dff
[sdc.git] / common / openecomp-common-configuration-management / openecomp-configuration-management-core / src / main / java / org / openecomp / config / impl / ConfigurationChangeNotifier.java
1 package org.openecomp.config.impl;
2
3 import static org.openecomp.config.ConfigurationUtils.isArray;
4
5 import org.openecomp.config.ConfigurationUtils;
6 import org.openecomp.config.Constants;
7 import org.openecomp.config.api.ConfigurationChangeListener;
8 import org.openecomp.config.api.ConfigurationManager;
9 import org.openecomp.config.api.Hint;
10
11 import java.io.File;
12 import java.io.IOException;
13 import java.lang.management.ManagementFactory;
14 import java.lang.reflect.Method;
15 import java.nio.file.ClosedWatchServiceException;
16 import java.nio.file.FileSystems;
17 import java.nio.file.Path;
18 import java.nio.file.StandardWatchEventKinds;
19 import java.nio.file.WatchEvent;
20 import java.nio.file.WatchKey;
21 import java.nio.file.WatchService;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.LinkedHashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Objects;
31 import java.util.Set;
32 import java.util.Vector;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.TimeUnit;
37 import javax.management.JMX;
38 import javax.management.MBeanServerConnection;
39 import javax.management.ObjectName;
40
41
42 /**
43  * The type Configuration change notifier.
44  */
45 public final class ConfigurationChangeNotifier {
46
47   private final HashMap<String, List<NotificationData>> store = new HashMap<>();
48   private final ScheduledExecutorService executor =
49       Executors.newScheduledThreadPool(5, ConfigurationUtils.getThreadFactory());
50   private final ExecutorService notificationExecutor =
51       Executors.newCachedThreadPool(ConfigurationUtils.getThreadFactory());
52   private final Map<String, WatchService> watchServiceCollection =
53       Collections.synchronizedMap(new HashMap<>());
54
55   static {
56     if (!Thread.currentThread().getStackTrace()[2].getClassName()
57         .equals(ConfigurationImpl.class.getName())) {
58       throw new RuntimeException("Illegal access.");
59     }
60   }
61
62   /**
63    * Instantiates a new Configuration change notifier.
64    *
65    * @param inMemoryConfig the in memory config
66    */
67   public ConfigurationChangeNotifier(Map<String, AggregateConfiguration> inMemoryConfig) {
68     executor.scheduleWithFixedDelay(() -> this
69         .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
70             System.getProperty("config.location"), false), 1, 1, TimeUnit.MILLISECONDS);
71     executor.scheduleWithFixedDelay(() -> this
72         .pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
73             System.getProperty("tenant.config.location"), true), 1, 1, TimeUnit.MILLISECONDS);
74     executor.scheduleWithFixedDelay(() -> this
75         .pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(
76             System.getProperty("node.config.location")), 1, 1, TimeUnit.MILLISECONDS);
77   }
78
79   /**
80    * Shutdown.
81    */
82   public void shutdown() {
83     for (WatchService watch : watchServiceCollection.values()) {
84       try {
85         watch.close();
86       } catch (IOException exception) {
87         //do nothing
88       }
89     }
90     executor.shutdownNow();
91   }
92
93   /**
94    * Poll filesystem and update configuration if required.
95    *
96    * @param inMemoryConfig   the in memory config
97    * @param location         the location
98    * @param isTenantLocation the is tenant location
99    */
100   public void pollFilesystemAndUpdateConfigurationIfRequired(
101       Map<String, AggregateConfiguration> inMemoryConfig, String location,
102       boolean isTenantLocation) {
103     try {
104       Set<Path> paths = watchForChange(location);
105       if (paths != null) {
106         for (Path path : paths) {
107           File file = path.toAbsolutePath().toFile();
108           String repositoryKey = null;
109           if (ConfigurationUtils.isConfig(file) && file.isFile()) {
110             if (isTenantLocation) {
111               Collection<File> tenantsRoot =
112                   ConfigurationUtils.getAllFiles(new File(location), false, true);
113               for (File tenantRoot : tenantsRoot) {
114                 if (file.getAbsolutePath().startsWith(tenantRoot.getAbsolutePath())) {
115                   repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(
116                       (tenantRoot.getName() + Constants.TENANT_NAMESPACE_SAPERATOR
117                           + ConfigurationUtils.getNamespace(file))
118                           .split(Constants.TENANT_NAMESPACE_SAPERATOR));
119                 }
120               }
121             } else {
122               repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
123             }
124             AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
125             if (config != null) {
126               LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
127               config.addConfig(file);
128               LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
129               Map map = ConfigurationUtils.diff(origConfig, latestConfig);
130               String[] tenantNamespaceArray =
131                   repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
132               updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
133             }
134           } else {
135             for (String configKey : inMemoryConfig.keySet()) {
136               repositoryKey = configKey;
137               AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
138               if (config.containsConfig(file)) {
139                 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
140                 config.removeConfig(file);
141                 LinkedHashMap latestConfig =
142                         ConfigurationUtils.toMap(config.getFinalConfiguration());
143                 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
144                 String[] tenantNamespaceArray =
145                         repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
146                 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1],
147                         map);
148               }
149             }
150           }
151         }
152       }
153     } catch (ClosedWatchServiceException exception) {
154       // do nothing.
155     } catch (Exception exception) {
156       exception.printStackTrace();
157     }
158   }
159
160   private void updateConfigurationValues(String tenant, String namespace, Map map)
161       throws Exception {
162     MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
163     ObjectName mbeanName = new ObjectName(Constants.MBEAN_NAME);
164     ConfigurationManager conf =
165         JMX.newMBeanProxy(mbsc, mbeanName, org.openecomp.config.api.ConfigurationManager.class,
166             true);
167     conf.updateConfigurationValues(tenant, namespace, map);
168   }
169
170   /**
171    * Poll filesystem and update node specific configuration if required.
172    *
173    * @param location the location
174    */
175   public void pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(String location) {
176     try {
177       Set<Path> paths = watchForChange(location);
178       if (paths != null) {
179         for (Path path : paths) {
180           File file = path.toAbsolutePath().toFile();
181
182           if (ConfigurationUtils.isConfig(file)) {
183             String repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
184             ConfigurationRepository.lookup().populateOverrideConfigurtaion(repositoryKey, file);
185           } else {
186             ConfigurationRepository.lookup().removeOverrideConfigurtaion(file);
187           }
188         }
189       }
190     } catch (Exception exception) {
191       exception.printStackTrace();
192     }
193   }
194
195   /**
196    * Notify changes towards.
197    *
198    * @param tenant    the tenant
199    * @param component the component
200    * @param key       the key
201    * @param myself    the myself
202    * @throws Exception the exception
203    */
204   public void notifyChangesTowards(String tenant, String component, String key,
205                                    ConfigurationChangeListener myself) throws Exception {
206     List<NotificationData> notificationList =
207         store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
208     if (notificationList == null) {
209       notificationList = Collections.synchronizedList(new ArrayList<NotificationData>());
210       store.put(tenant + Constants.KEY_ELEMENTS_DELEMETER + component, notificationList);
211       executor.scheduleWithFixedDelay(
212           () -> triggerScanning(tenant + Constants.KEY_ELEMENTS_DELEMETER + component), 1, 30000,
213           TimeUnit.MILLISECONDS);
214     }
215     notificationList.add(new NotificationData(tenant, component, key, myself));
216   }
217
218   /**
219    * Stop notification towards.
220    *
221    * @param tenant    the tenant
222    * @param component the component
223    * @param key       the key
224    * @param myself    the myself
225    * @throws Exception the exception
226    */
227   public void stopNotificationTowards(String tenant, String component, String key,
228                                       ConfigurationChangeListener myself) throws Exception {
229     List<NotificationData> notificationList =
230         store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
231     if (notificationList != null) {
232       boolean removed =
233           notificationList.remove(new NotificationData(tenant, component, key, myself));
234       if (removed && notificationList.isEmpty()) {
235         store.remove(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
236       }
237     }
238
239   }
240
241   private void triggerScanning(String key) {
242     if (store.get(key) != null) {
243       notificationExecutor.submit(() -> scanForChanges(key));
244     } else {
245       throw new IllegalArgumentException("Notification service for " + key + " is suspended.");
246     }
247   }
248
249   private void scanForChanges(String key) {
250     List<NotificationData> list = store.get(key);
251     if (list != null) {
252       list.stream()
253               .filter(NotificationData::isChanged)
254               .forEach(notificationData -> notificationExecutor.submit(() -> sendNotification(notificationData)));
255     }
256   }
257
258   private void sendNotification(NotificationData notificationData) {
259     try {
260       notificationData.dispatchNotification();
261     } catch (Exception exception) {
262       exception.printStackTrace();
263     }
264   }
265
266   private Set<Path> watchForChange(String location) throws Exception {
267     if (location == null || location.trim().length() == 0) {
268       return null;
269     }
270     File file = new File(location);
271     if (!file.exists()) {
272       return null;
273     }
274     Path path = file.toPath();
275     Set<Path> toReturn = new HashSet<>();
276     try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
277       watchServiceCollection.put(location, watchService);
278       path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
279           StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
280       for (File dir : ConfigurationUtils.getAllFiles(file, true, true)) {
281         dir.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
282             StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
283       }
284       while (true) {
285         final WatchKey wk = watchService.take();
286         Thread.sleep(ConfigurationRepository.lookup()
287             .getConfigurationFor(Constants.DEFAULT_TENANT, Constants.DB_NAMESPACE)
288             .getLong("event.fetch.delay"));
289         for (WatchEvent<?> event : wk.pollEvents()) {
290           Object context = event.context();
291           if (context instanceof Path) {
292             File newFile = new File(((Path) wk.watchable()).toFile(), context.toString());
293             if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
294               if (newFile.isDirectory()) {
295                 newFile.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
296                     StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
297                 continue;
298               }
299             } else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
300               if (newFile.isDirectory()) {
301                 continue;
302               }
303             }
304             toReturn.add(newFile.toPath());
305           }
306         }
307         if (toReturn.isEmpty()) {
308           continue;
309         }
310         break;
311       }
312     }
313     return toReturn;
314   }
315
316   /**
317    * The type Notification data.
318    */
319   static class NotificationData {
320
321     /**
322      * The Tenant.
323      */
324     final String tenant;
325     /**
326      * The Namespace.
327      */
328     final String namespace;
329     /**
330      * The Key.
331      */
332     final String key;
333     /**
334      * The Myself.
335      */
336     final ConfigurationChangeListener myself;
337     /**
338      * The Current value.
339      */
340     Object currentValue;
341     /**
342      * The Is array.
343      */
344     boolean isArray;
345
346     /**
347      * Instantiates a new Notification data.
348      *
349      * @param tenant    the tenant
350      * @param component the component
351      * @param key       the key
352      * @param myself    the myself
353      * @throws Exception the exception
354      */
355     public NotificationData(String tenant, String component, String key,
356                             ConfigurationChangeListener myself) throws Exception {
357       this.tenant = tenant;
358       this.namespace = component;
359       this.key = key;
360       this.myself = myself;
361       if (!ConfigurationRepository.lookup().getConfigurationFor(tenant, component)
362           .containsKey(key)) {
363         throw new RuntimeException("Key[" + key + "] not found.");
364       }
365       isArray = isArray(tenant, component, key, Hint.DEFAULT.value());
366       if (isArray) {
367         currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, component, key);
368       } else {
369         currentValue = ConfigurationManager.lookup().getAsString(tenant, component, key);
370       }
371     }
372
373     @Override
374     public boolean equals(Object obj) {
375       if (!(obj instanceof NotificationData)) {
376         return false;
377       }
378       NotificationData nd = (NotificationData) obj;
379       return Objects.equals(tenant, nd.tenant)
380               && Objects.equals(namespace, nd.namespace)
381               && Objects.equals(key, nd.key)
382               && Objects.equals(myself, nd.myself)
383               && Objects.equals(currentValue, nd.currentValue) // it's either String or List<String>
384               && isArray == nd.isArray;
385     }
386
387     @Override
388     public int hashCode() {
389       return Objects.hash(tenant, namespace, key, myself, currentValue, isArray);
390     }
391
392     /**
393      * Is changed boolean.
394      *
395      * @return the boolean
396      */
397     public boolean isChanged() {
398       Object latestValue;
399       try {
400         if (isArray) {
401           latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
402         } else {
403           latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
404         }
405         if (!isArray) {
406           return !currentValue.equals(latestValue);
407         } else {
408           Collection<String> oldCollection = (Collection<String>) currentValue;
409           Collection<String> newCollection = (Collection<String>) latestValue;
410           for (String val : oldCollection) {
411             if (!newCollection.remove(val)) {
412               return true;
413             }
414           }
415           return !newCollection.isEmpty();
416         }
417       } catch (Exception exception) {
418         return false;
419       }
420     }
421
422     /**
423      * Dispatch notification.
424      *
425      * @throws Exception the exception
426      */
427     public void dispatchNotification() throws Exception {
428       Method method = null;
429       Vector<Object> parameters = null;
430       try {
431         Object latestValue;
432         if (isArray) {
433           latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
434         } else {
435           latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
436         }
437         Method[] methods = myself.getClass().getDeclaredMethods();
438         if (methods != null && methods.length > 0) {
439           method = methods[0];
440           int paramCount = method.getParameterCount();
441           parameters = new Vector<>();
442           if (paramCount > 4) {
443             if (tenant.equals(Constants.DEFAULT_TENANT)) {
444               parameters.add(null);
445             } else {
446               parameters.add(tenant);
447             }
448           }
449           if (paramCount > 3) {
450             if (namespace.equals(Constants.DEFAULT_NAMESPACE)) {
451               parameters.add(null);
452             } else {
453               parameters.add(namespace);
454             }
455           }
456           parameters.add(key);
457           parameters.add(currentValue);
458           parameters.add(latestValue);
459           method.setAccessible(true);
460         }
461       } catch (Exception exception) {
462         exception.printStackTrace();
463       } finally {
464         isArray = isArray(tenant, namespace, key, Hint.DEFAULT.value());
465         if (isArray) {
466           currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
467         } else {
468           currentValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
469         }
470         if (method != null && parameters != null) {
471           method.invoke(myself, parameters.toArray());
472         }
473       }
474     }
475   }
476 }