remove code duplication in plugins 59/102059/2
authorshaoqiu <tim.huang@est.tech>
Thu, 20 Feb 2020 04:28:25 +0000 (12:28 +0800)
committershaoqiu <tim.huang@est.tech>
Fri, 21 Feb 2020 01:21:53 +0000 (09:21 +0800)
Remove consumer code duplication in plugins-event-carrier

Issue-ID: POLICY-1884
Signed-off-by: shaoqiu <tim.huang@est.tech>
Change-Id: I2d222436a97224e54a03c2501f4dc14d3f5f6ac0

plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsConsumer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientConsumer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/test/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumerTest.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketConsumer.java
services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPluginsEventConsumer.java [new file with mode: 0644]

index cbabab3..ff30042 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
- *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,8 +21,6 @@
 
 package org.onap.policy.apex.plugins.event.carrier.jms;
 
-import java.util.EnumMap;
-import java.util.Map;
 import java.util.Properties;
 
 import javax.jms.Connection;
@@ -34,15 +32,12 @@ import javax.jms.Session;
 import javax.jms.Topic;
 import javax.naming.InitialContext;
 
-import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
-import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
-import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
-import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +46,7 @@ import org.slf4j.LoggerFactory;
  *
  * @author Liam Fallon (liam.fallon@ericsson.com)
  */
-public class ApexJmsConsumer implements MessageListener, ApexEventConsumer, Runnable {
+public class ApexJmsConsumer extends ApexPluginsEventConsumer implements MessageListener {
     // Get a reference to the logger
     private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsConsumer.class);
 
@@ -61,22 +56,12 @@ public class ApexJmsConsumer implements MessageListener, ApexEventConsumer, Runn
     // The event receiver that will receive events from this consumer
     private ApexEventReceiver eventReceiver;
 
-    // The consumer thread and stopping flag
-    private Thread consumerThread;
-    private boolean stopOrderedFlag = false;
-
     // The connection to the JMS server
     private Connection connection;
 
     // The topic on which we receive events from JMS
     private Topic jmsIncomingTopic;
 
-    // The name for this consumer
-    private String name = null;
-
-    // The peer references for this event handler
-    private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
-
     @Override
     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
@@ -144,42 +129,6 @@ public class ApexJmsConsumer implements MessageListener, ApexEventConsumer, Runn
         }
     }
 
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void start() {
-        // Configure and start the event reception thread
-        final String threadName = this.getClass().getName() + ":" + this.name;
-        consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
-        consumerThread.setDaemon(true);
-        consumerThread.start();
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
-        return peerReferenceMap.get(peeredMode);
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
-        peerReferenceMap.put(peeredMode, peeredReference);
-    }
-
     /**
      * {@inheritDoc}.
      */
index a99258a..947dd54 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
- *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.apex.plugins.event.carrier.kafka;
 
-import java.util.EnumMap;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
-import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
-import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
-import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +39,7 @@ import org.slf4j.LoggerFactory;
  *
  * @author Liam Fallon (liam.fallon@ericsson.com)
  */
-public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
+public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
     // Get a reference to the logger
     private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaConsumer.class);
 
@@ -57,16 +52,6 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
     // The Kafka consumer used to receive events using Kafka
     private KafkaConsumer<String, String> kafkaConsumer;
 
-    // The name for this consumer
-    private String name = null;
-
-    // The peer references for this event handler
-    private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
-
-    // The consumer thread and stopping flag
-    private Thread consumerThread;
-    private boolean stopOrderedFlag = false;
-
     /**
      * {@inheritDoc}.
      */
@@ -97,41 +82,6 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
         }
     }
 
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void start() {
-        // Configure and start the event reception thread
-        final String threadName = this.getClass().getName() + ":" + this.name;
-        consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
-        consumerThread.setDaemon(true);
-        consumerThread.start();
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
-        return peerReferenceMap.get(peeredMode);
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
-        peerReferenceMap.put(peeredMode, peeredReference);
-    }
 
     /**
      * {@inheritDoc}.
index aaad529..aa8185f 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
- *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,8 +21,6 @@
 
 package org.onap.policy.apex.plugins.event.carrier.restclient;
 
-import java.util.EnumMap;
-import java.util.Map;
 import java.util.Properties;
 
 import java.util.regex.Matcher;
@@ -32,15 +30,12 @@ import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.Response;
 
 import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
-import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
-import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
-import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +44,7 @@ import org.slf4j.LoggerFactory;
  *
  * @author Liam Fallon (liam.fallon@ericsson.com)
  */
