Mostly concentrated on the period at the end of summary.
But I did clear a few others for longer than 120 characters
and placement of methods next to each other. Possibly a few
others.
I did not clear everything, but will submit a few more
reviews to get the others.
Issue-ID: POLICY-881
Change-Id: I692a5349d686d52fee4040757cdc2ed8b5cc221b
Signed-off-by: Pamela Dragosh <pdragosh@research.att.com>
/**
- * Essential Topic Data
+ * Essential Topic Data.
*/
public interface Topic extends TopicRegisterable, Startable, Lockable {
/**
- * network logger
+ * network logger.
*/
public static final String NETWORK_LOGGER = "network";
/**
- * Underlying Communication infrastructure Types
+ * Underlying Communication infrastructure Types.
*/
public enum CommInfrastructure {
/**
- * UEB Communication Infrastructure
+ * UEB Communication Infrastructure.
*/
UEB,
/**
- * DMAAP Communication Infrastructure
+ * DMAAP Communication Infrastructure.
*/
DMAAP,
/**
- * NOOP for internal use only
+ * NOOP for internal use only.
*/
NOOP,
/**
- * REST Communication Infrastructure
+ * REST Communication Infrastructure.
*/
REST
}
/**
- * gets the topic name
+ * Gets the topic name.
*
* @return topic name
*/
public String getTopic();
/**
- * gets the communication infrastructure type
+ * Gets the communication infrastructure type.
*
* @return
*/
public CommInfrastructure getTopicCommInfrastructure();
/**
- * return list of servers
+ * Return list of servers.
*
* @return bus servers
*/
public List<String> getServers();
/**
- * get the more recent events in this topic entity
+ * Get the more recent events in this topic entity.
*
* @return list of most recent events
*/
public interface TopicEndpoint extends Startable, Lockable {
/**
- * singleton for global access
+ * singleton for global access.
*/
public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
/**
- * Add Topic Sources to the communication infrastructure initialized per properties
+ * Add Topic Sources to the communication infrastructure initialized per properties.
*
* @param properties properties for Topic Source construction
* @return a generic Topic Source
public List<TopicSource> addTopicSources(Properties properties);
/**
- * Add Topic Sinks to the communication infrastructure initialized per properties
+ * Add Topic Sinks to the communication infrastructure initialized per properties.
*
* @param properties properties for Topic Sink construction
* @return a generic Topic Sink
public List<TopicSink> addTopicSinks(Properties properties);
/**
- * gets all Topic Sources
+ * Gets all Topic Sources.
*
* @return the Topic Source List
*/
List<TopicSource> getTopicSources();
/**
- * get the Topic Sources for the given topic name
+ * Get the Topic Sources for the given topic name.
*
* @param topicNames the topic name
*
public List<TopicSource> getTopicSources(List<String> topicNames);
/**
- * gets the Topic Source for the given topic name and underlying communication infrastructure
- * type
+ * Gets the Topic Source for the given topic name and underlying communication infrastructure
+ * type.
*
* @param commType communication infrastructure type
* @param topicName the topic name
public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName);
/**
- * get the UEB Topic Source for the given topic name
+ * Get the UEB Topic Source for the given topic name.
*
* @param topicName the topic name
*
public UebTopicSource getUebTopicSource(String topicName);
/**
- * get the DMAAP Topic Source for the given topic name
+ * Get the DMAAP Topic Source for the given topic name.
*
* @param topicName the topic name
*
public DmaapTopicSource getDmaapTopicSource(String topicName);
/**
- * get the Topic Sinks for the given topic name
+ * Get the Topic Sinks for the given topic name.
*
* @param topicNames the topic names
* @return the Topic Sink List
- * @throws IllegalStateException
- * @throws IllegalArgumentException
*/
public List<TopicSink> getTopicSinks(List<String> topicNames);
/**
- * get the Topic Sinks for the given topic name and underlying communication infrastructure type
+ * Get the Topic Sinks for the given topic name and all the underlying communication
+ * infrastructure type.
*
* @param topicName the topic name
- * @param commType communication infrastructure type
*
* @return the Topic Sink List
* @throws IllegalStateException if the entity is in an invalid state, for example multiple
* TopicWriters for a topic name and communication infrastructure
* @throws IllegalArgumentException if invalid parameters are present
*/
- public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName);
+ public List<TopicSink> getTopicSinks(String topicName);
+
+ /**
+ * Gets all Topic Sinks.
+ *
+ * @return the Topic Sink List
+ */
+ public List<TopicSink> getTopicSinks();
/**
- * get the Topic Sinks for the given topic name and all the underlying communication
- * infrastructure type
+ * Get the Topic Sinks for the given topic name and underlying communication infrastructure type.
*
* @param topicName the topic name
+ * @param commType communication infrastructure type
*
* @return the Topic Sink List
* @throws IllegalStateException if the entity is in an invalid state, for example multiple
* TopicWriters for a topic name and communication infrastructure
* @throws IllegalArgumentException if invalid parameters are present
*/
- public List<TopicSink> getTopicSinks(String topicName);
+ public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName);
/**
- * get the UEB Topic Source for the given topic name
+ * Get the UEB Topic Source for the given topic name.
*
* @param topicName the topic name
*
public UebTopicSink getUebTopicSink(String topicName);
/**
- * get the no-op Topic Sink for the given topic name
+ * Get the no-op Topic Sink for the given topic name.
*
* @param topicName the topic name
*
public NoopTopicSink getNoopTopicSink(String topicName);
/**
- * get the DMAAP Topic Source for the given topic name
+ * Get the DMAAP Topic Source for the given topic name.
*
* @param topicName the topic name
*
public DmaapTopicSink getDmaapTopicSink(String topicName);
/**
- * gets only the UEB Topic Sources
+ * Gets only the UEB Topic Sources.
*
* @return the UEB Topic Source List
*/
public List<UebTopicSource> getUebTopicSources();
/**
- * gets only the DMAAP Topic Sources
+ * Gets only the DMAAP Topic Sources.
*
* @return the DMAAP Topic Source List
*/
public List<DmaapTopicSource> getDmaapTopicSources();
/**
- * gets all Topic Sinks
- *
- * @return the Topic Sink List
- */
- public List<TopicSink> getTopicSinks();
-
- /**
- * gets only the UEB Topic Sinks
+ * Gets only the UEB Topic Sinks.
*
* @return the UEB Topic Sink List
*/
public List<UebTopicSink> getUebTopicSinks();
/**
- * gets only the DMAAP Topic Sinks
+ * Gets only the DMAAP Topic Sinks.
*
* @return the DMAAP Topic Sink List
*/
public List<DmaapTopicSink> getDmaapTopicSinks();
/**
- * gets only the NOOP Topic Sinks
+ * Gets only the NOOP Topic Sinks.
*
* @return the NOOP Topic Sinks List
*/
/**
* This implementation of the Topic Endpoint Manager, proxies operations to appropriate
- * implementations according to the communication infrastructure that are supported
+ * implementations according to the communication infrastructure that are supported.
*/
class ProxyTopicEndpointManager implements TopicEndpoint {
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
/**
- * Is this element locked?
+ * Is this element locked boolean.
*/
protected volatile boolean locked = false;
/**
- * Is this element alive?
+ * Is this element alive boolean.
*/
protected volatile boolean alive = false;
return sources;
}
+
+ @Override
+ public List<TopicSource> getTopicSources(List<String> topicNames) {
+
+ if (topicNames == null) {
+ throw new IllegalArgumentException("must provide a list of topics");
+ }
+
+ final List<TopicSource> sources = new ArrayList<>();
+ for (final String topic : topicNames) {
+ try {
+ final TopicSource uebSource = this.getUebTopicSource(topic);
+ if (uebSource != null) {
+ sources.add(uebSource);
+ }
+ } catch (final Exception e) {
+ logger.debug("No UEB source for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
+ if (dmaapSource != null) {
+ sources.add(dmaapSource);
+ }
+ } catch (final Exception e) {
+ logger.debug("No DMAAP source for topic: {}", topic, e);
+ }
+ }
+ return sources;
+ }
@Override
public List<TopicSink> getTopicSinks() {
return sinks;
}
+ @Override
+ public List<TopicSink> getTopicSinks(List<String> topicNames) {
+
+ if (topicNames == null) {
+ throw new IllegalArgumentException("must provide a list of topics");
+ }
+
+ final List<TopicSink> sinks = new ArrayList<>();
+ for (final String topic : topicNames) {
+ try {
+ final TopicSink uebSink = this.getUebTopicSink(topic);
+ if (uebSink != null) {
+ sinks.add(uebSink);
+ }
+ } catch (final Exception e) {
+ logger.debug("No UEB sink for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
+ if (dmaapSink != null) {
+ sinks.add(dmaapSink);
+ }
+ } catch (final Exception e) {
+ logger.debug("No DMAAP sink for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSink noopSink = this.getNoopTopicSink(topic);
+ if (noopSink != null) {
+ sinks.add(noopSink);
+ }
+ } catch (final Exception e) {
+ logger.debug("No NOOP sink for topic: {}", topic, e);
+ }
+ }
+ return sinks;
+ }
+
+ @Override
+ public List<TopicSink> getTopicSinks(String topicName) {
+ if (topicName == null) {
+ throw parmException(topicName);
+ }
+
+ final List<TopicSink> sinks = new ArrayList<>();
+
+ try {
+ sinks.add(this.getUebTopicSink(topicName));
+ } catch (final Exception e) {
+ logNoSink(topicName, e);
+ }
+
+ try {
+ sinks.add(this.getDmaapTopicSink(topicName));
+ } catch (final Exception e) {
+ logNoSink(topicName, e);
+ }
+
+ try {
+ sinks.add(this.getNoopTopicSink(topicName));
+ } catch (final Exception e) {
+ logNoSink(topicName, e);
+ }
+
+ return sinks;
+ }
+
@JsonIgnore
@Override
public List<UebTopicSource> getUebTopicSources() {
}
/**
+ * Gets the endpoints.
*
* @return list of managed endpoints
*/
return this.locked;
}
- @Override
- public List<TopicSource> getTopicSources(List<String> topicNames) {
-
- if (topicNames == null) {
- throw new IllegalArgumentException("must provide a list of topics");
- }
-
- final List<TopicSource> sources = new ArrayList<>();
- for (final String topic : topicNames) {
- try {
- final TopicSource uebSource = this.getUebTopicSource(topic);
- if (uebSource != null) {
- sources.add(uebSource);
- }
- } catch (final Exception e) {
- logger.debug("No UEB source for topic: {}", topic, e);
- }
-
- try {
- final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
- if (dmaapSource != null) {
- sources.add(dmaapSource);
- }
- } catch (final Exception e) {
- logger.debug("No DMAAP source for topic: {}", topic, e);
- }
- }
- return sources;
- }
-
- @Override
- public List<TopicSink> getTopicSinks(List<String> topicNames) {
-
- if (topicNames == null) {
- throw new IllegalArgumentException("must provide a list of topics");
- }
-
- final List<TopicSink> sinks = new ArrayList<>();
- for (final String topic : topicNames) {
- try {
- final TopicSink uebSink = this.getUebTopicSink(topic);
- if (uebSink != null) {
- sinks.add(uebSink);
- }
- } catch (final Exception e) {
- logger.debug("No UEB sink for topic: {}", topic, e);
- }
-
- try {
- final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
- if (dmaapSink != null) {
- sinks.add(dmaapSink);
- }
- } catch (final Exception e) {
- logger.debug("No DMAAP sink for topic: {}", topic, e);
- }
-
- try {
- final TopicSink noopSink = this.getNoopTopicSink(topic);
- if (noopSink != null) {
- sinks.add(noopSink);
- }
- } catch (final Exception e) {
- logger.debug("No NOOP sink for topic: {}", topic, e);
- }
- }
- return sinks;
- }
-
@Override
public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) {
}
}
- @Override
- public List<TopicSink> getTopicSinks(String topicName) {
- if (topicName == null) {
- throw parmException(topicName);
- }
-
- final List<TopicSink> sinks = new ArrayList<>();
-
- try {
- sinks.add(this.getUebTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- try {
- sinks.add(this.getDmaapTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- try {
- sinks.add(this.getNoopTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- return sinks;
- }
-
private void logNoSink(String topicName, Exception ex) {
logger.debug("No sink for topic: {}", topicName, ex);
}
package org.onap.policy.common.endpoints.event.comm;
/**
- * Listener for event messages entering the Policy Engine
+ * Listener for event messages entering the Policy Engine.
*/
@FunctionalInterface
public interface TopicListener {
-
- /**
- * Notification of a new Event over a given Topic
- *
- * @param commType communication infrastructure type
- * @param topic topic name
- * @param event event message as a string
- */
- public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event);
+
+ /**
+ * Notification of a new Event over a given Topic.
+ *
+ * @param commType communication infrastructure type
+ * @param topic topic name
+ * @param event event message as a string
+ */
+ public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event);
}
package org.onap.policy.common.endpoints.event.comm;
/**
- * Marks a Topic entity as registerable
+ * Marks a Topic entity as registerable.
*/
public interface TopicRegisterable {
-
- /**
- * Register for notification of events with this Topic Entity
- *
- * @param topicListener the listener of events
- */
- public void register(TopicListener topicListener);
-
- /**
- * Unregisters for notification of events with this Topic Entity
- *
- * @param topicListener the listener of events
- */
- public void unregister(TopicListener topicListener);
+
+ /**
+ * Register for notification of events with this Topic Entity.
+ *
+ * @param topicListener the listener of events
+ */
+ public void register(TopicListener topicListener);
+
+ /**
+ * Unregisters for notification of events with this Topic Entity.
+ *
+ * @param topicListener the listener of events
+ */
+ public void unregister(TopicListener topicListener);
}
package org.onap.policy.common.endpoints.event.comm;
/**
- * Marks a given Topic Endpoint as able to send messages over a topic
+ * Marks a given Topic Endpoint as able to send messages over a topic.
*/
public interface TopicSink extends Topic {
-
- /**
- * Sends a string message over this Topic Endpoint
- *
- * @param message message to send
- *
- * @return true if the send operation succeeded, false otherwise
- * @throws IllegalArgumentException an invalid message has been provided
- * @throws IllegalStateException the entity is in an state that prevents
- * it from sending messages, for example, locked or stopped.
- */
- public boolean send(String message);
+
+ /**
+ * Sends a string message over this Topic Endpoint.
+ *
+ * @param message message to send
+ *
+ * @return true if the send operation succeeded, false otherwise
+ * @throws IllegalArgumentException an invalid message has been provided
+ * @throws IllegalStateException the entity is in an state that prevents
+ * it from sending messages, for example, locked or stopped.
+ */
+ public boolean send(String message);
}
/**
* Marker for a Topic Entity, indicating that the entity is able to read
- * over a topic
+ * over a topic.
*/
public interface TopicSource extends Topic {
-
- /**
- * pushes an event into the source programatically
- *
- * @param event the event in json format
- * @return true if it can be processed correctly, false otherwise
- */
- public boolean offer(String event);
+
+ /**
+ * Pushes an event into the source programatically.
+ *
+ * @param event the event in json format
+ * @return true if it can be processed correctly, false otherwise
+ */
+ public boolean offer(String event);
}
\ No newline at end of file
package org.onap.policy.common.endpoints.event.comm.bus;
/**
- * API
+ * API.
*/
public interface ApiKeyEnabled {
- /**
- * @return api key
- */
- public String getApiKey();
-
- /**
- * @return api secret
- */
- public String getApiSecret();
+ /**
+ * @return api key
+ */
+ public String getApiKey();
+
+ /**
+ * @return api secret
+ */
+ public String getApiSecret();
}
import org.onap.policy.common.endpoints.event.comm.TopicSink;
/**
- * Topic Sink over Bus Infrastructure (DMAAP/UEB)
+ * Topic Sink over Bus Infrastructure (DMAAP/UEB).
*/
public interface BusTopicSink extends ApiKeyEnabled, TopicSink {
/**
- * Log Failures after X number of retries
+ * Log Failures after X number of retries.
*/
public static final int DEFAULT_LOG_SEND_FAILURES_AFTER = 1;
/**
- * Sets the UEB partition key for published messages
+ * Sets the UEB partition key for published messages.
*
* @param partitionKey the partition key
*/
public void setPartitionKey(String partitionKey);
/**
- * return the partition key in used by the system to publish messages
+ * Return the partition key in used by the system to publish messages.
*
* @return the partition key
*/
import org.onap.policy.common.endpoints.event.comm.TopicSource;
/**
- * Generic Topic Source for UEB/DMAAP Communication Infrastructure
+ * Generic Topic Source for UEB/DMAAP Communication Infrastructure.
*
*/
public interface BusTopicSource extends ApiKeyEnabled, TopicSource {
/**
- * Default Timeout fetching in milliseconds
+ * Default Timeout fetching in milliseconds.
*/
public static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
/**
- * Default maximum number of messages fetch at the time
+ * Default maximum number of messages fetch at the time.
*/
public static int DEFAULT_LIMIT_FETCH = 100;
/**
- * Definition of No Timeout fetching
+ * Definition of No Timeout fetching.
*/
public static int NO_TIMEOUT_MS_FETCH = -1;
/**
- * Definition of No limit fetching
+ * Definition of No limit fetching.
*/
public static int NO_LIMIT_FETCH = -1;
/**
- * gets the consumer group
+ * Gets the consumer group.
*
* @return consumer group
*/
public String getConsumerGroup();
/**
- * gets the consumer instance
+ * Gets the consumer instance.
*
* @return consumer instance
*/
public String getConsumerInstance();
/**
- * gets the fetch timeout
+ * Gets the fetch timeout.
*
* @return fetch timeout
*/
public int getFetchTimeout();
/**
- * gets the fetch limit
+ * Gets the fetch limit.
*
* @return fetch limit
*/
public interface DmaapTopicSink extends BusTopicSink {
/**
- * Factory of UebTopicWriter for instantiation and management purposes
+ * Factory of UebTopicWriter for instantiation and management purposes.
*/
public static final DmaapTopicSinkFactory factory = new IndexedDmaapTopicSinkFactory();
import org.slf4j.LoggerFactory;
/**
- * DMAAP Topic Sink Factory
+ * DMAAP Topic Sink Factory.
*/
public interface DmaapTopicSinkFactory {
public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
/**
- * Instantiates a new DMAAP Topic Sink
+ * Instantiates a new DMAAP Topic Sink.
*
* @param servers list of servers
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, String environment, String aftEnvironment, String partner,
- String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
- boolean allowSelfSignedCerts);
+ String password, String partitionKey, String environment, String aftEnvironment,
+ String partner, String latitude, String longitude, Map<String, String> additionalProps,
+ boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
/**
- * Instantiates a new DMAAP Topic Sink
+ * Instantiates a new DMAAP Topic Sink.
*
* @param servers list of servers
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
+ String password, String partitionKey, boolean managed,
+ boolean useHttps, boolean allowSelfSignedCerts);
/**
- * Creates an DMAAP Topic Sink based on properties files
+ * Creates an DMAAP Topic Sink based on properties files.
*
* @param properties Properties containing initialization values
* @return an DMAAP Topic Sink
public List<DmaapTopicSink> build(Properties properties);
/**
- * Instantiates a new DMAAP Topic Sink
+ * Instantiates a new DMAAP Topic Sink.
*
* @param servers list of servers
* @param topic topic name
public DmaapTopicSink build(List<String> servers, String topic);
/**
- * Destroys an DMAAP Topic Sink based on a topic
+ * Destroys an DMAAP Topic Sink based on a topic.
*
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
public void destroy(String topic);
/**
- * gets an DMAAP Topic Sink based on topic name
+ * Destroys all DMAAP Topic Sinks.
+ */
+ public void destroy();
+
+ /**
+ * Gets an DMAAP Topic Sink based on topic name.
*
* @param topic the topic name
* @return an DMAAP Topic Sink with topic name
public DmaapTopicSink get(String topic);
/**
- * Provides a snapshot of the DMAAP Topic Sinks
+ * Provides a snapshot of the DMAAP Topic Sinks.
*
* @return a list of the DMAAP Topic Sinks
*/
public List<DmaapTopicSink> inventory();
-
- /**
- * Destroys all DMAAP Topic Sinks
- */
- public void destroy();
}
/* ------------- implementation ----------------- */
/**
- * Factory of DMAAP Reader Topics indexed by topic name
+ * Factory of DMAAP Reader Topics indexed by topic name.
*/
class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
private static final String MISSING_TOPIC = "A topic must be provided";
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
/**
- * DMAAP Topic Name Index
+ * DMAAP Topic Name Index.
*/
protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
@Override
public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, String environment, String aftEnvironment, String partner,
- String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
- boolean allowSelfSignedCerts) {
+ String password, String partitionKey, String environment, String aftEnvironment,
+ String partner, String latitude, String longitude, Map<String, String> additionalProps,
+ boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
if (topic == null || topic.isEmpty()) {
throw new IllegalArgumentException(MISSING_TOPIC);
public interface DmaapTopicSource extends BusTopicSource {
/**
- * factory for managing and tracking DMAAP sources
+ * factory for managing and tracking DMAAP sources.
*/
public static DmaapTopicSourceFactory factory = new IndexedDmaapTopicSourceFactory();
}
import org.slf4j.LoggerFactory;
/**
- * DMAAP Topic Source Factory
+ * DMAAP Topic Source Factory.
*/
public interface DmaapTopicSourceFactory {
public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
/**
- * Creates an DMAAP Topic Source based on properties files
+ * Creates an DMAAP Topic Source based on properties files.
*
* @param properties Properties containing initialization values
*
public List<DmaapTopicSource> build(Properties properties);
/**
- * Instantiates a new DMAAP Topic Source
+ * Instantiates a new DMAAP Topic Source.
*
* @param servers list of servers
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
+ String password, String consumerGroup, String consumerInstance,
+ int fetchTimeout, int fetchLimit,
boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
/**
- * Instantiates a new DMAAP Topic Source
+ * Instantiates a new DMAAP Topic Source.
*
* @param servers list of servers
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
- String environment, String aftEnvironment, String partner, String latitude, String longitude,
- Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
+ String password, String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit, String environment, String aftEnvironment, String partner,
+ String latitude, String longitude, Map<String, String> additionalProps,
+ boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
/**
- * Instantiates a new DMAAP Topic Source
+ * Instantiates a new DMAAP Topic Source.
*
* @param servers list of servers
* @param topic topic name
public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
/**
- * Instantiates a new DMAAP Topic Source
+ * Instantiates a new DMAAP Topic Source.
*
* @param servers list of servers
* @param topic topic name
public DmaapTopicSource build(List<String> servers, String topic);
/**
- * Destroys an DMAAP Topic Source based on a topic
+ * Destroys an DMAAP Topic Source based on a topic.
*
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
public void destroy(String topic);
/**
- * Destroys all DMAAP Topic Sources
+ * Destroys all DMAAP Topic Sources.
*/
public void destroy();
/**
- * gets an DMAAP Topic Source based on topic name
+ * Gets an DMAAP Topic Source based on topic name.
*
* @param topic the topic name
* @return an DMAAP Topic Source with topic name
public DmaapTopicSource get(String topic);
/**
- * Provides a snapshot of the DMAAP Topic Sources
+ * Provides a snapshot of the DMAAP Topic Sources.
*
* @return a list of the DMAAP Topic Sources
*/
/* ------------- implementation ----------------- */
/**
- * Factory of DMAAP Source Topics indexed by topic name
+ * Factory of DMAAP Source Topics indexed by topic name.
*/
class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
private static final String MISSING_TOPIC = "A topic must be provided";
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
/**
- * DMaaP Topic Name Index
+ * DMaaP Topic Name Index.
*/
protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
uebTopicSource.shutdown();
}
+ @Override
+ public void destroy() {
+ List<DmaapTopicSource> readers = this.inventory();
+ for (DmaapTopicSource reader : readers) {
+ reader.shutdown();
+ }
+
+ synchronized (this) {
+ this.dmaapTopicSources.clear();
+ }
+ }
+
/**
* {@inheritDoc}
*/
return new ArrayList<>(this.dmaapTopicSources.values());
}
- @Override
- public void destroy() {
- List<DmaapTopicSource> readers = this.inventory();
- for (DmaapTopicSource reader : readers) {
- reader.shutdown();
- }
-
- synchronized (this) {
- this.dmaapTopicSources.clear();
- }
- }
-
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
import org.slf4j.LoggerFactory;
/**
- * NOOP topic sink
+ * NOOP topic sink.
*/
public class NoopTopicSink extends TopicBase implements TopicSink {
/**
- * factory
+ * factory.
*/
public static final NoopTopicSinkFactory factory = new IndexedNoopTopicSinkFactory();
/**
- * logger
+ * logger.
*/
private static Logger logger = LoggerFactory.getLogger(NoopTopicSink.class);
/**
- * net logger
+ * net logger.
*/
private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
/**
- * constructor
+ * constructor.
*
* @param servers servers
* @param topic topic
import org.slf4j.LoggerFactory;
/**
- * Noop Topic Sink Factory
+ * Noop Topic Sink Factory.
*/
public interface NoopTopicSinkFactory {
/**
- * Creates noop topic sinks based on properties files
+ * Creates noop topic sinks based on properties files.
*
* @param properties Properties containing initialization values
*
public List<NoopTopicSink> build(Properties properties);
/**
- * builds a noop sink
+ * builds a noop sink.
*
* @param servers list of servers
* @param topic topic name
public NoopTopicSink build(List<String> servers, String topic, boolean managed);
/**
- * Destroys a sink based on the topic
+ * Destroys a sink based on the topic.
*
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
public void destroy(String topic);
/**
- * gets a sink based on topic name
+ * Destroys all sinks.
+ */
+ public void destroy();
+
+ /**
+ * gets a sink based on topic name.
*
* @param topic the topic name
*
public NoopTopicSink get(String topic);
/**
- * Provides a snapshot of the UEB Topic Writers
+ * Provides a snapshot of the UEB Topic Writers.
*
* @return a list of the UEB Topic Writers
*/
public List<NoopTopicSink> inventory();
- /**
- * Destroys all sinks
- */
- public void destroy();
}
/* ------------- implementation ----------------- */
/**
- * Factory of noop sinks
+ * Factory of noop sinks.
*/
class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory {
private static final String MISSING_TOPIC = "A topic must be provided";
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
/**
- * noop topic sinks map
+ * noop topic sinks map.
*/
protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
package org.onap.policy.common.endpoints.event.comm.bus;
/**
- * Topic Writer over UEB Infrastructure
+ * Topic Writer over UEB Infrastructure.
*/
public interface UebTopicSink extends BusTopicSink {
/**
- * Factory of UEB Topic Sinks for instantiation and management purposes
+ * Factory of UEB Topic Sinks for instantiation and management purposes.
*/
public static final UebTopicSinkFactory factory = new IndexedUebTopicSinkFactory();
}
import org.slf4j.LoggerFactory;
/**
- * UEB Topic Sink Factory
+ * UEB Topic Sink Factory.
*/
public interface UebTopicSinkFactory {
/**
- * Instantiates a new UEB Topic Writer
+ * Instantiates a new UEB Topic Writer.
*
* @param servers list of servers
* @param topic topic name
boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
/**
- * Creates an UEB Topic Writer based on properties files
+ * Creates an UEB Topic Writer based on properties files.
*
* @param properties Properties containing initialization values
*
public List<UebTopicSink> build(Properties properties);
/**
- * Instantiates a new UEB Topic Writer
+ * Instantiates a new UEB Topic Writer.
*
* @param servers list of servers
* @param topic topic name
public UebTopicSink build(List<String> servers, String topic);
/**
- * Destroys an UEB Topic Writer based on a topic
+ * Destroys an UEB Topic Writer based on a topic.
*
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
public void destroy(String topic);
/**
- * gets an UEB Topic Writer based on topic name
+ * Destroys all UEB Topic Writers.
+ */
+ public void destroy();
+
+ /**
+ * gets an UEB Topic Writer based on topic name.
*
* @param topic the topic name
*
public UebTopicSink get(String topic);
/**
- * Provides a snapshot of the UEB Topic Writers
+ * Provides a snapshot of the UEB Topic Writers.
*
* @return a list of the UEB Topic Writers
*/
public List<UebTopicSink> inventory();
-
- /**
- * Destroys all UEB Topic Writers
- */
- public void destroy();
}
/* ------------- implementation ----------------- */
/**
- * Factory of UEB Reader Topics indexed by topic name
+ * Factory of UEB Reader Topics indexed by topic name.
*/
class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
private static final String MISSING_TOPIC = "A topic must be provided";
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
/**
- * UEB Topic Name Index
+ * UEB Topic Name Index.
*/
protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
package org.onap.policy.common.endpoints.event.comm.bus;
/**
- * Topic Source for UEB Communication Infrastructure
+ * Topic Source for UEB Communication Infrastructure.
*
*/
public interface UebTopicSource extends BusTopicSource {
/**
- * factory for managing and tracking UEB readers
+ * factory for managing and tracking UEB readers.
*/
public static UebTopicSourceFactory factory = new IndexedUebTopicSourceFactory();
}
import org.slf4j.LoggerFactory;
/**
- * UEB Topic Source Factory
+ * UEB Topic Source Factory.
*/
public interface UebTopicSourceFactory {
/**
- * Creates an UEB Topic Source based on properties files
+ * Creates an UEB Topic Source based on properties files.
*
* @param properties Properties containing initialization values
*
public List<UebTopicSource> build(Properties properties);
/**
- * Instantiates a new UEB Topic Source
+ * Instantiates a new UEB Topic Source.
*
* @param servers list of servers
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed,
+ String consumerGroup, String consumerInstance,
+ int fetchTimeout, int fetchLimit, boolean managed,
boolean useHttps, boolean allowSelfSignedCerts);
/**
- * Instantiates a new UEB Topic Source
+ * Instantiates a new UEB Topic Source.
*
* @param servers list of servers
* @param topic topic name
public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
/**
- * Instantiates a new UEB Topic Source
+ * Instantiates a new UEB Topic Source.
*
* @param servers list of servers
* @param topic topic name
public UebTopicSource build(List<String> servers, String topic);
/**
- * Destroys an UEB Topic Source based on a topic
+ * Destroys an UEB Topic Source based on a topic.
*
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
public void destroy(String topic);
/**
- * Destroys all UEB Topic Sources
+ * Destroys all UEB Topic Sources.
*/
public void destroy();
/**
- * gets an UEB Topic Source based on topic name
+ * Gets an UEB Topic Source based on topic name.
*
* @param topic the topic name
* @return an UEB Topic Source with topic name
public UebTopicSource get(String topic);
/**
- * Provides a snapshot of the UEB Topic Sources
+ * Provides a snapshot of the UEB Topic Sources.
*
* @return a list of the UEB Topic Sources
*/
/* ------------- implementation ----------------- */
/**
- * Factory of UEB Source Topics indexed by topic name
+ * Factory of UEB Source Topics indexed by topic name.
*/
class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
private static final String MISSING_TOPIC = "A topic must be provided";
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
/**
- * UEB Topic Name Index
+ * UEB Topic Name Index.
*/
protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
uebTopicSource.shutdown();
}
+ @Override
+ public void destroy() {
+ List<UebTopicSource> readers = this.inventory();
+ for (UebTopicSource reader : readers) {
+ reader.shutdown();
+ }
+
+ synchronized (this) {
+ this.uebTopicSources.clear();
+ }
+ }
+
/**
* {@inheritDoc}
*/
return new ArrayList<>(this.uebTopicSources.values());
}
- @Override
- public void destroy() {
- List<UebTopicSource> readers = this.inventory();
- for (UebTopicSource reader : readers) {
- reader.shutdown();
- }
-
- synchronized (this) {
- this.uebTopicSources.clear();
- }
- }
-
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
import org.slf4j.LoggerFactory;
/**
- * Wrapper around libraries to consume from message bus
+ * Wrapper around libraries to consume from message bus.
*/
public interface BusConsumer {
/**
- * fetch messages
+ * fetch messages.
*
* @return list of messages
* @throws Exception when error encountered by underlying libraries
public Iterable<String> fetch() throws InterruptedException, IOException;
/**
- * close underlying library consumer
+ * close underlying library consumer.
*/
public void close();
}
/**
- * Cambria based consumer
+ * Cambria based consumer.
*/
public static class CambriaConsumerWrapper implements FilterableBusConsumer {
/**
- * logger
+ * logger.
*/
private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
private final Object consLocker = new Object();
/**
- * Cambria client
+ * Cambria client.
*/
private CambriaConsumer consumer;
/**
- * Cambria client to use for next fetch
+ * Cambria client to use for next fetch.
*/
private CambriaConsumer newConsumer = null;
/**
- * fetch timeout
+ * fetch timeout.
*/
protected int fetchTimeout;
/**
- * close condition
+ * close condition.
*/
protected Object closeCondition = new Object();
/**
- * Cambria Consumer Wrapper
+ * Cambria Consumer Wrapper.
* BusTopicParam object contains the following parameters
- * servers messaging bus hosts
+ * servers messaging bus hosts.
* topic topic
* apiKey API Key
* apiSecret API Secret
* fetchTimeout Fetch Timeout
* fetchLimit Fetch Limit
*
- * @param busTopicParams
- * @throws GeneralSecurityException
- * @throws MalformedURLException
+ * @param busTopicParams - The parameters for the bus topic
+ * @throws GeneralSecurityException - Security exception
+ * @throws MalformedURLException - Malformed URL exception
*/
public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
}
/**
- * MR based consumer
+ * MR based consumer.
*/
public abstract class DmaapConsumerWrapper implements BusConsumer {
/**
- * logger
+ * logger.
*/
private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
protected static final String PROTOCOL_PROP = "Protocol";
/**
- * fetch timeout
+ * fetch timeout.
*/
protected int fetchTimeout;
/**
- * close condition
+ * close condition.
*/
protected Object closeCondition = new Object();
/**
- * MR Consumer
+ * MR Consumer.
*/
protected MRConsumerImpl consumer;
/**
- * MR Consumer Wrapper
- * <p>
- * servers messaging bus hosts
+ * MR Consumer Wrapper.
+ *
+ * <p>servers messaging bus hosts
* topic topic
* apiKey API Key
* apiSecret API Secret
* fetchLimit Fetch Limit
*
* @param busTopicParams contains above listed attributes
- * @throws MalformedURLException
+ * @throws MalformedURLException URL should be valid
*/
public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
}
/**
- * MR based consumer
+ * MR based consumer.
*/
public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
private final Properties props;
/**
- * BusTopicParams contain the following parameters
- * MR Consumer Wrapper
- * <p>
- * servers messaging bus hosts
+ * BusTopicParams contain the following parameters.
+ * MR Consumer Wrapper.
+ *
+ * <p>servers messaging bus hosts
* topic topic
* apiKey API Key
* apiSecret API Secret
* fetchLimit Fetch Limit
*
* @param busTopicParams contains above listed params
- * @throws MalformedURLException
+ * @throws MalformedURLException URL should be valid
*/
public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
public interface BusPublisher {
/**
- * sends a message
+ * sends a message.
*
* @param partitionId id
* @param message the message
public boolean send(String partitionId, String message);
/**
- * closes the publisher
+ * closes the publisher.
*/
public void close();
/**
- * Cambria based library publisher
+ * Cambria based library publisher.
*/
public static class CambriaPublisherWrapper implements BusPublisher {
private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
/**
- * The actual Cambria publisher
+ * The actual Cambria publisher.
*/
@JsonIgnore
protected volatile CambriaBatchingPublisher publisher;
}
/**
- * DmaapClient library wrapper
+ * DmaapClient library wrapper.
*/
public abstract class DmaapPublisherWrapper implements BusPublisher {
private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
/**
- * MR based Publisher
+ * MR based Publisher.
*/
protected MRSimplerBatchPublisher publisher;
protected Properties props;
/**
- * MR Publisher Wrapper
+ * MR Publisher Wrapper.
*
* @param servers messaging bus hosts
* @param topic topic
}
/**
- * DmaapClient library wrapper
+ * DmaapClient library wrapper.
*/
public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
/**
- * MR based Publisher
+ * MR based Publisher.
*/
public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
boolean useHttps) {
import org.onap.policy.common.endpoints.event.comm.bus.ApiKeyEnabled;
/**
- * Bus Topic Base
+ * Bus Topic Base.
*/
public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
/**
- * API Key
+ * API Key.
*/
protected String apiKey;
/**
- * API Secret
+ * API Secret.
*/
protected String apiSecret;
/**
- * Use https
+ * Use https.
*/
protected boolean useHttps;
/**
- * allow self signed certificates
+ * allow self signed certificates.
*/
protected boolean allowSelfSignedCerts;
/**
- * Instantiates a new Bus Topic Base
+ * Instantiates a new Bus Topic Base.
*
- * servers list of servers
+ * <p>servers list of servers
* topic topic name
* apiKey API Key
* apiSecret API Secret
* useHttps does connection use HTTPS?
* allowSelfSignedCerts are self-signed certificates allow
- * @param busTopicParams
- * @return a Bus Topic Base
+ * @param busTopicParams holds all our parameters
* @throws IllegalArgumentException if invalid parameters are present
*/
public BusTopicBase(BusTopicParams busTopicParams) {
}
/**
+ * Is using HTTPS.
+ *
* @return if using https
*/
public boolean isUseHttps() {
}
/**
+ * Is self signed certificates allowed.
+ *
* @return if self signed certificates are allowed
*/
public boolean isAllowSelfSignedCerts() {
import java.util.Map;
/**
- * Member variables of this Params class are as follows
- * servers DMaaP servers
+ * Member variables of this Params class are as follows.
+ *
+ * <p>servers DMaaP servers
* topic DMaaP Topic to be monitored
* apiKey DMaaP API Key (optional)
* apiSecret DMaaP API Secret (optional)
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import java.util.List;
import java.util.UUID;
import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
/**
- * loggers
+ * Loggers.
*/
private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
/**
- * The partition key to publish to
+ * The partition key to publish to.
*/
protected String partitionId;
/**
- * message bus publisher
+ * Message bus publisher.
*/
protected BusPublisher publisher;
/**
- * constructor for abstract sink
+ * Constructor for abstract sink.
* @param busTopicParams contains below listed attributes
- * servers servers
- * topic topic
- * apiKey api secret
- * apiSecret api secret
- * partitionId partition id
- * useHttps does connection use HTTPS?
- * allowSelfSignedCerts are self-signed certificates allow *
+ * servers servers
+ * topic topic
+ * apiKey api secret
+ * apiSecret api secret
+ * partitionId partition id
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow *
* @throws IllegalArgumentException in invalid parameters are passed in
*/
public InlineBusTopicSink(BusTopicParams busTopicParams) {
}
/**
- * Initialize the Bus publisher
+ * Initialize the Bus publisher.
*/
public abstract void init();
protected Map<String, String> additionalProps = null;
/**
- * BusTopicParams contains the below mentioned attributes
+ * BusTopicParams contains the below mentioned attributes.
* servers DMaaP servers
* topic DMaaP Topic to be monitored
* apiKey DMaaP API Key (optional)
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import java.util.List;
-
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
import org.slf4j.Logger;
public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink {
/**
- * logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class);
/**
* Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned
- * attributes
+ * attributes.
*
- * servers list of UEB servers available for publishing
+ * <p>servers list of UEB servers available for publishing
* topic the topic to publish to
* apiKey the api key (optional)
* apiSecret the api secret (optional)
}
/**
- * Instantiation of internal resources
+ * Instantiation of internal resources.
*/
@Override
public void init() {
/**
* This topic source implementation specializes in reading messages over a bus topic source and
- * notifying its listeners
+ * notifying its listeners.
*/
public abstract class SingleThreadedBusTopicSource extends BusTopicBase
implements Runnable, BusTopicSource, FilterableTopicSource {
private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
/**
- * Bus consumer group
+ * Bus consumer group.
*/
protected final String consumerGroup;
/**
- * Bus consumer instance
+ * Bus consumer instance.
*/
protected final String consumerInstance;
/**
- * Bus fetch timeout
+ * Bus fetch timeout.
*/
protected final int fetchTimeout;
/**
- * Bus fetch limit
+ * Bus fetch limit.
*/
protected final int fetchLimit;
/**
- * Message Bus Consumer
+ * Message Bus Consumer.
*/
protected BusConsumer consumer;
/**
- * Independent thread reading message over my topic
+ * Independent thread reading message over my topic.
*/
protected Thread busPollerThread;
}
/**
- * Initialize the Bus client
+ * Initialize the Bus client.
*/
public abstract void init() throws MalformedURLException;
}
/**
- * Run thread method for the Bus Reader
+ * Run thread method for the Bus Reader.
*/
@Override
public void run() {
/**
* This topic reader implementation specializes in reading messages over DMAAP topic and notifying
- * its listeners
+ * its listeners.
*/
public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
try {
this.init();
} catch (Exception e) {
- logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
+ logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}",
+ topic, e.getMessage(), e);
throw new IllegalArgumentException(e);
}
}
/**
- * Initialize the Cambria or MR Client
+ * Initialize the Cambria or MR Client.
*/
@Override
public void init() throws MalformedURLException {
/**
* This topic source implementation specializes in reading messages over an UEB Bus topic source and
- * notifying its listeners
+ * notifying its listeners.
*/
public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource {
/**
+ * Constructor.
+ *
* @param busTopicParams Parameters object containing all the required inputs
* @throws IllegalArgumentException An invalid parameter passed in
*/
}
/**
- * Initialize the Cambria client
+ * Initialize the Cambria client.
*/
@Override
public void init() {
public abstract class TopicBase implements Topic {
/**
- * logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
/**
- * list of servers
+ * List of servers.
*/
protected List<String> servers;
/**
- * Topic
+ * Topic.
*/
protected String topic;
/**
- * event cache
+ * Event cache.
*/
protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
/**
* Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
- * !alive
+ * !alive.
*/
protected volatile boolean alive = false;
/**
* Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
* the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
- * obvious since locked => !alive)
+ * obvious since locked => !alive).
*/
protected volatile boolean locked = false;
/**
- * All my subscribers for new message notifications
+ * All my subscribers for new message notifications.
*/
protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
/**
- * Instantiates a new Topic Base
+ * Instantiates a new Topic Base.
*
* @param servers list of servers
* @param topic topic name
*
- * @return a Topic Base
* @throws IllegalArgumentException if invalid parameters are present
*/
public TopicBase(List<String> servers, String topic) {
}
/**
- * broadcast event to all listeners
+ * Broadcast event to all listeners.
*
* @param message the event
* @return true if all notifications are performed with no error, false otherwise
}
/**
- * take a snapshot of current topic listeners
+ * Take a snapshot of current topic listeners.
*
* @return the topic listeners
*/
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.slf4j.LoggerFactory;
/**
- * Http Client Factory
+ * Http Client Factory.
*/
public interface HttpClientFactory {
/**
- * build and http client with the following parameters
+ * Build and http client with the following parameters.
*/
public HttpClient build(String name, boolean https, boolean selfSignedCerts, String hostname, int port,
String baseUrl, String userName, String password, boolean managed)
throws KeyManagementException, NoSuchAlgorithmException;
/**
- * build http client from properties
+ * Build http client from properties.
*/
public List<HttpClient> build(Properties properties) throws KeyManagementException, NoSuchAlgorithmException;
/**
- * get http client
+ * Get http client.
*
* @param name the name
* @return the http client
public HttpClient get(String name);
/**
- * list of http clients
+ * List of http clients.
*
* @return http clients
*/
public List<HttpClient> inventory();
/**
- * destroy by name
+ * Destroy by name.
*
* @param name name
*/
/**
- * http client factory implementation indexed by name
+ * HTTP client factory implementation indexed by name.
*/
class IndexedHttpClientFactory implements HttpClientFactory {
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(IndexedHttpClientFactory.class);
public class JerseyClient implements HttpClient {
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(JerseyClient.class);
import org.onap.policy.common.capabilities.Startable;
/**
- * Http Servlet Server interface
+ * Http Servlet Server interface.
*/
public interface HttpServletServer extends Startable {
/**
- * Factory of Http Servlet Servers
+ * Factory of Http Servlet Servers.
*/
HttpServletServerFactory factory = new IndexedHttpServletServerFactory();
/**
+ * Get the port.
*
* @return port
*/
int getPort();
/**
- * enables basic authentication with user and password on the the relative path relativeUriPath
+ * Enables basic authentication with user and password on the the relative path relativeUriPath.
*
* @param user user
* @param password password
void setBasicAuthentication(String user, String password, String relativeUriPath);
/**
- * adds a filter at the specified path
+ * Adds a filter at the specified path.
*
* @param filterPath filter path
* @param filterClass filter class
void addFilterClass(String filterPath, String filterClass);
/**
- * adds a JAX-RS servlet class to serve REST requests
+ * Adds a JAX-RS servlet class to serve REST requests.
*
* @param servletPath servlet path
* @param restClass JAX-RS API Class
void addServletClass(String servletPath, String restClass);
/**
- * adds a package containing JAX-RS classes to serve REST requests
+ * Adds a package containing JAX-RS classes to serve REST requests.
*
* @param servletPath servlet path
* @param restPackage JAX-RS package to scan
void addServletPackage(String servletPath, String restPackage);
/**
- * blocking start of the http server
+ * Blocking start of the http server.
*
* @param maxWaitTime max time to wait for the start to take place
* @return true if start was successful
import org.slf4j.LoggerFactory;
/**
- * Factory of HTTP Servlet-Enabled Servlets
+ * Factory of HTTP Servlet-Enabled Servlets.
*/
public interface HttpServletServerFactory {
/**
- * builds an http or https server with support for servlets
+ * Builds an http or https server with support for servlets.
*
* @param name name
* @param https use secured http over tls connection
boolean managed);
/**
- * builds an http server with support for servlets
+ * Builds an http server with support for servlets.
*
* @param name name
* @param host binding host
boolean managed);
/**
- * list of http servers per properties
+ * Build a list of http servers per properties.
*
* @param properties properties based configuration
* @return list of http servers
List<HttpServletServer> build(Properties properties);
/**
- * gets a server based on the port
+ * Gets a server based on the port.
*
* @param port port
* @return http server
HttpServletServer get(int port);
/**
- * provides an inventory of servers
+ * Provides an inventory of servers.
*
* @return inventory of servers
*/
List<HttpServletServer> inventory();
/**
- * destroys server bound to a port
+ * Destroys server bound to a port.
*
- * @param port
+ * @param port the port the server is bound to
*/
void destroy(int port);
/**
- * destroys the factory and therefore all servers
+ * Destroys the factory and therefore all servers.
*/
void destroy();
}
/**
- * Indexed factory implementation
+ * Indexed factory implementation.
*/
class IndexedHttpServletServerFactory implements HttpServletServerFactory {
private static final String SPACES_COMMA_SPACES = "\\s*,\\s*";
/**
- * logger
+ * logger.
*/
protected static Logger logger = LoggerFactory.getLogger(IndexedHttpServletServerFactory.class);
/**
- * servers index
+ * servers index.
*/
protected HashMap<Integer, HttpServletServer> servers = new HashMap<>();
@Override
- public synchronized HttpServletServer build(String name, boolean https, String host, int port, String contextPath, boolean swagger,
+ public synchronized HttpServletServer build(String name, boolean https,
+ String host, int port, String contextPath, boolean swagger,
boolean managed) {
if (servers.containsKey(port)) {
import io.swagger.jersey.config.JerseyJaxrsConfig;
/**
- * REST Jetty Server that uses Jersey Servlets to support JAX-RS Web Services
+ * REST Jetty Server that uses Jersey Servlets to support JAX-RS Web Services.
*/
public class JettyJerseyServer extends JettyServletServer {
/**
- * Swagger API Base Path
+ * Swagger API Base Path.
*/
protected static final String SWAGGER_API_BASEPATH = "swagger.api.basepath";
/**
- * Swagger Context ID
+ * Swagger Context ID.
*/
protected static final String SWAGGER_CONTEXT_ID = "swagger.context.id";
/**
- * Swagger Scanner ID
+ * Swagger Scanner ID.
*/
protected static final String SWAGGER_SCANNER_ID = "swagger.scanner.id";
/**
- * Swagger Pretty Print
+ * Swagger Pretty Print.
*/
protected static final String SWAGGER_PRETTY_PRINT = "swagger.pretty.print";
/**
- * Swagger Packages
+ * Swagger Packages.
*/
protected static final String SWAGGER_INIT_PACKAGES_PARAM_VALUE = "io.swagger.jaxrs.listing";
/**
- * Jersey Packages Init Param Name
+ * Jersey Packages Init Param Name.
*/
protected static final String JERSEY_INIT_PACKAGES_PARAM_NAME = "jersey.config.server.provider.packages";
/**
- * Jersey Packages Init Param Value
+ * Jersey Packages Init Param Value.
*/
protected static final String JERSEY_INIT_PACKAGES_PARAM_VALUE = "com.fasterxml.jackson.jaxrs.json";
/**
- * Jersey Classes Init Param Name
+ * Jersey Classes Init Param Name.
*/
protected static final String JERSEY_INIT_CLASSNAMES_PARAM_NAME = "jersey.config.server.provider.classnames";
/**
- * Jersey Jackson Classes Init Param Value
+ * Jersey Jackson Classes Init Param Value.
*/
protected static final String JERSEY_JACKSON_INIT_CLASSNAMES_PARAM_VALUE =
"com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider";
/**
- * Jersey Swagger Classes Init Param Value
+ * Jersey Swagger Classes Init Param Value.
*/
protected static final String SWAGGER_INIT_CLASSNAMES_PARAM_VALUE =
"io.swagger.jaxrs.listing.ApiListingResource," + "io.swagger.jaxrs.listing.SwaggerSerializers";
/**
- * Logger
+ * Logger.
*/
protected static Logger logger = LoggerFactory.getLogger(JettyJerseyServer.class);
/**
- * Container for servlets
+ * Container for servlets.
*/
protected HashMap<String, ServletHolder> servlets = new HashMap<>();
/**
- * Swagger ID
+ * Swagger ID.
*/
protected String swaggerId = null;
/**
- * Constructor
+ * Constructor.
*
* @param name name
* @param https enable https?
}
/**
- * attaches a swagger initialization servlet
+ * Attaches a swagger initialization servlet.
*/
protected void attachSwaggerServlet(boolean https) {
}
/**
- * retrieves cached server based on servlet path
+ * Retrieves cached server based on servlet path.
*
* @param servletPath servlet path
* @return the jetty servlet holder
import org.slf4j.LoggerFactory;
/**
- * Http Server implementation using Embedded Jetty
+ * Http Server implementation using Embedded Jetty.
*/
public abstract class JettyServletServer implements HttpServletServer, Runnable {
/**
- * Keystore/Truststore system property names
+ * Keystore/Truststore system property names.
*/
public static final String SYSTEM_KEYSTORE_PROPERTY_NAME = "javax.net.ssl.keyStore";
public static final String SYSTEM_KEYSTORE_PASSWORD_PROPERTY_NAME = "javax.net.ssl.keyStorePassword";
public static final String SYSTEM_TRUSTSTORE_PASSWORD_PROPERTY_NAME = "javax.net.ssl.trustStorePassword";
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(JettyServletServer.class);
/**
- * server name
+ * Server name.
*/
protected final String name;
/**
- * server host address
+ * Server host address.
*/
protected final String host;
/**
- * server port to bind
+ * Server port to bind.
*/
protected final int port;
/**
- * server auth user name
+ * Server auth user name.
*/
protected String user;
/**
- * server auth password name
+ * Server auth password name.
*/
protected String password;
/**
- * server base context path
+ * Server base context path.
*/
protected final String contextPath;
/**
- * embedded jetty server
+ * Embedded jetty server.
*/
protected final Server jettyServer;
/**
- * servlet context
+ * Servlet context.
*/
protected final ServletContextHandler context;
/**
- * jetty connector
+ * Jetty connector.
*/
protected final ServerConnector connector;
/**
- * jetty thread
+ * Jetty thread.
*/
protected volatile Thread jettyThread;
/**
- * start condition
+ * Start condition.
*/
protected Object startCondition = new Object();
/**
- * constructor
+ * Constructor.
*
* @param name server name
* @param host server host
this.jettyServer = new Server();
this.jettyServer.setRequestLog(new Slf4jRequestLog());
- if (https)
+ if (https) {
this.connector = httpsConnector();
- else
+ } else {
this.connector = httpConnector();
-
+ }
+
this.connector.setName(srvName);
this.connector.setReuseAddress(true);
this.connector.setPort(port);
sslContextFactory.setKeyStorePath(keyStore);
String ksPassword = System.getProperty(SYSTEM_KEYSTORE_PASSWORD_PROPERTY_NAME);
- if (ksPassword != null)
+ if (ksPassword != null) {
sslContextFactory.setKeyStorePassword(ksPassword);
+ }
}
String trustStore = System.getProperty(SYSTEM_TRUSTSTORE_PROPERTY_NAME);
sslContextFactory.setTrustStorePath(trustStore);
String tsPassword = System.getProperty(SYSTEM_TRUSTSTORE_PASSWORD_PROPERTY_NAME);
- if (tsPassword != null)
+ if (tsPassword != null) {
sslContextFactory.setTrustStorePassword(tsPassword);
+ }
}
HttpConfiguration https = new HttpConfiguration();
}
/**
- * jetty server execution
+ * jetty server execution.
*/
@Override
public void run() {
}
/**
+ * Get name.
+ *
* @return the name
*/
public String getName() {
}
/**
+ * Get host.
+ *
* @return the host
*/
public String getHost() {
}
/**
+ * Get user.
+ *
* @return the user
*/
public String getUser() {
}
/**
+ * Get password.
+ *
* @return the password
*/
@JsonIgnore
import org.slf4j.LoggerFactory;
/**
- * HttpServletServer JUNIT tests
+ * HttpServletServer JUNIT tests.
*/
public class HttpServerTest {
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(HttpServerTest.class);
}
/**
- * performs an http request
+ * performs an http request.
*
- * @throws MalformedURLException
- * @throws IOException
- * @throws InterruptedException
+ * @throws MalformedURLException make sure URL is good
+ * @throws IOException thrown is IO exception occurs
+ * @throws InterruptedException thrown if thread interrupted occurs
*/
protected String http(HttpServletServer server, String aUrl)
throws MalformedURLException, IOException, InterruptedException {
URL url = new URL(aUrl);
String response = null;
- int numRetries = 1, maxNumberRetries = 5;
+ int numRetries = 1;
+ int maxNumberRetries = 5;
while (numRetries <= maxNumberRetries) {
try {
response = response(url);
}
/**
- * gets http response
+ * gets http response.
*
* @param url url
*
- * @throws IOException
+ * @throws IOException if an I/O error occurs
*/
protected String response(URL url) throws IOException {
String response = "";
import org.slf4j.LoggerFactory;
/**
- * NOOP Endpoint Tests
+ * NOOP Endpoint Tests.
*/
public class NoopTopicTest implements TopicListener {
/**
- * Logger
+ * Logger.
*/
private static Logger logger = LoggerFactory.getLogger(NoopTopicTest.class);