cd6481f97bc8292954ba0ac8683cb3a2ef8cecff
[sdc.git] / common / onap-common-configuration-management / onap-configuration-management-core / src / main / java / org / onap / config / impl / ConfigurationChangeNotifier.java
1 /*
2  * Copyright © 2016-2018 European Support Limited
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.config.impl;
18
19 import java.io.File;
20 import java.io.IOException;
21 import java.lang.management.ManagementFactory;
22 import java.lang.reflect.Method;
23 import java.nio.file.ClosedWatchServiceException;
24 import java.nio.file.FileSystems;
25 import java.nio.file.Path;
26 import java.nio.file.StandardWatchEventKinds;
27 import java.nio.file.WatchEvent;
28 import java.nio.file.WatchKey;
29 import java.nio.file.WatchService;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.LinkedHashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Objects;
39 import java.util.Set;
40 import java.util.Vector;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.ScheduledExecutorService;
44 import java.util.concurrent.TimeUnit;
45 import javax.management.JMX;
46 import javax.management.MBeanServerConnection;
47 import javax.management.ObjectName;
48 import org.onap.config.ConfigurationUtils;
49 import org.onap.config.Constants;
50 import org.onap.config.api.ConfigurationChangeListener;
51 import org.onap.config.api.ConfigurationManager;
52 import org.onap.config.api.Hint;
53
54
55 public final class ConfigurationChangeNotifier {
56
57     static {
58         if (!Thread.currentThread().getStackTrace()[2].getClassName().equals(ConfigurationImpl.class.getName())) {
59             throw new RuntimeException("Illegal access.");
60         }
61     }
62
    // Listener registrations, keyed by tenant + Constants.KEY_ELEMENTS_DELIMETER + component (the key built in
    // notifyChangesTowards and stopNotificationTowards).
    // NOTE(review): this is a plain HashMap that is also read from executor tasks (triggerScanning,
    // scanForChanges) - confirm that callers serialise access or that this is acceptable.
    private final HashMap<String, List<NotificationData>> store = new HashMap<>();
    // Scheduler (5 threads) that runs the filesystem polling tasks registered in the constructor and the
    // periodic triggerScanning task registered per store key in notifyChangesTowards.
    private final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(5, ConfigurationUtils.getThreadFactory());
    // Cached pool on which change scans and the individual listener notifications are executed.
    private final ExecutorService notificationExecutor =
            Executors.newCachedThreadPool(ConfigurationUtils.getThreadFactory());
    // Watch service most recently opened for each watched location; shutdown() closes every entry.
    private final Map<String, WatchService> watchServiceCollection = Collections.synchronizedMap(new HashMap<>());
69
70     public ConfigurationChangeNotifier(Map<String, AggregateConfiguration> inMemoryConfig) {
71         executor.scheduleWithFixedDelay(() -> this.pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
72                 System.getProperty("config.location"), false), 1, 1, TimeUnit.MILLISECONDS);
73         executor.scheduleWithFixedDelay(() -> this.pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
74                 System.getProperty("tenant.config.location"), true), 1, 1, TimeUnit.MILLISECONDS);
75         executor.scheduleWithFixedDelay(() -> this.pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(
76                 System.getProperty("node.config.location")), 1, 1, TimeUnit.MILLISECONDS);
77     }
78
79     public void pollFilesystemAndUpdateConfigurationIfRequired(Map<String, AggregateConfiguration> inMemoryConfig,
80             String location, boolean isTenantLocation) {
81         try {
82             Set<Path> paths = watchForChange(location);
83             if (paths != null) {
84                 for (Path path : paths) {
85                     File file = path.toAbsolutePath().toFile();
86                     String repositoryKey = null;
87                     if (ConfigurationUtils.isConfig(file) && file.isFile()) {
88                         if (isTenantLocation) {
89                             Collection<File> tenantsRoot =
90                                     ConfigurationUtils.getAllFiles(new File(location), false, true);
91                             for (File tenantRoot : tenantsRoot) {
92                                 if (file.getAbsolutePath().startsWith(tenantRoot.getAbsolutePath())) {
93                                     repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(
94                                             (tenantRoot.getName() + Constants.TENANT_NAMESPACE_SEPARATOR
95                                                      + ConfigurationUtils.getNamespace(file))
96                                                     .split(Constants.TENANT_NAMESPACE_SEPARATOR));
97                                 }
98                             }
99                         } else {
100                             repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
101                         }
102                         AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
103                         if (config != null) {
104                             LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
105                             config.addConfig(file);
106                             LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
107                             Map map = ConfigurationUtils.diff(origConfig, latestConfig);
108                             String[] tenantNamespaceArray = repositoryKey.split(Constants.KEY_ELEMENTS_DELIMETER);
109                             updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
110                         }
111                     } else {
112                         for (String configKey : inMemoryConfig.keySet()) {
113                             repositoryKey = configKey;
114                             AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
115                             if (config.containsConfig(file)) {
116                                 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
117                                 config.removeConfig(file);
118                                 LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
119                                 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
120                                 String[] tenantNamespaceArray = repositoryKey.split(Constants.KEY_ELEMENTS_DELIMETER);
121                                 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
122                             }
123                         }
124                     }
125                 }
126             }
127         } catch (ClosedWatchServiceException exception) {
128             // do nothing.
129         } catch (Exception exception) {
130             exception.printStackTrace();
131         }
132     }
133
134     public void pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(String location) {
135         try {
136             Set<Path> paths = watchForChange(location);
137             if (paths != null) {
138                 for (Path path : paths) {
139                     File file = path.toAbsolutePath().toFile();
140
141                     if (ConfigurationUtils.isConfig(file)) {
142                         String repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
143                         ConfigurationRepository.lookup().populateOverrideConfiguration(repositoryKey, file);
144                     } else {
145                         ConfigurationRepository.lookup().removeOverrideConfiguration(file);
146                     }
147                 }
148             }
149         } catch (Exception exception) {
150             exception.printStackTrace();
151         }
152     }
153
154     private Set<Path> watchForChange(String location) throws Exception {
155         if (location == null || location.trim().length() == 0) {
156             return Collections.emptySet();
157         }
158         File file = new File(location);
159         if (!file.exists()) {
160             return Collections.emptySet();
161         }
162         Path path = file.toPath();
163         Set<Path> toReturn = new HashSet<>();
164         try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
165             watchServiceCollection.put(location, watchService);
166             path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE,
167                     StandardWatchEventKinds.ENTRY_DELETE);
168             for (File dir : ConfigurationUtils.getAllFiles(file, true, true)) {
169                 dir.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
170                         StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
171             }
172             while (true) {
173                 final WatchKey wk = watchService.take();
174                 Thread.sleep(ConfigurationRepository.lookup()
175                                      .getConfigurationFor(Constants.DEFAULT_TENANT, Constants.DB_NAMESPACE)
176                                      .getLong("event.fetch.delay"));
177                 for (WatchEvent<?> event : wk.pollEvents()) {
178                     Object context = event.context();
179                     if (context instanceof Path) {
180                         File newFile = new File(((Path) wk.watchable()).toFile(), context.toString());
181                         if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
182                             if (newFile.isDirectory()) {
183                                 newFile.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
184                                         StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
185                                 continue;
186                             }
187                         } else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
188                             if (newFile.isDirectory()) {
189                                 continue;
190                             }
191                         }
192                         toReturn.add(newFile.toPath());
193                     }
194                 }
195                 if (toReturn.isEmpty()) {
196                     continue;
197                 }
198                 break;
199             }
200         }
201         return toReturn;
202     }
203
204     private void updateConfigurationValues(String tenant, String namespace, Map map) throws Exception {
205         MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
206         ObjectName mbeanName = new ObjectName(Constants.MBEAN_NAME);
207         ConfigurationManager conf = JMX.newMBeanProxy(mbsc, mbeanName, ConfigurationManager.class, true);
208         conf.updateConfigurationValues(tenant, namespace, map);
209     }
210
211     public void shutdown() {
212         for (WatchService watch : watchServiceCollection.values()) {
213             try {
214                 watch.close();
215             } catch (IOException exception) {
216                 //do nothing
217             }
218         }
219         executor.shutdownNow();
220     }
221
222     public void notifyChangesTowards(String tenant, String component, String key, ConfigurationChangeListener myself)
223             throws Exception {
224         List<NotificationData> notificationList = store.get(tenant + Constants.KEY_ELEMENTS_DELIMETER + component);
225         if (notificationList == null) {
226             notificationList = Collections.synchronizedList(new ArrayList<>());
227             store.put(tenant + Constants.KEY_ELEMENTS_DELIMETER + component, notificationList);
228             executor.scheduleWithFixedDelay(
229                     () -> triggerScanning(tenant + Constants.KEY_ELEMENTS_DELIMETER + component), 1, 30000,
230                     TimeUnit.MILLISECONDS);
231         }
232         notificationList.add(new NotificationData(tenant, component, key, myself));
233     }
234
235     private void triggerScanning(String key) {
236         if (store.get(key) != null) {
237             notificationExecutor.submit(() -> scanForChanges(key));
238         } else {
239             throw new IllegalArgumentException("Notification service for " + key + " is suspended.");
240         }
241     }
242
243     private void scanForChanges(String key) {
244         List<NotificationData> list = store.get(key);
245         if (list != null) {
246             list.stream().filter(NotificationData::isChanged)
247                     .forEach(notificationData -> notificationExecutor.submit(() -> sendNotification(notificationData)));
248         }
249     }
250
251     private void sendNotification(NotificationData notificationData) {
252         try {
253             notificationData.dispatchNotification();
254         } catch (Exception exception) {
255             exception.printStackTrace();
256         }
257     }
258
259     public void stopNotificationTowards(String tenant, String component, String key, ConfigurationChangeListener myself)
260             throws Exception {
261         List<NotificationData> notificationList = store.get(tenant + Constants.KEY_ELEMENTS_DELIMETER + component);
262         if (notificationList != null) {
263             boolean removed = notificationList.remove(new NotificationData(tenant, component, key, myself));
264             if (removed && notificationList.isEmpty()) {
265                 store.remove(tenant + Constants.KEY_ELEMENTS_DELIMETER + component);
266             }
267         }
268
269     }
270
271     static class NotificationData {
272
273         final String tenant;
274
275         final String namespace;
276
277         final String key;
278
279         final ConfigurationChangeListener myself;
280
281         Object currentValue;
282
283         boolean isArray;
284
285         public NotificationData(String tenant, String component, String key, ConfigurationChangeListener myself)
286                 throws Exception {
287             this.tenant = tenant;
288             this.namespace = component;
289             this.key = key;
290             this.myself = myself;
291             if (!ConfigurationRepository.lookup().getConfigurationFor(tenant, component).containsKey(key)) {
292                 throw new RuntimeException("Key[" + key + "] not found.");
293             }
294             isArray = ConfigurationUtils.isArray(tenant, component, key, Hint.DEFAULT.value());
295             if (isArray) {
296                 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, component, key);
297             } else {
298                 currentValue = ConfigurationManager.lookup().getAsString(tenant, component, key);
299             }
300         }
301
302         @Override
303         public int hashCode() {
304             return Objects.hash(tenant, namespace, key, myself, currentValue, isArray);
305         }
306
307         @Override
308         public boolean equals(Object obj) {
309             if (!(obj instanceof NotificationData)) {
310                 return false;
311             }
312             NotificationData nd = (NotificationData) obj;
313             return Objects.equals(tenant, nd.tenant) && Objects.equals(namespace, nd.namespace) && Objects.equals(key,
314                     nd.key) && Objects.equals(myself, nd.myself) && Objects.equals(currentValue, nd.currentValue)
315                            // it's either String or List<String>
316                            && isArray == nd.isArray;
317         }
318
319         public boolean isChanged() {
320             Object latestValue;
321             try {
322                 if (isArray) {
323                     latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
324                 } else {
325                     latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
326                 }
327                 if (!isArray) {
328                     return !currentValue.equals(latestValue);
329                 } else {
330                     Collection<String> oldCollection = (Collection<String>) currentValue;
331                     Collection<String> newCollection = (Collection<String>) latestValue;
332                     for (String val : oldCollection) {
333                         if (!newCollection.remove(val)) {
334                             return true;
335                         }
336                     }
337                     return !newCollection.isEmpty();
338                 }
339             } catch (Exception exception) {
340                 return false;
341             }
342         }
343
344         public void dispatchNotification() throws Exception {
345             Method method = null;
346             Vector<Object> parameters = null;
347             try {
348                 Object latestValue;
349                 if (isArray) {
350                     latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
351                 } else {
352                     latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
353                 }
354                 Method[] methods = myself.getClass().getDeclaredMethods();
355                 if (methods != null && methods.length > 0) {
356                     method = methods[0];
357                     int paramCount = method.getParameterCount();
358                     parameters = new Vector<>();
359                     if (paramCount > 4) {
360                         if (tenant.equals(Constants.DEFAULT_TENANT)) {
361                             parameters.add(null);
362                         } else {
363                             parameters.add(tenant);
364                         }
365                     }
366                     if (paramCount > 3) {
367                         if (namespace.equals(Constants.DEFAULT_NAMESPACE)) {
368                             parameters.add(null);
369                         } else {
370                             parameters.add(namespace);
371                         }
372                     }
373                     parameters.add(key);
374                     parameters.add(currentValue);
375                     parameters.add(latestValue);
376                     method.setAccessible(true);
377                 }
378             } catch (Exception exception) {
379                 exception.printStackTrace();
380             } finally {
381                 isArray = ConfigurationUtils.isArray(tenant, namespace, key, Hint.DEFAULT.value());
382                 if (isArray) {
383                     currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
384                 } else {
385                     currentValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
386                 }
387                 if (method != null && parameters != null) {
388                     method.invoke(myself, parameters.toArray());
389                 }
390             }
391         }
392     }
393 }