-public class ApexRestClientConsumer implements ApexEventConsumer, Runnable {
+public class ApexRestClientConsumer extends ApexPluginsEventConsumer {
     // Get a reference to the logger
     private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestClientConsumer.class);
 
@@ -68,19 +63,9 @@ public class ApexRestClientConsumer implements ApexEventConsumer, Runnable {
     // The HTTP client that makes a REST call to get an input event for Apex
     private Client client;
 
-    // The name for this consumer
-    private String name = null;
-
-    // The peer references for this event handler
-    private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
-
     // The pattern for filtering status code
     private Pattern httpCodeFilterPattern = null;
 
-    // The consumer thread and stopping flag
-    private Thread consumerThread;
-    private boolean stopOrderedFlag = false;
-
     @Override
     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
@@ -116,42 +101,6 @@ public class ApexRestClientConsumer implements ApexEventConsumer, Runnable {
         client = ClientBuilder.newClient();
     }
 
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void start() {
-        // Configure and start the event reception thread
-        final String threadName = this.getClass().getName() + ":" + this.name;
-        consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
-        consumerThread.setDaemon(true);
-        consumerThread.start();
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
-        return peerReferenceMap.get(peeredMode);
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
-        peerReferenceMap.put(peeredMode, peeredReference);
-    }
-
     /**
      * {@inheritDoc}.
      */
index e382c02..57560d2 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
- *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,7 +24,6 @@ package org.onap.policy.apex.plugins.event.carrier.restrequestor;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -45,13 +44,11 @@ import javax.ws.rs.client.Invocation.Builder;
 import javax.ws.rs.core.Response;
 
 import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
-import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
-import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
 import org.slf4j.Logger;
@@ -63,7 +60,7 @@ import org.slf4j.LoggerFactory;
  *
  * @author Liam Fallon (liam.fallon@ericsson.com)
  */
-public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
+public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer {
     // Get a reference to the logger
     private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestRequestorConsumer.class);
 
@@ -86,16 +83,6 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
     // The HTTP client that makes a REST call to get an input event for Apex
     private Client client;
 
-    // The name for this consumer
-    private String name = null;
-
-    // The peer references for this event handler
-    private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
-
-    // The consumer thread and stopping flag
-    private Thread consumerThread;
-    private boolean stopOrderedFlag = false;
-
     // Temporary request holder for incoming REST requests
     private final BlockingQueue<ApexRestRequest> incomingRestRequestQueue = new LinkedBlockingQueue<>();
 
@@ -196,26 +183,6 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
         }
     }
 
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void start() {
-        // Configure and start the event reception thread
-        final String threadName = this.getClass().getName() + ":" + this.name;
-        consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
-        consumerThread.setDaemon(true);
-        consumerThread.start();
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public String getName() {
-        return name;
-    }
-
     /**
      * Get the number of events received to date.
      *
@@ -225,22 +192,6 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
         return eventsReceived;
     }
 
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
-        return peerReferenceMap.get(peeredMode);
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
-        peerReferenceMap.put(peeredMode, peeredReference);
-    }
-
     /**
      * {@inheritDoc}.
      */
index c8a07f2..9503716 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
- *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.apex.plugins.event.carrier.restserver;
 
-import java.util.EnumMap;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.ws.rs.core.Response;
-import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
-import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
-import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
@@ -46,7 +42,7 @@ import org.slf4j.LoggerFactory;
  *
  * @author Liam Fallon (liam.fallon@ericsson.com)
  */
-public class ApexRestServerConsumer implements ApexEventConsumer, Runnable {
+public class ApexRestServerConsumer extends ApexPluginsEventConsumer {
     // Get a reference to the logger
     private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestServerConsumer.class);
 
@@ -56,16 +52,6 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable {
     // The event receiver that will receive events from this consumer
     private ApexEventReceiver eventReceiver;
 
-    // The name for this consumer
-    private String name = null;
-
-    // The peer references for this event handler
-    private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
-
-    // The consumer thread and stopping flag
-    private Thread consumerThread;
-    private boolean stopOrderedFlag = false;
-
     // The local HTTP server to use for REST call reception if we are running a local Grizzly server
     private HttpServletServer server;
 
@@ -155,42 +141,6 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable {
         }
     }
 
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void start() {
-        // Configure and start the event reception thread
-        final String threadName = this.getClass().getName() + ":" + this.name;
-        consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
-        consumerThread.setDaemon(true);
-        consumerThread.start();
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
-        return peerReferenceMap.get(peeredMode);
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
-        peerReferenceMap.put(peeredMode, peeredReference);
-    }
-
     /**
      * Receive an event for processing in Apex.
      *
index 07f705c..91d6a0f 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Samsung. All rights reserved.
- *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
+import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
 import org.onap.policy.apex.service.engine.event.PeeredReference;
 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
@@ -156,7 +157,7 @@ public class ApexRestServerConsumerTest {
         Field field = ApexRestServerConsumer.class.getDeclaredField("eventReceiver");
         field.setAccessible(true);
         field.set(apexRestServerConsumer, apexEventReceiver);
-        field = ApexRestServerConsumer.class.getDeclaredField("name");
+        field = ApexPluginsEventConsumer.class.getDeclaredField("name");
         field.setAccessible(true);
         field.set(apexRestServerConsumer, "TestApexRestServerConsumer");
 
index 3cf4480..949cd53 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ *   *  Modifications Copyright (C) 2020 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,8 +21,6 @@
 
 package org.onap.policy.apex.plugins.event.carrier.websocket;
 
-import java.util.EnumMap;
-import java.util.Map;
 import java.util.Properties;
 
 import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
@@ -29,14 +28,11 @@ import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStri
 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageListener;
 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageServer;
 import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessager;
-import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
-import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
 import org.onap.policy.apex.service.engine.event.ApexEventException;
 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
-import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
-import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +41,7 @@ import org.slf4j.LoggerFactory;
  *
  * @author Liam Fallon (liam.fallon@ericsson.com)
  */
-public class ApexWebSocketConsumer implements ApexEventConsumer, WsStringMessageListener, Runnable {
+public class ApexWebSocketConsumer extends ApexPluginsEventConsumer implements WsStringMessageListener {
     private static final int WEB_SOCKET_WAIT_SLEEP_TIME = 100;
 
     // Get a reference to the logger
@@ -57,16 +53,6 @@ public class ApexWebSocketConsumer implements ApexEventConsumer, WsStringMessage
     // The event receiver that will receive events from this consumer
     private ApexEventReceiver eventReceiver;
 
-    // The name for this consumer
-    private String name = null;
-
-    // The peer references for this event handler
-    private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
-
-    // The consumer thread and stopping flag
-    private Thread consumerThread;
-    private boolean stopOrderedFlag = false;
-
     // The number of events read to date
     private int eventsRead = 0;
 
@@ -103,42 +89,6 @@ public class ApexWebSocketConsumer implements ApexEventConsumer, WsStringMessage
         }
     }
 
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void start() {
-        // Configure and start the event reception thread
-        final String threadName = this.getClass().getName() + ":" + this.name;
-        consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
-        consumerThread.setDaemon(true);
-        consumerThread.start();
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
-        return peerReferenceMap.get(peeredMode);
-    }
-
-    /**
-     * {@inheritDoc}.
-     */
-    @Override
-    public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
-        peerReferenceMap.put(peeredMode, peeredReference);
-    }
-
     /**
      * {@inheritDoc}.
      */
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPluginsEventConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPluginsEventConsumer.java
new file mode 100644 (file)
index 0000000..95a263e
--- /dev/null
@@ -0,0 +1,76 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import java.util.EnumMap;
+import java.util.Map;
+import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+
+public abstract class ApexPluginsEventConsumer implements ApexEventConsumer, Runnable {
+    // The name for this consumer
+    protected String name = null;
+
+    // The peer references for this event handler
+    protected Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
+            new EnumMap<>(EventHandlerPeeredMode.class);
+
+    // The consumer thread and stopping flag
+    protected Thread consumerThread;
+    protected boolean stopOrderedFlag = false;
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public void start() {
+        // Configure and start the event reception thread
+        final String threadName = this.getClass().getName() + ":" + this.name;
+        consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
+        consumerThread.setDaemon(true);
+        consumerThread.start();
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
+        return peerReferenceMap.get(peeredMode);
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
+        peerReferenceMap.put(peeredMode, peeredReference);
+    }
+
+}