modify not publish messages to DMaaP 59/18859/2
authorShiwei Tian <tian.shiwei@zte.com.cn>
Sat, 14 Oct 2017 02:45:09 +0000 (10:45 +0800)
committerShiwei Tian <tian.shiwei@zte.com.cn>
Sat, 14 Oct 2017 02:56:39 +0000 (10:56 +0800)
Issue-ID: HOLMES-71

Change-Id: Iaf87f0250d043aae84e9afc6aaec0f6cdc0a529c
Signed-off-by: Shiwei Tian <tian.shiwei@zte.com.cn>
engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java
engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java [new file with mode: 0644]
engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java
engine-d/src/main/java/org/onap/holmes/engine/request/DeployRuleRequest.java
engine-d/src/main/java/org/onap/holmes/engine/resources/EngineResources.java
engine-d/src/test/java/org/onap/holmes/engine/resources/EngineResourcesTest.java

index 5bb3dbe..69e4c7a 100644 (file)
@@ -20,11 +20,15 @@ import static jdk.nashorn.internal.runtime.regexp.joni.Config.log;
 import io.dropwizard.setup.Environment;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.holmes.common.config.MicroServiceConfig;
 import org.onap.holmes.common.dropwizard.ioc.bundle.IOCApplication;
 import org.onap.holmes.common.exception.CorrelationException;
 import org.onap.holmes.common.utils.MSBRegisterUtil;
+import org.onap.holmes.engine.dcaepolling.DcaeConfigurationPolling;
 import org.onap.holmes.engine.resources.EngineResources;
 import org.onap.msb.sdk.discovery.entity.MicroServiceInfo;
 import org.onap.msb.sdk.discovery.entity.Node;
@@ -40,6 +44,11 @@ public class EngineDActiveApp extends IOCApplication<EngineDAppConfig> {
         super.run(configuration, environment);
 
         environment.jersey().register(new EngineResources());
+
+        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+        service.scheduleAtFixedRate(new DcaeConfigurationPolling("holmes-rule-mgmt"), 0,
+                DcaeConfigurationPolling.POLLING_PERIOD, TimeUnit.MILLISECONDS);
+
         try {
             new MSBRegisterUtil().register2Msb(createMicroServiceInfo());
         } catch (CorrelationException e) {
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java b/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java
new file mode 100644 (file)
index 0000000..23030d2
--- /dev/null
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2017 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.holmes.engine.dcaepolling;
+
+import lombok.extern.slf4j.Slf4j;
+import org.onap.holmes.common.dcae.DcaeConfigurationQuery;
+import org.onap.holmes.common.dcae.DcaeConfigurationsCache;
+import org.onap.holmes.common.dcae.entity.DcaeConfigurations;
+import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
+import org.onap.holmes.common.exception.CorrelationException;
+import org.onap.holmes.dsa.dmaappolling.Subscriber;
+import org.onap.holmes.engine.dmaappolling.SubscriberAction;
+
+@Slf4j
+public class DcaeConfigurationPolling implements Runnable{
+
+    private String hostname;
+
+    private String subscriberKey = "sec_fault_unsecure";
+
+    public static long POLLING_PERIOD = 10 * 1000L;
+
+    public DcaeConfigurationPolling(String hostname) {
+        this.hostname = hostname;
+    }
+
+    @Override
+    public void run() {
+        DcaeConfigurations dcaeConfigurations = null;
+        try {
+            dcaeConfigurations = DcaeConfigurationQuery
+                    .getDcaeConfigurations(hostname);
+        } catch (CorrelationException e) {
+            log.error("Failed to polling dcae configurations" + e.getMessage());
+        }
+        if (dcaeConfigurations != null) {
+            DcaeConfigurationsCache.setDcaeConfigurations(dcaeConfigurations);
+            addSubscriber(dcaeConfigurations);
+        }
+    }
+
+    private void addSubscriber(DcaeConfigurations dcaeConfigurations) {
+        SubscriberAction subscriberAction = ServiceLocatorHolder.getLocator()
+                .getService(SubscriberAction.class);
+        Subscriber subscriber = new Subscriber();
+        subscriber.setUrl(dcaeConfigurations.getSubSecInfo(subscriberKey).getDmaapInfo()
+                .getTopicUrl());
+        subscriberAction.addSubscriber(subscriber);
+    }
+}
index 1e71899..ef585d5 100644 (file)
@@ -35,17 +35,19 @@ public class SubscriberAction {
     private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
 
     public void addSubscriber(Subscriber subscriber) {
-        DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine);
-        ScheduledFuture future = service
-                .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS);
-        pollingRequests.put(subscriber.getTopic(), future);
+        if (!pollingRequests.containsKey(subscriber.getUrl())) {
+            DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine);
+            ScheduledFuture future = service
+                    .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS);
+            pollingRequests.put(subscriber.getUrl(), future);
+        }
     }
 
     public void removeSubscriber(Subscriber subscriber) {
-        ScheduledFuture future = pollingRequests.get(subscriber.getTopic());
+        ScheduledFuture future = pollingRequests.get(subscriber.getUrl());
         if (future != null) {
             future.cancel(true);
         }
-        pollingRequests.remove(subscriber.getTopic());
+        pollingRequests.remove(subscriber.getUrl());
     }
 }
index de9d773..5b8951b 100644 (file)
@@ -30,4 +30,7 @@ public class DeployRuleRequest {
 
     @JsonProperty(value = "engineid")
     private String engineId;
+
+    @JsonProperty(value = "loopcontrolname")
+    private String loopControlName;
 }
index 8f9a271..45754e2 100644 (file)
@@ -33,6 +33,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import lombok.extern.slf4j.Slf4j;
 import org.jvnet.hk2.annotations.Service;
+import org.onap.holmes.common.dmaap.DmaapService;
 import org.onap.holmes.common.exception.CorrelationException;
 import org.onap.holmes.common.utils.ExceptionUtil;
 import org.onap.holmes.common.utils.LanguageUtil;
@@ -65,8 +66,9 @@ public class EngineResources {
         CorrelationRuleResponse crResponse = new CorrelationRuleResponse();
         Locale locale = LanguageUtil.getLocale(httpRequest);
         try {
-
             String packageName = droolsEngine.deployRule(deployRuleRequest, locale);
+            DmaapService.loopControlNames
+                    .put(packageName, deployRuleRequest.getLoopControlName());
             log.info("Rule deployed. Package name: " + packageName);
             crResponse.setPackageName(packageName);
 
index 3c68182..98f4797 100644 (file)
@@ -66,6 +66,7 @@ public class EngineResourcesTest {
     @Test\r
     public void deployRule_normal() throws CorrelationException {\r
         DeployRuleRequest deployRuleRequest = new DeployRuleRequest();\r
+        deployRuleRequest.setLoopControlName("loopControlName");\r
         HttpServletRequest httpRequest = PowerMock.createMock(HttpServletRequest.class);\r
 \r
         expect(httpRequest.getHeader("language-option")).andReturn("en_US");\r