Change rule retrieval from CBS to ConfigMap 94/125994/1
authorGuangrong Fu <fu.guangrong@zte.com.cn>
Wed, 1 Dec 2021 06:24:40 +0000 (14:24 +0800)
committerGuangrong Fu <fu.guangrong@zte.com.cn>
Wed, 1 Dec 2021 06:24:40 +0000 (14:24 +0800)
Issue-ID: HOLMES-488
Signed-off-by: Guangrong Fu <fu.guangrong@zte.com.cn>
Change-Id: Ia2c29489b37feb729940ee807471ae448c1911cc

engine-d-standalone/pom.xml
engine-d/pom.xml
engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java
engine-d/src/main/java/org/onap/holmes/engine/dcae/ConfigFileScanningTask.java [new file with mode: 0644]
engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java
engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java
engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java
engine-d/src/test/java/org/onap/holmes/engine/dcae/ConfigFileScanningTaskTest.java [new file with mode: 0644]
engine-d/src/test/resources/cfy.json [new file with mode: 0644]
pom.xml

index 733a538..14d02b9 100644 (file)
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.onap.holmes.engine-management</groupId>
         <artifactId>holmes-engine-parent</artifactId>
-        <version>1.3.5-SNAPSHOT</version>
+        <version>1.3.6-SNAPSHOT</version>
     </parent>
 
     <artifactId>holmes-engine-d-standalone</artifactId>
index d210155..2bdb217 100644 (file)
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.onap.holmes.engine-management</groupId>
         <artifactId>holmes-engine-parent</artifactId>
-        <version>1.3.5-SNAPSHOT</version>
+        <version>1.3.6-SNAPSHOT</version>
     </parent>
 
     <artifactId>holmes-engine-d</artifactId>
index 37b5c07..3f1fe50 100644 (file)
@@ -17,11 +17,10 @@ package org.onap.holmes.engine;
 
 import io.dropwizard.setup.Environment;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.holmes.common.config.MicroServiceConfig;
+import org.onap.holmes.common.ConfigFileScanner;
 import org.onap.holmes.common.dropwizard.ioc.bundle.IOCApplication;
-import org.onap.holmes.common.utils.CommonUtils;
 import org.onap.holmes.common.utils.transactionid.TransactionIdFilter;
-import org.onap.holmes.engine.dcae.DcaeConfigurationPolling;
+import org.onap.holmes.engine.dcae.ConfigFileScanningTask;
 
 import javax.servlet.DispatcherType;
 import java.util.EnumSet;
