From b6e1b12ee06a03b4faff592df9644575d9fb58c3 Mon Sep 17 00:00:00 2001 From: Shiwei Tian Date: Thu, 26 Oct 2017 17:13:07 +0800 Subject: [PATCH] modify dmaap request way Issue-ID: HOLMES-71 Change-Id: Ie6bc1234fd0108e651635e8f8bfb45b0dcb4cd13 Signed-off-by: Shiwei Tian --- .../org/onap/holmes/engine/EngineDActiveApp.java | 4 +- .../DcaeConfigurationPolling.java | 4 +- .../DMaaPAlarmPolling.java} | 26 +++++---- .../onap/holmes/engine/dmaap/SubscriberAction.java | 61 ++++++++++++++++++++++ .../engine/dmaappolling/SubscriberAction.java | 53 ------------------- .../resources/DmaapConfigurationService.java | 3 +- 6 files changed, 81 insertions(+), 70 deletions(-) rename engine-d/src/main/java/org/onap/holmes/engine/{dcaepolling => dcae}/DcaeConfigurationPolling.java (95%) rename engine-d/src/main/java/org/onap/holmes/engine/{dmaappolling/DMaaPPollingRequest.java => dmaap/DMaaPAlarmPolling.java} (62%) create mode 100644 engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java delete mode 100644 engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java diff --git a/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java b/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java index 69e4c7a..73c35bc 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java @@ -15,8 +15,6 @@ */ package org.onap.holmes.engine; -import static jdk.nashorn.internal.runtime.regexp.joni.Config.log; - import io.dropwizard.setup.Environment; import java.util.HashSet; import java.util.Set; @@ -28,7 +26,7 @@ import org.onap.holmes.common.config.MicroServiceConfig; import org.onap.holmes.common.dropwizard.ioc.bundle.IOCApplication; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.common.utils.MSBRegisterUtil; -import org.onap.holmes.engine.dcaepolling.DcaeConfigurationPolling; +import org.onap.holmes.engine.dcae.DcaeConfigurationPolling; import org.onap.holmes.engine.resources.EngineResources; import org.onap.msb.sdk.discovery.entity.MicroServiceInfo; import org.onap.msb.sdk.discovery.entity.Node; diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java b/engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java similarity index 95% rename from engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java rename to engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java index 9fa153e..4808009 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.onap.holmes.engine.dcaepolling; +package org.onap.holmes.engine.dcae; import lombok.extern.slf4j.Slf4j; import org.onap.holmes.common.dcae.DcaeConfigurationQuery; @@ -22,7 +22,7 @@ import org.onap.holmes.common.dcae.entity.DcaeConfigurations; import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.dsa.dmaappolling.Subscriber; -import org.onap.holmes.engine.dmaappolling.SubscriberAction; +import org.onap.holmes.engine.dmaap.SubscriberAction; @Slf4j public class DcaeConfigurationPolling implements Runnable{ diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java similarity index 62% rename from engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java rename to engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java index 24e0817..52f3915 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.onap.holmes.engine.dmaappolling; +package org.onap.holmes.engine.dmaap; import java.util.ArrayList; import java.util.List; @@ -24,24 +24,30 @@ import org.onap.holmes.dsa.dmaappolling.Subscriber; import org.onap.holmes.engine.manager.DroolsEngine; @Slf4j -public class DMaaPPollingRequest implements Runnable { +public class DMaaPAlarmPolling implements Runnable { private Subscriber subscriber; - private DroolsEngine droolsEngine; + private volatile boolean isAlive = true; - public DMaaPPollingRequest(Subscriber subscriber, DroolsEngine droolsEngine) { + public DMaaPAlarmPolling(Subscriber subscriber, DroolsEngine droolsEngine) { this.subscriber = subscriber; this.droolsEngine = droolsEngine; } public void run() { - List vesAlarmList = new ArrayList<>(); - try { - vesAlarmList = subscriber.subscribe(); - } catch (CorrelationException e) { - log.error("Failed polling request alarm." + e.getMessage()); + while (isAlive) { + List vesAlarmList = new ArrayList<>(); + try { + vesAlarmList = subscriber.subscribe(); + } catch (CorrelationException e) { + log.error("Failed polling request alarm." + e.getMessage()); + } + vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm)); } - vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm)); + } + + public void stopTask() { + isAlive = false; } } diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java new file mode 100644 index 0000000..8d80a6b --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.dmaap; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Service +public class SubscriberAction { + + @Inject + private DroolsEngine droolsEngine; + private HashMap pollingTasks = new HashMap<>(); + + public synchronized void addSubscriber(Subscriber subscriber) { + if (!pollingTasks.containsKey(subscriber.getTopic())) { + DMaaPAlarmPolling pollingTask = new DMaaPAlarmPolling(subscriber, droolsEngine); + Thread thread = new Thread(pollingTask); + thread.start(); + pollingTasks.put(subscriber.getTopic(), pollingTask); + } + } + + public synchronized void removeSubscriber(Subscriber subscriber) { + if (pollingTasks.containsKey(subscriber.getTopic())) { + pollingTasks.get(subscriber.getTopic()).stopTask(); + pollingTasks.remove(subscriber.getTopic()); + } + } + + @PreDestroy + public void stopPollingTasks() { + Iterator iterator = pollingTasks.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = (Map.Entry)iterator.next(); + String key = (String) entry.getKey(); + pollingTasks.get(key).stopTask(); + } + pollingTasks.clear(); + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java deleted file mode 100644 index da83683..0000000 --- a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2017 ZTE Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.holmes.engine.dmaappolling; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import javax.inject.Inject; -import org.jvnet.hk2.annotations.Service; -import org.onap.holmes.dsa.dmaappolling.Subscriber; -import org.onap.holmes.engine.manager.DroolsEngine; - -@Service -public class SubscriberAction { - - @Inject - private DroolsEngine droolsEngine; - - private ConcurrentHashMap pollingRequests = new ConcurrentHashMap(); - private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - - public void addSubscriber(Subscriber subscriber) { - if (!pollingRequests.containsKey(subscriber.getTopic())) { - DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine); - ScheduledFuture future = service - .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS); - pollingRequests.put(subscriber.getTopic(), future); - } - } - - public void removeSubscriber(Subscriber subscriber) { - ScheduledFuture future = pollingRequests.get(subscriber.getTopic()); - if (future != null) { - future.cancel(true); - } - pollingRequests.remove(subscriber.getTopic()); - } -} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java b/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java index 9fa1874..99567b8 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java @@ -16,7 +16,6 @@ package org.onap.holmes.engine.resources; -import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import javax.servlet.http.HttpServletRequest; @@ -33,7 +32,7 @@ import org.onap.holmes.common.dcae.DcaeConfigurationsCache; import org.onap.holmes.common.dcae.entity.SecurityInfo; import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; import org.onap.holmes.dsa.dmaappolling.Subscriber; -import org.onap.holmes.engine.dmaappolling.SubscriberAction; +import org.onap.holmes.engine.dmaap.SubscriberAction; import org.onap.holmes.engine.request.DmaapConfigRequest; @Service -- 2.16.6