[SDC-29] Amdocs OnBoard 1707 initial commit.
[sdc.git] / common / openecomp-common-configuration-management / openecomp-configuration-management-core / src / main / java / org / openecomp / config / impl / ConfigurationChangeNotifier.java
1 package org.openecomp.config.impl;
2
3 import static org.openecomp.config.ConfigurationUtils.isArray;
4
5 import org.openecomp.config.ConfigurationUtils;
6 import org.openecomp.config.Constants;
7 import org.openecomp.config.api.ConfigurationChangeListener;
8 import org.openecomp.config.api.ConfigurationManager;
9 import org.openecomp.config.api.Hint;
10
11 import java.io.File;
12 import java.io.IOException;
13 import java.lang.management.ManagementFactory;
14 import java.lang.reflect.Method;
15 import java.nio.file.ClosedWatchServiceException;
16 import java.nio.file.FileSystems;
17 import java.nio.file.Path;
18 import java.nio.file.StandardWatchEventKinds;
19 import java.nio.file.WatchEvent;
20 import java.nio.file.WatchKey;
21 import java.nio.file.WatchService;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.LinkedHashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.Vector;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.TimeUnit;
37 import javax.management.JMX;
38 import javax.management.MBeanServerConnection;
39 import javax.management.ObjectName;
40
41
42 /**
43  * The type Configuration change notifier.
44  */
45 public final class ConfigurationChangeNotifier {
46
47   private HashMap<String, List<NotificationData>> store = new HashMap<>();
48   private ScheduledExecutorService executor =
49       Executors.newScheduledThreadPool(5, ConfigurationUtils.getThreadFactory());
50   private ExecutorService notificationExcecutor =
51       Executors.newCachedThreadPool(ConfigurationUtils.getThreadFactory());
52   private Map<String, WatchService> watchServiceCollection =
53       Collections.synchronizedMap(new HashMap<>());
54
55   {
56     if (!Thread.currentThread().getStackTrace()[2].getClassName()
57         .equals(ConfigurationImpl.class.getName())) {
58       throw new RuntimeException("Illegal access.");
59     }
60   }
61
62   /**
63    * Instantiates a new Configuration change notifier.
64    *
65    * @param inMemoryConfig the in memory config
66    */
67   public ConfigurationChangeNotifier(Map<String, AggregateConfiguration> inMemoryConfig) {
68     executor.scheduleWithFixedDelay(() -> this
69         .pollFilesystemAndUpdateConfigurationIfREquired(inMemoryConfig,
70             System.getProperty("config.location"), false), 1, 1, TimeUnit.MILLISECONDS);
71     executor.scheduleWithFixedDelay(() -> this
72         .pollFilesystemAndUpdateConfigurationIfREquired(inMemoryConfig,
73             System.getProperty("tenant.config.location"), true), 1, 1, TimeUnit.MILLISECONDS);
74     executor.scheduleWithFixedDelay(() -> this
75         .pollFilesystemAndUpdateNodeSpecificConfigurationIfREquired(
76             System.getProperty("node.config.location")), 1, 1, TimeUnit.MILLISECONDS);
77   }
78
79   /**
80    * Shutdown.
81    */
82   public void shutdown() {
83     for (WatchService watch : watchServiceCollection.values()) {
84       try {
85         watch.close();
86       } catch (IOException exception) {
87         //do nothing
88       }
89     }
90     executor.shutdownNow();
91   }
92
93   /**
94    * Poll filesystem and update configuration if r equired.
95    *
96    * @param inMemoryConfig   the in memory config
97    * @param location         the location
98    * @param isTenantLocation the is tenant location
99    */
100   public void pollFilesystemAndUpdateConfigurationIfREquired(
101       Map<String, AggregateConfiguration> inMemoryConfig, String location,
102       boolean isTenantLocation) {
103     try {
104       Set<Path> paths = watchForChange(location);
105       if (paths != null) {
106         for (Path path : paths) {
107           File file = path.toAbsolutePath().toFile();
108           String repositoryKey = null;
109           if (ConfigurationUtils.isConfig(file) && file.isFile()) {
110             if (isTenantLocation) {
111               Collection<File> tenantsRoot =
112                   ConfigurationUtils.getAllFiles(new File(location), false, true);
113               for (File tenantRoot : tenantsRoot) {
114                 if (file.getAbsolutePath().startsWith(tenantRoot.getAbsolutePath())) {
115                   repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(
116                       (tenantRoot.getName() + Constants.TENANT_NAMESPACE_SAPERATOR
117                           + ConfigurationUtils.getNamespace(file))
118                           .split(Constants.TENANT_NAMESPACE_SAPERATOR));
119                 }
120               }
121             } else {
122               repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
123             }
124             AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
125             if (config != null) {
126               LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
127               config.addConfig(file);
128               LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
129               Map map = ConfigurationUtils.diff(origConfig, latestConfig);
130               String[] tenantNamespaceArray =
131                   repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
132               updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
133             }
134           } else {
135             Iterator<String> repoKeys = inMemoryConfig.keySet().iterator();
136             while (repoKeys.hasNext()) {
137               repositoryKey = repoKeys.next();
138               AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
139               if (config.containsConfig(file)) {
140                 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
141                 config.removeConfig(file);
142                 LinkedHashMap latestConfig =
143                     ConfigurationUtils.toMap(config.getFinalConfiguration());
144                 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
145                 String[] tenantNamespaceArray =
146                     repositoryKey.split(Constants.KEY_ELEMENTS_DELEMETER);
147                 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1],
148                     map);
149               }
150             }
151           }
152         }
153       }
154     } catch (ClosedWatchServiceException exception) {
155       // do nothing.
156     } catch (Exception exception) {
157       exception.printStackTrace();
158     }
159   }
160
161   private void updateConfigurationValues(String tenant, String namespace, Map map)
162       throws Exception {
163     MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
164     ObjectName mbeanName = new ObjectName(Constants.MBEAN_NAME);
165     ConfigurationManager conf =
166         JMX.newMBeanProxy(mbsc, mbeanName, org.openecomp.config.api.ConfigurationManager.class,
167             true);
168     conf.updateConfigurationValues(tenant, namespace, map);
169   }
170
171   /**
172    * Poll filesystem and update node specific configuration if r equired.
173    *
174    * @param location the location
175    */
176   public void pollFilesystemAndUpdateNodeSpecificConfigurationIfREquired(String location) {
177     try {
178       Set<Path> paths = watchForChange(location);
179       if (paths != null) {
180         for (Path path : paths) {
181           File file = path.toAbsolutePath().toFile();
182           String repositoryKey = null;
183           if (ConfigurationUtils.isConfig(file)) {
184             repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
185             ConfigurationRepository.lookup().populateOverrideConfigurtaion(repositoryKey, file);
186           } else {
187             ConfigurationRepository.lookup().removeOverrideConfigurtaion(file);
188           }
189         }
190       }
191     } catch (Throwable exception) {
192       exception.printStackTrace();
193     }
194   }
195
196   /**
197    * Notify changes towards.
198    *
199    * @param tenant    the tenant
200    * @param component the component
201    * @param key       the key
202    * @param myself    the myself
203    * @throws Exception the exception
204    */
205   public void notifyChangesTowards(String tenant, String component, String key,
206                                    ConfigurationChangeListener myself) throws Exception {
207     List<NotificationData> notificationList =
208         store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
209     if (notificationList == null) {
210       notificationList = Collections.synchronizedList(new ArrayList<NotificationData>());
211       store.put(tenant + Constants.KEY_ELEMENTS_DELEMETER + component, notificationList);
212       executor.scheduleWithFixedDelay(
213           () -> triggerScanning(tenant + Constants.KEY_ELEMENTS_DELEMETER + component), 1, 30000,
214           TimeUnit.MILLISECONDS);
215     }
216     notificationList.add(new NotificationData(tenant, component, key, myself));
217   }
218
219   /**
220    * Stop notification towards.
221    *
222    * @param tenant    the tenant
223    * @param component the component
224    * @param key       the key
225    * @param myself    the myself
226    * @throws Exception the exception
227    */
228   public void stopNotificationTowards(String tenant, String component, String key,
229                                       ConfigurationChangeListener myself) throws Exception {
230     List<NotificationData> notificationList =
231         store.get(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
232     if (notificationList != null) {
233       boolean removed =
234           notificationList.remove(new NotificationData(tenant, component, key, myself));
235       if (removed && notificationList.isEmpty()) {
236         store.remove(tenant + Constants.KEY_ELEMENTS_DELEMETER + component);
237       }
238     }
239
240   }
241
242   private void triggerScanning(String key) {
243     if (store.get(key) != null) {
244       notificationExcecutor.submit(() -> scanForChanges(key));
245     } else {
246       throw new IllegalArgumentException("Notification service for " + key + " is suspended.");
247     }
248   }
249
250   private void scanForChanges(String key) {
251     List<NotificationData> list = store.get(key);
252     if (list != null) {
253       int size = list.size();
254       for (int i = 0; i < size; i++) {
255         NotificationData notificationData = list.get(i);
256         if (notificationData.isChanged()) {
257           notificationExcecutor.submit(() -> sendNotification(notificationData));
258         }
259       }
260     }
261   }
262
263   private void sendNotification(NotificationData notificationData) {
264     try {
265       notificationData.dispatchNotification();
266     } catch (Exception exception) {
267       exception.printStackTrace();
268     }
269   }
270
271   private Set<Path> watchForChange(String location) throws Exception {
272     if (location == null || location.trim().length() == 0) {
273       return null;
274     }
275     File file = new File(location);
276     if (!file.exists()) {
277       return null;
278     }
279     Path path = file.toPath();
280     Set<Path> toReturn = new HashSet<>();
281     try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
282       watchServiceCollection.put(location, watchService);
283       path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
284           StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
285       for (File dir : ConfigurationUtils.getAllFiles(file, true, true)) {
286         dir.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
287             StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
288       }
289       while (true) {
290         final WatchKey wk = watchService.take();
291         Thread.sleep(ConfigurationRepository.lookup()
292             .getConfigurationFor(Constants.DEFAULT_TENANT, Constants.DB_NAMESPACE)
293             .getLong("event.fetch.delay"));
294         for (WatchEvent<?> event : wk.pollEvents()) {
295           Object context = event.context();
296           if (context instanceof Path) {
297             File newFile = new File(((Path) wk.watchable()).toFile(), context.toString());
298             if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
299               if (newFile.isDirectory()) {
300                 newFile.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
301                     StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
302                 continue;
303               }
304             } else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
305               if (newFile.isDirectory()) {
306                 continue;
307               }
308             }
309             toReturn.add(newFile.toPath());
310           }
311         }
312         if (toReturn.isEmpty()) {
313           continue;
314         }
315         break;
316       }
317     }
318     return toReturn;
319   }
320
321   /**
322    * The type Notification data.
323    */
324   class NotificationData {
325
326     /**
327      * The Tenant.
328      */
329     String tenant;
330     /**
331      * The Namespace.
332      */
333     String namespace;
334     /**
335      * The Key.
336      */
337     String key;
338     /**
339      * The Myself.
340      */
341     ConfigurationChangeListener myself;
342     /**
343      * The Current value.
344      */
345     Object currentValue;
346     /**
347      * The Is array.
348      */
349     boolean isArray;
350
351     /**
352      * Instantiates a new Notification data.
353      *
354      * @param tenant    the tenant
355      * @param component the component
356      * @param key       the key
357      * @param myself    the myself
358      * @throws Exception the exception
359      */
360     public NotificationData(String tenant, String component, String key,
361                             ConfigurationChangeListener myself) throws Exception {
362       this.tenant = tenant;
363       this.namespace = component;
364       this.key = key;
365       this.myself = myself;
366       if (!ConfigurationRepository.lookup().getConfigurationFor(tenant, component)
367           .containsKey(key)) {
368         throw new RuntimeException("Key[" + key + "] not found.");
369       }
370       isArray = isArray(tenant, component, key, Hint.DEFAULT.value());
371       if (isArray) {
372         currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, component, key);
373       } else {
374         currentValue = ConfigurationManager.lookup().getAsString(tenant, component, key);
375       }
376     }
377
378     @Override
379     public boolean equals(Object obj) {
380       if (!(obj instanceof NotificationData)) {
381         return false;
382       }
383       NotificationData nd = (NotificationData) obj;
384       return tenant.equals(nd.tenant) && namespace.equals(nd.namespace) && key.equals(nd.key)
385           && myself.equals(nd.myself);
386     }
387
388     /**
389      * Is changed boolean.
390      *
391      * @return the boolean
392      */
393     public boolean isChanged() {
394       Object latestValue;
395       try {
396         if (isArray) {
397           latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
398         } else {
399           latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
400         }
401         if (!isArray) {
402           return !currentValue.equals(latestValue);
403         } else {
404           Collection<String> oldCollection = (Collection<String>) currentValue;
405           Collection<String> newCollection = (Collection<String>) latestValue;
406           for (String val : oldCollection) {
407             if (!newCollection.remove(val)) {
408               return true;
409             }
410           }
411           return !newCollection.isEmpty();
412         }
413       } catch (Exception exception) {
414         return false;
415       }
416     }
417
418     /**
419      * Dispatch notification.
420      *
421      * @throws Exception the exception
422      */
423     public void dispatchNotification() throws Exception {
424       Method method = null;
425       Vector<Object> parameters = null;
426       try {
427         Object latestValue = null;
428         if (isArray) {
429           latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
430         } else {
431           latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
432         }
433         Method[] methods = myself.getClass().getDeclaredMethods();
434         if (methods != null && methods.length > 0) {
435           method = methods[0];
436           int paramCount = method.getParameterCount();
437           parameters = new Vector<>();
438           if (paramCount > 4) {
439             if (tenant.equals(Constants.DEFAULT_TENANT)) {
440               parameters.add(null);
441             } else {
442               parameters.add(tenant);
443             }
444           }
445           if (paramCount > 3) {
446             if (namespace.equals(Constants.DEFAULT_NAMESPACE)) {
447               parameters.add(null);
448             } else {
449               parameters.add(namespace);
450             }
451           }
452           parameters.add(key);
453           parameters.add(currentValue);
454           parameters.add(latestValue);
455           method.setAccessible(true);
456         }
457       } catch (Exception exception) {
458         exception.printStackTrace();
459       } finally {
460         isArray = isArray(tenant, namespace, key, Hint.DEFAULT.value());
461         if (isArray) {
462           currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
463         } else {
464           currentValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
465         }
466         if (method != null && parameters != null) {
467           method.invoke(myself, parameters.toArray());
468         }
469       }
470     }
471   }
472 }