@@ -40,12 +39,12 @@ public class EngineDActiveApp extends IOCApplication<EngineDAppConfig> {
     public void run(EngineDAppConfig configuration, Environment environment) throws Exception {
         super.run(configuration, environment);
 
-        if (!"1".equals(System.getenv("TESTING"))) {
-            ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
-            service.scheduleAtFixedRate(
-                    new DcaeConfigurationPolling(CommonUtils.getEnv(MicroServiceConfig.HOSTNAME)), 0,
-                    DcaeConfigurationPolling.POLLING_PERIOD, TimeUnit.MILLISECONDS);
-        }
+
+        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+        service.scheduleAtFixedRate(
+                new ConfigFileScanningTask(new ConfigFileScanner()), 60L,
+                ConfigFileScanningTask.POLLING_PERIOD, TimeUnit.SECONDS);
+
 
         environment.servlets().addFilter("logFilter", new TransactionIdFilter()).addMappingForUrlPatterns(EnumSet
                 .allOf(DispatcherType.class), true, "/*");
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dcae/ConfigFileScanningTask.java b/engine-d/src/main/java/org/onap/holmes/engine/dcae/ConfigFileScanningTask.java
new file mode 100644 (file)
index 0000000..2835a45
--- /dev/null
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2021 ZTE Corporation.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.holmes.engine.dcae;
+
+import org.onap.holmes.common.ConfigFileScanner;
+import org.onap.holmes.common.dcae.DcaeConfigurationsCache;
+import org.onap.holmes.common.dcae.entity.DcaeConfigurations;
+import org.onap.holmes.common.dcae.utils.DcaeConfigurationParser;
+import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
+import org.onap.holmes.common.exception.CorrelationException;
+import org.onap.holmes.common.utils.Md5Util;
+import org.onap.holmes.dsa.dmaappolling.Subscriber;
+import org.onap.holmes.engine.dmaap.SubscriberAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class ConfigFileScanningTask implements Runnable {
+    final public static long POLLING_PERIOD = 60L;
+    final private static Logger LOGGER = LoggerFactory.getLogger(ConfigFileScanningTask.class);
+    private String configFile = "/opt/hemtopics/cfy.json";
+    private ConfigFileScanner configFileScanner;
+    private String prevConfigMd5 = Md5Util.md5(null);
+
+    public ConfigFileScanningTask(ConfigFileScanner configFileScanner) {
+        this.configFileScanner = configFileScanner;
+    }
+
+    @Override
+    public void run() {
+        if (null == configFileScanner) {
+            configFileScanner = new ConfigFileScanner();
+        }
+        Map<String, String> newConfig = configFileScanner.scan(configFile);
+        String md5 = Md5Util.md5(newConfig);
+        if (!prevConfigMd5.equals(md5)) {
+            LOGGER.info("Configurations have changed.");
+            prevConfigMd5 = md5;
+        } else {
+            return;
+        }
+
+        for (Map.Entry entry : newConfig.entrySet()) {
+            DcaeConfigurations dcaeConfigurations = null;
+            try {
+                dcaeConfigurations = DcaeConfigurationParser.parse(entry.getValue().toString());
+            } catch (CorrelationException e) {
+                LOGGER.error(e.getMessage(), e);
+            } catch (Exception e) {
+                LOGGER.warn("Failed to deal with the new configurations.", e);
+            }
+
+            if (dcaeConfigurations != null) {
+                DcaeConfigurationsCache.setDcaeConfigurations(dcaeConfigurations);
+                addSubscribers(dcaeConfigurations);
+            }
+        }
+    }
+
+    private void addSubscribers(DcaeConfigurations dcaeConfigurations) {
+        SubscriberAction subscriberAction = ServiceLocatorHolder.getLocator()
+                .getService(SubscriberAction.class);
+        for (String key : dcaeConfigurations.getSubKeys()) {
+            Subscriber subscriber = new Subscriber();
+            subscriber.setTopic(key);
+            subscriber.setUrl(dcaeConfigurations.getSubSecInfo(key).getDmaapInfo()
+                    .getTopicUrl());
+            subscriberAction.addSubscriber(subscriber);
+        }
+    }
+}
index 4da1740..15d77d6 100644 (file)
@@ -1,5 +1,5 @@
 /**
- * Copyright 2017 ZTE Corporation.
+ * Copyright 2017 - 2021 ZTE Corporation.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@ import org.onap.holmes.dsa.dmaappolling.Subscriber;
 import org.onap.holmes.engine.dmaap.SubscriberAction;
 
 @Slf4j
+@Deprecated
 public class DcaeConfigurationPolling implements Runnable {
 
     private String hostname;
index 99ba3d7..e2ad89c 100644 (file)
@@ -44,7 +44,7 @@ public class DMaaPAlarmPolling implements Runnable {
 
     public void run() {
         while (isAlive) {
-            List<VesAlarm> vesAlarmList = new ArrayList<>();
+            List<VesAlarm> vesAlarmList;
             try {
                 vesAlarmList = subscriber.subscribe();
                 vesAlarmList.forEach(vesAlarm -> {
index c96b813..1297f11 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017 ZTE Corporation.
+ * Copyright 2017 - 2021 ZTE Corporation.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -39,13 +39,18 @@ public class SubscriberAction {
 
     public synchronized void addSubscriber(Subscriber subscriber) {
         String topic = subscriber.getTopic();
-        if (topic != null && !pollingTasks.containsKey(topic)) {
+        if (topic != null) {
+            if (pollingTasks.containsKey(topic)) {
+                removeSubscriber(subscriber);
+            }
             AlarmInfoDao alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class);
             DMaaPAlarmPolling pollingTask = new DMaaPAlarmPolling(subscriber, droolsEngine, alarmInfoDao);
             Thread thread = new Thread(pollingTask);
             thread.start();
             pollingTasks.put(topic, pollingTask);
-            log.info("Subscribe to topic: " + subscriber.getUrl());
+            log.info("Subscribed to topic: " + subscriber.getUrl());
+        } else {
+            log.info("The topic is null. Operation aborted.");
         }
     }
 
diff --git a/engine-d/src/test/java/org/onap/holmes/engine/dcae/ConfigFileScanningTaskTest.java b/engine-d/src/test/java/org/onap/holmes/engine/dcae/ConfigFileScanningTaskTest.java
new file mode 100644 (file)
index 0000000..b663133
--- /dev/null
@@ -0,0 +1,82 @@
+/**
+ * Copyright 2021 ZTE Corporation.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.holmes.engine.dcae;
+
+import org.easymock.EasyMock;
+import org.glassfish.hk2.api.ServiceLocator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.onap.holmes.common.dcae.DcaeConfigurationsCache;
+import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
+import org.onap.holmes.dsa.dmaappolling.DMaaPResponseUtil;
+import org.onap.holmes.dsa.dmaappolling.Subscriber;
+import org.onap.holmes.engine.dmaap.SubscriberAction;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
+@RunWith(PowerMockRunner.class)
+public class ConfigFileScanningTaskTest {
+
+    @Test
+    public void run() {
+        ServiceLocator mockedSl = PowerMock.createMock(ServiceLocator.class);
+        SubscriberAction mockedSa = PowerMock.createMock(SubscriberAction.class);
+        ServiceLocatorHolder.setLocator(mockedSl);
+        EasyMock.expect(mockedSl.getService(SubscriberAction.class)).andReturn(mockedSa);
+        // This is invoked while executing new Subscriber().
+        EasyMock.expect(mockedSl.getService(DMaaPResponseUtil.class)).andReturn(new DMaaPResponseUtil());
+        mockedSa.addSubscriber(EasyMock.anyObject(Subscriber.class));
+        EasyMock.expectLastCall();
+
+        ConfigFileScanningTask cfst = new ConfigFileScanningTask(null);
+        String configFilePath = ConfigFileScanningTaskTest.class.getResource("/cfy.json").getFile();
+        Whitebox.setInternalState(cfst, "configFile", configFilePath);
+
+        PowerMock.replayAll();
+        cfst.run();
+        PowerMock.verifyAll();
+
+        assertThat(DcaeConfigurationsCache.getPubSecInfo("dcae_cl_out").getDmaapInfo().getTopicUrl(),
+                equalTo("http://message-router.onap:3904/events/unauthenticated.DCAE_CL_OUTPUT"));
+    }
+
+    @Test
+    public void run_config_not_changed() {
+        ServiceLocator mockedSl = PowerMock.createMock(ServiceLocator.class);
+        SubscriberAction mockedSa = PowerMock.createMock(SubscriberAction.class);
+        ServiceLocatorHolder.setLocator(mockedSl);
+        // mocked objects will be only used once
+        EasyMock.expect(mockedSl.getService(SubscriberAction.class)).andReturn(mockedSa);
+        // This is invoked while executing new Subscriber().
+        EasyMock.expect(mockedSl.getService(DMaaPResponseUtil.class)).andReturn(new DMaaPResponseUtil());
+        mockedSa.addSubscriber(EasyMock.anyObject(Subscriber.class));
+        EasyMock.expectLastCall();
+
+        ConfigFileScanningTask cfst = new ConfigFileScanningTask(null);
+        String configFilePath = ConfigFileScanningTaskTest.class.getResource("/cfy.json").getFile();
+        Whitebox.setInternalState(cfst, "configFile", configFilePath);
+
+        PowerMock.replayAll();
+        cfst.run();
+        cfst.run();
+        PowerMock.verifyAll();
+    }
+}
\ No newline at end of file
diff --git a/engine-d/src/test/resources/cfy.json b/engine-d/src/test/resources/cfy.json
new file mode 100644 (file)
index 0000000..dfa58b0
--- /dev/null
@@ -0,0 +1,19 @@
+{
+  "services_calls": {},
+  "streams_publishes": {
+    "dcae_cl_out": {
+      "dmaap_info": {
+        "topic_url": "http://message-router.onap:3904/events/unauthenticated.DCAE_CL_OUTPUT"
+      },
+      "type": "message_router"
+    }
+  },
+  "streams_subscribes": {
+    "ves_fault": {
+      "dmaap_info": {
+        "topic_url": "http://message-router.onap:3904/events/unauthenticated.SEC_FAULT_OUTPUT"
+      },
+      "type": "message_router"
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 5df7e65..fa6b755 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
 
     <groupId>org.onap.holmes.engine-management</groupId>
     <artifactId>holmes-engine-parent</artifactId>
-    <version>1.3.5-SNAPSHOT</version>
+    <version>1.3.6-SNAPSHOT</version>
     <packaging>pom</packaging>
     <name>holmes-engine-management</name>
     <modules>
         <dependency>
             <groupId>org.onap.holmes.common</groupId>
             <artifactId>holmes-actions</artifactId>
-            <version>1.3.6</version>
+            <version>1.3.7</version>
             <exclusions>
                 <exclusion>
                     <groupId>io.dropwizard</groupId>