2 * Copyright © 2016-2018 European Support Limited
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.config.impl;
20 import java.io.IOException;
21 import java.lang.management.ManagementFactory;
22 import java.lang.reflect.Method;
23 import java.nio.file.ClosedWatchServiceException;
24 import java.nio.file.FileSystems;
25 import java.nio.file.Path;
26 import java.nio.file.StandardWatchEventKinds;
27 import java.nio.file.WatchEvent;
28 import java.nio.file.WatchKey;
29 import java.nio.file.WatchService;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.LinkedHashMap;
36 import java.util.List;
38 import java.util.Objects;
40 import java.util.Vector;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.ScheduledExecutorService;
44 import java.util.concurrent.TimeUnit;
45 import javax.management.JMX;
46 import javax.management.MBeanServerConnection;
47 import javax.management.ObjectName;
48 import org.onap.config.ConfigurationUtils;
49 import org.onap.config.Constants;
50 import org.onap.config.api.ConfigurationChangeListener;
51 import org.onap.config.api.ConfigurationManager;
52 import org.onap.config.api.Hint;
/**
 * Watches configuration locations on the filesystem for changes and lets listeners subscribe to
 * changes of individual configuration keys.
 */
55 public final class ConfigurationChangeNotifier {
// Access guard: looks at the third frame of the current thread's stack trace and rejects the
// access unless that frame belongs to ConfigurationImpl.
// NOTE(review): the line that opens the enclosing block is not visible here; this is presumably a
// static initializer - confirm against the full file.
58 if (!Thread.currentThread().getStackTrace()[2].getClassName().equals(ConfigurationImpl.class.getName())) {
59 throw new RuntimeException("Illegal access.");
// Subscriptions, keyed by tenant + Constants.KEY_ELEMENTS_DELIMETER + component.
// NOTE(review): plain HashMap that is read by executor tasks (triggerScanning/scanForChanges) and
// written by notifyChangesTowards/stopNotificationTowards - confirm that callers serialize access.
63 private final HashMap<String, List<NotificationData>> store = new HashMap<>();
// Scheduler with 5 threads; runs the filesystem pollers and the periodic subscription scans.
64 private final ScheduledExecutorService executor =
65 Executors.newScheduledThreadPool(5, ConfigurationUtils.getThreadFactory());
// Cached pool that runs the change scans and the individual listener notifications.
66 private final ExecutorService notificationExecutor =
67 Executors.newCachedThreadPool(ConfigurationUtils.getThreadFactory());
// Most recently created watch service per watched location string; iterated by shutdown().
68 private final Map<String, WatchService> watchServiceCollection = Collections.synchronizedMap(new HashMap<>());
70 public ConfigurationChangeNotifier(Map<String, AggregateConfiguration> inMemoryConfig) {
71 executor.scheduleWithFixedDelay(() -> this.pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
72 System.getProperty("config.location"), false), 1, 1, TimeUnit.MILLISECONDS);
73 executor.scheduleWithFixedDelay(() -> this.pollFilesystemAndUpdateConfigurationIfRequired(inMemoryConfig,
74 System.getProperty("tenant.config.location"), true), 1, 1, TimeUnit.MILLISECONDS);
75 executor.scheduleWithFixedDelay(() -> this.pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(
76 System.getProperty("node.config.location")), 1, 1, TimeUnit.MILLISECONDS);
79 public void pollFilesystemAndUpdateConfigurationIfRequired(Map<String, AggregateConfiguration> inMemoryConfig,
80 String location, boolean isTenantLocation) {
82 Set<Path> paths = watchForChange(location);
84 for (Path path : paths) {
85 File file = path.toAbsolutePath().toFile();
86 String repositoryKey = null;
87 if (ConfigurationUtils.isConfig(file) && file.isFile()) {
88 if (isTenantLocation) {
89 Collection<File> tenantsRoot =
90 ConfigurationUtils.getAllFiles(new File(location), false, true);
91 for (File tenantRoot : tenantsRoot) {
92 if (file.getAbsolutePath().startsWith(tenantRoot.getAbsolutePath())) {
93 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(
94 (tenantRoot.getName() + Constants.TENANT_NAMESPACE_SEPARATOR
95 + ConfigurationUtils.getNamespace(file))
96 .split(Constants.TENANT_NAMESPACE_SEPARATOR));
100 repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
102 AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
103 if (config != null) {
104 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
105 config.addConfig(file);
106 LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
107 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
108 String[] tenantNamespaceArray = repositoryKey.split(Constants.KEY_ELEMENTS_DELIMETER);
109 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
112 for (String configKey : inMemoryConfig.keySet()) {
113 repositoryKey = configKey;
114 AggregateConfiguration config = inMemoryConfig.get(repositoryKey);
115 if (config.containsConfig(file)) {
116 LinkedHashMap origConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
117 config.removeConfig(file);
118 LinkedHashMap latestConfig = ConfigurationUtils.toMap(config.getFinalConfiguration());
119 Map map = ConfigurationUtils.diff(origConfig, latestConfig);
120 String[] tenantNamespaceArray = repositoryKey.split(Constants.KEY_ELEMENTS_DELIMETER);
121 updateConfigurationValues(tenantNamespaceArray[0], tenantNamespaceArray[1], map);
127 } catch (ClosedWatchServiceException exception) {
129 } catch (Exception exception) {
130 exception.printStackTrace();
134 public void pollFilesystemAndUpdateNodeSpecificConfigurationIfRequired(String location) {
136 Set<Path> paths = watchForChange(location);
138 for (Path path : paths) {
139 File file = path.toAbsolutePath().toFile();
141 if (ConfigurationUtils.isConfig(file)) {
142 String repositoryKey = ConfigurationUtils.getConfigurationRepositoryKey(file);
143 ConfigurationRepository.lookup().populateOverrideConfiguration(repositoryKey, file);
145 ConfigurationRepository.lookup().removeOverrideConfiguration(file);
149 } catch (Exception exception) {
150 exception.printStackTrace();
154 private Set<Path> watchForChange(String location) throws Exception {
155 if (location == null || location.trim().length() == 0) {
156 return Collections.emptySet();
158 File file = new File(location);
159 if (!file.exists()) {
160 return Collections.emptySet();
162 Path path = file.toPath();
163 Set<Path> toReturn = new HashSet<>();
164 try (final WatchService watchService = FileSystems.getDefault().newWatchService()) {
165 watchServiceCollection.put(location, watchService);
166 path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE,
167 StandardWatchEventKinds.ENTRY_DELETE);
168 for (File dir : ConfigurationUtils.getAllFiles(file, true, true)) {
169 dir.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
170 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
173 final WatchKey wk = watchService.take();
174 Thread.sleep(ConfigurationRepository.lookup()
175 .getConfigurationFor(Constants.DEFAULT_TENANT, Constants.DB_NAMESPACE)
176 .getLong("event.fetch.delay"));
177 for (WatchEvent<?> event : wk.pollEvents()) {
178 Object context = event.context();
179 if (context instanceof Path) {
180 File newFile = new File(((Path) wk.watchable()).toFile(), context.toString());
181 if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
182 if (newFile.isDirectory()) {
183 newFile.toPath().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
184 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
187 } else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
188 if (newFile.isDirectory()) {
192 toReturn.add(newFile.toPath());
195 if (toReturn.isEmpty()) {
204 private void updateConfigurationValues(String tenant, String namespace, Map map) throws Exception {
205 MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
206 ObjectName mbeanName = new ObjectName(Constants.MBEAN_NAME);
207 ConfigurationManager conf = JMX.newMBeanProxy(mbsc, mbeanName, ConfigurationManager.class, true);
208 conf.updateConfigurationValues(tenant, namespace, map);
211 public void shutdown() {
212 for (WatchService watch : watchServiceCollection.values()) {
215 } catch (IOException exception) {
219 executor.shutdownNow();
/**
 * Subscribes {@code myself} to changes of {@code key} within the given tenant and component.
 * The first subscription for a tenant/component pair creates its subscription list and schedules
 * a recurring scan for that pair (initial delay 1 ms, then 30000 ms after each run).
 * NOTE(review): the line that follows the parameter list is not visible here, so a possible
 * throws clause cannot be documented - confirm against the full file.
 */
222 public void notifyChangesTowards(String tenant, String component, String key, ConfigurationChangeListener myself)
224 List<NotificationData> notificationList = store.get(tenant + Constants.KEY_ELEMENTS_DELIMETER + component);
// First subscriber for this tenant/component pair.
225 if (notificationList == null) {
226 notificationList = Collections.synchronizedList(new ArrayList<>());
227 store.put(tenant + Constants.KEY_ELEMENTS_DELIMETER + component, notificationList);
// triggerScanning throws once the entry has been removed from the store; per the
// ScheduledExecutorService contract an execution that throws suppresses all later executions,
// which presumably is how this recurring scan gets cancelled.
228 executor.scheduleWithFixedDelay(
229 () -> triggerScanning(tenant + Constants.KEY_ELEMENTS_DELIMETER + component), 1, 30000,
230 TimeUnit.MILLISECONDS);
// Constructing the entry snapshots the key's current value (see the NotificationData constructor).
232 notificationList.add(new NotificationData(tenant, component, key, myself));
235 private void triggerScanning(String key) {
236 if (store.get(key) != null) {
237 notificationExecutor.submit(() -> scanForChanges(key));
239 throw new IllegalArgumentException("Notification service for " + key + " is suspended.");
/**
 * Looks up the subscriptions stored under {@code key} and submits one notification task to the
 * notification pool for every subscription whose isChanged() check reports a change.
 * NOTE(review): a line between the lookup and the stream is not visible here - presumably a null
 * guard on the list; confirm against the full file.
 */
243 private void scanForChanges(String key) {
244 List<NotificationData> list = store.get(key);
// NOTE(review): the list is created via Collections.synchronizedList (see notifyChangesTowards);
// streaming over it here happens without holding its lock - verify this is acceptable.
246 list.stream().filter(NotificationData::isChanged)
247 .forEach(notificationData -> notificationExecutor.submit(() -> sendNotification(notificationData)));
251 private void sendNotification(NotificationData notificationData) {
253 notificationData.dispatchNotification();
254 } catch (Exception exception) {
255 exception.printStackTrace();
/**
 * Cancels the subscription of {@code myself} for {@code key} within the given tenant and
 * component. When the last subscription of a tenant/component pair is removed, the pair's entry
 * is dropped from the store.
 * NOTE(review): the line that follows the parameter list is not visible here, so a possible
 * throws clause cannot be documented - confirm against the full file.
 */
259 public void stopNotificationTowards(String tenant, String component, String key, ConfigurationChangeListener myself)
261 List<NotificationData> notificationList = store.get(tenant + Constants.KEY_ELEMENTS_DELIMETER + component);
262 if (notificationList != null) {
// Removal depends on NotificationData.equals matching a freshly constructed probe; building the
// probe runs the NotificationData constructor, including its key-existence check.
263 boolean removed = notificationList.remove(new NotificationData(tenant, component, key, myself));
264 if (removed && notificationList.isEmpty()) {
265 store.remove(tenant + Constants.KEY_ELEMENTS_DELIMETER + component);
/**
 * One subscription of a listener to a single configuration key, together with the value that was
 * last captured for that key.
 * NOTE(review): the methods below also use tenant, key, currentValue and isArray; their
 * declarations are not visible in this view.
 */
271 static class NotificationData {
// Assigned from the constructor's component argument.
275 final String namespace;
// Listener that dispatchNotification() invokes reflectively.
279 final ConfigurationChangeListener myself;
/**
 * Records who subscribes to which key and captures the key's value at construction time.
 * Throws a RuntimeException when the configuration for tenant/component does not contain the key.
 * NOTE(review): the line that follows the parameter list is not visible here, so a possible
 * throws clause cannot be documented - confirm against the full file.
 */
285 public NotificationData(String tenant, String component, String key, ConfigurationChangeListener myself)
287 this.tenant = tenant;
// The component is stored under the name "namespace".
288 this.namespace = component;
290 this.myself = myself;
// Refuse subscriptions to keys that do not exist.
291 if (!ConfigurationRepository.lookup().getConfigurationFor(tenant, component).containsKey(key)) {
292 throw new RuntimeException("Key[" + key + "] not found.");
294 isArray = ConfigurationUtils.isArray(tenant, component, key, Hint.DEFAULT.value());
// Initial snapshot of the value. The branching lines are not visible here; presumably the first
// assignment applies when isArray is true and the second otherwise - confirm.
296 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, component, key);
298 currentValue = ConfigurationManager.lookup().getAsString(tenant, component, key);
303 public int hashCode() {
304 return Objects.hash(tenant, namespace, key, myself, currentValue, isArray);
308 public boolean equals(Object obj) {
309 if (!(obj instanceof NotificationData)) {
312 NotificationData nd = (NotificationData) obj;
313 return Objects.equals(tenant, nd.tenant) && Objects.equals(namespace, nd.namespace) && Objects.equals(key,
314 nd.key) && Objects.equals(myself, nd.myself) && Objects.equals(currentValue, nd.currentValue)
315 // it's either String or List<String>
316 && isArray == nd.isArray;
319 public boolean isChanged() {
323 latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
325 latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
328 return !currentValue.equals(latestValue);
330 Collection<String> oldCollection = (Collection<String>) currentValue;
331 Collection<String> newCollection = (Collection<String>) latestValue;
332 for (String val : oldCollection) {
333 if (!newCollection.remove(val)) {
337 return !newCollection.isEmpty();
339 } catch (Exception exception) {
/**
 * Notifies the listener about a change of the subscribed key by reflectively invoking a method
 * declared on the listener's class, then re-captures the key's value as the new baseline.
 * NOTE(review): several lines of this method (block openers/closers, the declaration of
 * latestValue, the statement that selects the method, and the end of the method) are not visible
 * in this view; the comments below describe only what the visible lines show.
 */
344 public void dispatchNotification() throws Exception {
345 Method method = null;
346 Vector<Object> parameters = null;
// Read the present value; presumably the first form is used for array-valued keys and the
// second otherwise - confirm.
350 latestValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
352 latestValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
354 Method[] methods = myself.getClass().getDeclaredMethods();
355 if (methods != null && methods.length > 0) {
// NOTE(review): how 'method' is picked from 'methods' is not visible here.
357 int paramCount = method.getParameterCount();
358 parameters = new Vector<>();
// With more than four parameters the tenant is passed first; the default tenant is passed as null.
359 if (paramCount > 4) {
360 if (tenant.equals(Constants.DEFAULT_TENANT)) {
361 parameters.add(null);
363 parameters.add(tenant);
// With more than three parameters the namespace is passed; the default namespace is passed as null.
366 if (paramCount > 3) {
367 if (namespace.equals(Constants.DEFAULT_NAMESPACE)) {
368 parameters.add(null);
370 parameters.add(namespace);
// Previously captured value followed by the present value.
374 parameters.add(currentValue);
375 parameters.add(latestValue);
// Permit invocation even if the listener method is not public.
376 method.setAccessible(true);
378 } catch (Exception exception) {
379 exception.printStackTrace();
// Refresh the baseline (array flag and captured value) before the listener is invoked.
381 isArray = ConfigurationUtils.isArray(tenant, namespace, key, Hint.DEFAULT.value());
383 currentValue = ConfigurationManager.lookup().getAsStringValues(tenant, namespace, key);
385 currentValue = ConfigurationManager.lookup().getAsString(tenant, namespace, key);
// Invoke only when the preparation above produced both a method and its argument list.
387 if (method != null && parameters != null) {
388 method.invoke(myself, parameters.toArray());