# ============LICENSE_START=======================================================
# feature-pooling-dmaap
# ================================================================================
-# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# Topic used for inter-host communication for a particular controller
# pooling.<controller-name>.topic=XXX
-# These specify how the request id is to be extracted from each type of
-# object that may be presented to a controller from shared topics
-# (i.e., topics where hosts do not all receive a copy of the event)
-extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId}
-
-
# Each controller that is enabled should have its own topic and the
-# corresponding ueb.xxx properties. However, for now, just assume that
-# the amsterdam-cl and beijing-cl features will not both be enabled
-# at the same time.
-
-pooling.amsterdam.enabled=true
-pooling.amsterdam.topic=${env:POOLING_TOPIC}
-
-pooling.beijing.enabled=true
-pooling.beijing.topic=${env:POOLING_TOPIC}
+# corresponding dmaap.xxx properties. However, for now, just assume that
+# the usecases features will not both be enabled at the same time.
+pooling.usecases.enabled=true
+pooling.usecases.topic=${env:POOLING_TOPIC}
# the list of sources and sinks should be identical
-ueb.source.topics=POOLING_TOPIC
-ueb.sink.topics=POOLING_TOPIC
-
-ueb.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.source.topics.POOLING_TOPIC.apiKey=
-ueb.source.topics.POOLING_TOPIC.apiSecret=
-
-ueb.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.sink.topics.POOLING_TOPIC.apiKey=
-ueb.sink.topics.POOLING_TOPIC.apiSecret=
+dmaap.source.topics=POOLING_TOPIC
+dmaap.sink.topics=POOLING_TOPIC
+
+dmaap.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.source.topics.POOLING_TOPIC.apiKey=
+dmaap.source.topics.POOLING_TOPIC.apiSecret=
+
+dmaap.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.sink.topics.POOLING_TOPIC.apiKey=
+dmaap.sink.topics.POOLING_TOPIC.apiSecret=
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.drools.pooling;
import java.util.List;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
/**
* Topic source whose filter is to be manipulated.
*/
- private final FilterableTopicSource topicSource;
+ private final TopicSource topicSource;
/**
* Where to publish messages.
this.topicSource = findTopicSource();
this.topicSink = findTopicSink();
- // verify that we can set the filter
- setFilter(null);
-
} catch (IllegalArgumentException e) {
logger.error("failed to attach to topic {}", topic);
throw new PoolingFeatureException(e);
* @return the topic source
* @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
- private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
+ private TopicSource findTopicSource() throws PoolingFeatureException {
for (TopicSource src : getTopicSources()) {
if (topic.equals(src.getTopic())) {
- if (src instanceof FilterableTopicSource) {
- return (FilterableTopicSource) src;
-
- } else {
- throw new PoolingFeatureException("topic source " + topic + " is not filterable");
- }
+ return src;
}
}
topicSource.unregister(listener);
}
- /**
- * Sets the server-side filter to be used by the consumer.
- *
- * @param filter the filter string, or {@code null} if no filter is to be used
- * @throws PoolingFeatureException if the topic is not filterable
- */
- public void setFilter(String filter) throws PoolingFeatureException {
- try {
- logger.debug("change filter for topic {} to {}", topic, filter);
- topicSource.setFilter(filter);
-
- } catch (UnsupportedOperationException e) {
- throw new PoolingFeatureException("cannot filter topic " + topic, e);
- }
- }
-
/**
* Publishes a message to the sink.
*
private final CountDownLatch activeLatch = new CountDownLatch(1);
/**
- * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is called
- * later. As multiple threads can be active within the methods at the same time, we must keep
- * this in thread local storage.
+ * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
+ * called later. As multiple threads can be active within the methods at the same
+ * time, we must keep this in thread local storage.
*/
- private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
+ private ThreadLocal<String> offerTopics = new ThreadLocal<>();
/**
* Constructor.
return false;
}
- if (mgr.beforeOffer(protocol, topic2, event)) {
+ if (mgr.beforeOffer(topic2, event)) {
return true;
}
- offerArgs.set(new OfferArgs(protocol, topic2, event));
+ offerTopics.set(topic2);
return false;
}
@Override
public boolean beforeInsert(DroolsController droolsController, Object fact) {
- OfferArgs args = offerArgs.get();
- if (args == null) {
+ String topic = offerTopics.get();
+ if (topic == null) {
logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
return false;
}
return false;
}
- return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
+ return mgr.beforeInsert(topic, fact);
}
@Override
boolean success) {
// clear any stored arguments
- offerArgs.remove();
+ offerTopics.remove();
return false;
}
boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
}
- /**
- * Arguments captured from beforeOffer().
- */
- private static class OfferArgs {
-
- /**
- * Protocol of the receiving topic.
- */
- private CommInfrastructure protocol;
-
- /**
- * Topic on which the event was received.
- */
- private String topic;
-
- /**
- * The event text that was received on the topic.
- */
- private String event;
-
- /**
- * Constructor.
- *
- * @param protocol protocol
- * @param topic topic
- * @param event the actual event data received on the topic
- */
- public OfferArgs(CommInfrastructure protocol, String topic, String event) {
- this.protocol = protocol;
- this.topic = topic;
- this.event = event;
- }
- }
-
/*
* The remaining methods may be overridden by junit tests.
*/
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.drools.pooling;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.pooling.state.StateTimerTask;
*/
void publish(String channel, Message msg);
- /**
- * Handles a {@link Forward} event that was received from the internal topic.
- *
- * @param event event
- */
- void handle(Forward event);
-
/**
* Schedules a timer to fire after a delay.
*
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import com.google.gson.JsonParseException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
*/
private final PolicyController controller;
- /**
- * Where to offer events that have been forwarded to this host (i.e, the controller).
- */
- private final TopicListener listener;
-
/**
* Decremented each time the manager enters the Active state. Used by junit tests.
*/
*/
private final DmaapManager dmaapMgr;
- /**
- * Used to extract the request id from the decoded message.
- */
- private final ClassExtractors extractors;
-
/**
* Lock used while updating {@link #current}. In general, public methods must use
* this, while private methods assume the lock is already held.
*/
private ScheduledThreadPoolExecutor scheduler = null;
- /**
- * {@code True} if events offered by the controller should be intercepted,
- * {@code false} otherwise.
- */
- private boolean intercept = true;
-
/**
* Constructs the manager, initializing all of the data structures.
*
this.activeLatch = activeLatch;
try {
- this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
- this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
return props;
}
- /**
- * Makes properties for configuring extractors.
- *
- * @param controller the controller for which the extractors will be configured
- * @param source properties from which to get the extractor properties
- * @return extractor properties
- */
- private Properties makeExtractorProps(PolicyController controller, Properties source) {
- return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
- }
-
/**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
current.cancelTimers();
current = newState;
- // set the filter before starting the state
- setFilter(newState.getFilter());
newState.start();
}
}
- /**
- * Sets the server-side filter for the internal topic.
- *
- * @param filter new filter to be used
- */
- private void setFilter(Map<String, Object> filter) {
- try {
- dmaapMgr.setFilter(serializer.encodeFilter(filter));
-
- } catch (JsonParseException e) {
- logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
-
- } catch (PoolingFeatureException e) {
- logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
- }
- }
-
@Override
public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
// wrap the task in a TimerAction and schedule it
* and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
* it instead, as it already has the decoded message.
*
- * @param protocol protocol
* @param topic2 topic
* @param event event
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ public boolean beforeOffer(String topic2, String event) {
- if (!controller.isLocked() || !intercept) {
+ if (!controller.isLocked()) {
// we should NOT intercept this message - let the invoker handle it
return false;
}
- return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
+ return handleExternal(topic2, decodeEvent(topic2, event));
}
/**
* Called by the DroolsController before it inserts the event into the rule engine.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event original event text, as received from the Bus
- * @param event2 event, as an object
+ * @param event event, as an object
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
-
- if (!intercept) {
- // we should NOT intercept this message - let the invoker handle it
- return false;
- }
-
- return handleExternal(protocol, topic2, event, extractRequestId(event2));
+ public boolean beforeInsert(String topic2, Object event) {
+ return handleExternal(topic2, event);
}
/**
* Handles an event from an external topic.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event event
- * @param reqid request id extracted from the event, or {@code null} if it couldn't be
- * extracted
+ * @param event event, as an object, or {@code null} if it cannot be decoded
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
- if (reqid == null) {
- // no request id - let the invoker handle it
- return false;
- }
-
- if (reqid.isEmpty()) {
- logger.warn("handle locally due to empty request id for topic {}", topic2);
- // no request id - let the invoker handle it
+ private boolean handleExternal(String topic2, Object event) {
+ if (event == null) {
+ // no event - let the invoker handle it
return false;
}
- Forward ev = makeForward(protocol, topic2, event, reqid);
- if (ev == null) {
- // invalid args - consume the message
- logger.warn("constructed an invalid Forward message on topic {}", getTopic());
- return true;
- }
-
synchronized (curLocker) {
- return handleExternal(ev);
+ return handleExternal(topic2, event, event.hashCode());
}
}
/**
* Handles an event from an external topic.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleExternal(Forward event) {
+ private boolean handleExternal(String topic2, Object event, int eventHashCode) {
if (assignments == null) {
// no bucket assignments yet - handle locally
- logger.info("handle event locally for request {}", event.getRequestId());
+ logger.info("handle event locally for request {}", event);
// we did NOT consume the event
return false;
} else {
- return handleEvent(event);
+ return handleEvent(topic2, event, eventHashCode);
}
}
/**
* Handles a {@link Forward} event, possibly forwarding it again.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleEvent(Forward event) {
- String target = assignments.getAssignedHost(event.getRequestId().hashCode());
+ private boolean handleEvent(String topic2, Object event, int eventHashCode) {
+ String target = assignments.getAssignedHost(eventHashCode);
if (target == null) {
/*
* This bucket has no assignment - just discard the event
*/
- logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
+ logger.warn("discarded event for unassigned bucket from topic {}", topic2);
return true;
}
/*
* Message belongs to this host - allow the controller to handle it.
*/
- logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
+ logger.info("handle local event for request {} from topic {}", event, topic2);
return false;
}
- // forward to a different host, if hop count has been exhausted
- if (event.getNumHops() > MAX_HOPS) {
- logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
- topic);
-
- } else {
- logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
- event.bumpNumHops();
- publish(target, event);
- }
-
- // either way, consume the event
+ // not our message, consume the event
+ logger.warn("discarded event for host {} from topic {}", target, topic2);
return true;
}
- /**
- * Extract the request id from an event object.
- *
- * @param event the event object, or {@code null}
- * @return the event's request id, or {@code null} if it can't be extracted
- */
- private String extractRequestId(Object event) {
- if (event == null) {
- return null;
- }
-
- Object reqid = extractors.extract(event);
- return (reqid != null ? reqid.toString() : null);
- }
-
/**
* Decodes an event from a String into an event Object.
*
}
}
- /**
- * Makes a {@link Forward}, and validates its contents.
- *
- * @param protocol protocol
- * @param topic2 topic
- * @param event event
- * @param reqid request id
- * @return a new message, or {@code null} if the message was invalid
- */
- private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
- try {
- Forward ev = new Forward(host, protocol, topic2, event, reqid);
-
- // required for the validity check
- ev.setChannel(host);
-
- ev.checkValidity();
-
- return ev;
-
- } catch (PoolingFeatureException e) {
- logger.error("invalid message for topic {}", topic2, e);
- return null;
- }
- }
-
- @Override
- public void handle(Forward event) {
- synchronized (curLocker) {
- if (!handleExternal(event)) {
- // this host should handle it - inject it
- inject(event);
- }
- }
- }
-
- /**
- * Injects an event into the controller.
- *
- * @param event event
- */
- private void inject(Forward event) {
- logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
-
- try {
- intercept = false;
- listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
-
- } finally {
- intercept = true;
- }
- }
-
/**
* Handles an event from the internal topic. This uses reflection to identify the
* appropriate process() method to invoke, based on the type of Message that was
}
}
- /*
- * The remaining methods may be overridden by junit tests.
- */
-
- /**
- * Creates object extractors.
- *
- * @param props properties used to configure the extractors
- * @return a new set of extractors
- */
- protected ClassExtractors makeClassExtractors(Properties props) {
- return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
- PoolingProperties.EXTRACTOR_TYPE);
- }
-
/**
* Creates a DMaaP manager.
*
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import com.google.gson.JsonParseException;
import java.util.HashMap;
import java.util.Map;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
private static final Map<String, Class<? extends Message>> type2class = new HashMap<>();
static {
- class2type.put(Forward.class, "forward");
class2type.put(Heartbeat.class, "heartbeat");
class2type.put(Identification.class, "identification");
class2type.put(Leader.class, "leader");
/**
* Encodes a filter.
- *
+ *
* @param filter filter to be encoded
* @return the filter, serialized as a JSON string
*/
/**
* Encodes a message.
- *
+ *
* @param msg message to be encoded
* @return the message, serialized as a JSON string
*/
/**
* Decodes a JSON string into a Message.
- *
+ *
* @param msg JSON string representing the message
* @return the message
*/
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019-2020 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Extractors for each object class. Properties define how the data is to be
- * extracted for a given class, where the properties are similar to the
- * following:
- *
- * <pre>
- * <code><a.prefix>.<class.name> = ${event.reqid}</code>
- * </pre>
- *
- * <p>For any given field name (e.g., "reqid"), it first looks for a public "getXxx()"
- * method to extract the specified field. If that fails, then it looks for a public field
- * by the given name. If that also fails, and the object is a <i>Map</i> subclass, then it
- * simply uses the "get(field-name)" method to extract the data from the map.
- */
-public class ClassExtractors {
-
- private static final Logger logger = LoggerFactory.getLogger(ClassExtractors.class);
-
- /**
- * Properties that specify how the data is to be extracted from a given
- * class.
- */
- private final Properties properties;
-
- /**
- * Property prefix, including a trailing ".".
- */
- private final String prefix;
-
- /**
- * Type of item to be extracted.
- */
- private final String type;
-
- /**
- * Maps the class name to its extractor.
- */
- private final ConcurrentHashMap<String, Extractor> class2extractor = new ConcurrentHashMap<>();
-
- /**
- * Constructor.
- *
- * @param props properties that specify how the data is to be extracted from
- * a given class
- * @param prefix property name prefix, prepended before the class name
- * @param type type of item to be extracted
- */
- public ClassExtractors(Properties props, String prefix, String type) {
- this.properties = props;
- this.prefix = (prefix.endsWith(".") ? prefix : prefix + ".");
- this.type = type;
- }
-
- /**
- * Gets the number of extractors in the map.
- *
- * @return gets the number of extractors in the map
- */
- protected int size() {
- return class2extractor.size();
- }
-
- /**
- * Extracts the desired data item from an object.
- *
- * @param object object from which to extract the data item
- * @return the extracted item, or {@code null} if it could not be extracted
- */
- public Object extract(Object object) {
- if (object == null) {
- return null;
- }
-
- Extractor ext = getExtractor(object);
-
- return ext.extract(object);
- }
-
- /**
- * Gets the extractor for the given type of object, creating one if it
- * doesn't exist yet.
- *
- * @param object object whose extracted is desired
- * @return an extractor for the object
- */
- private Extractor getExtractor(Object object) {
- Class<?> clazz = object.getClass();
- Extractor ext = class2extractor.get(clazz.getName());
-
- if (ext == null) {
- // allocate a new extractor, if another thread doesn't beat us to it
- ext = class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
- }
-
- return ext;
- }
-
- /**
- * Builds an extractor for the class.
- *
- * @param clazz class for which the extractor should be built
- *
- * @return a new extractor
- */
- private Extractor buildExtractor(Class<?> clazz) {
- String value = properties.getProperty(prefix + clazz.getName(), null);
- if (value != null) {
- // property has config info for this class - build the extractor
- return buildExtractor(clazz, value);
- }
-
- /*
- * Get the extractor, if any, for the super class or interfaces, but
- * don't add one if it doesn't exist
- */
- Extractor ext = getClassExtractor(clazz, false);
- if (ext != null) {
- return ext;
- }
-
- /*
- * No extractor defined for for this class or its super class - we
- * cannot extract data items from objects of this type, so just
- * allocated a null extractor.
- */
- logger.warn("missing property {}{}", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- /**
- * Builds an extractor for the class, based on the config value extracted
- * from the corresponding property.
- *
- * @param clazz class for which the extractor should be built
- * @param value config value (e.g., "${event.request.id}"
- * @return a new extractor
- */
- private Extractor buildExtractor(Class<?> clazz, String value) {
- if (!value.startsWith("${")) {
- logger.warn("property value for {}{} does not start with {}", prefix, clazz.getName(), "'${'");
- return new NullExtractor();
- }
-
- if (!value.endsWith("}")) {
- logger.warn("property value for {}{} does not end with '}'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- // get the part in the middle
- String val = value.substring(2, value.length() - 1);
- if (val.startsWith(".")) {
- logger.warn("property value for {}{} begins with '.'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- if (val.endsWith(".")) {
- logger.warn("property value for {}{} ends with '.'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- // everything's valid - create the extractor
- try {
- ComponetizedExtractor ext = new ComponetizedExtractor(clazz, val.split("[.]"));
-
- /*
- * If there's only one extractor, then just return it, otherwise
- * return the whole extractor.
- */
- return (ext.extractors.length == 1 ? ext.extractors[0] : ext);
-
- } catch (ExtractorException e) {
- logger.warn("cannot build extractor for {}", clazz.getName(), e);
- return new NullExtractor();
- }
- }
-
- /**
- * Gets the extractor for a class, examining all super classes and
- * interfaces.
- *
- * @param clazz class whose extractor is desired
- * @param addOk {@code true} if the extractor may be added, provided the
- * property is defined, {@code false} otherwise
- * @return the extractor to be used for the class, or {@code null} if no
- * extractor has been defined yet
- */
- private Extractor getClassExtractor(Class<?> clazz, boolean addOk) {
- if (clazz == null) {
- return null;
- }
-
- Extractor ext = null;
-
- if (addOk) {
- String val = properties.getProperty(prefix + clazz.getName(), null);
-
- if (val != null) {
- /*
- * A property is defined for this class, so create the extractor
- * for it.
- */
- return class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
- }
- }
-
- // see if the superclass has an extractor
- if ((ext = getClassExtractor(clazz.getSuperclass(), true)) != null) {
- return ext;
- }
-
- // check the interfaces, too
- for (Class<?> clz : clazz.getInterfaces()) {
- if ((ext = getClassExtractor(clz, true)) != null) {
- break;
- }
- }
-
- return ext;
- }
-
- /**
- * Extractor that always returns {@code null}. Used when no extractor could
- * be built for a given object type.
- */
- private class NullExtractor implements Extractor {
-
- @Override
- public Object extract(Object object) {
- logger.info("cannot extract {} from {}", type, object.getClass());
- return null;
- }
- }
-
- /**
- * Component-ized extractor. Extracts an object that is referenced
- * hierarchically, where each name identifies a particular component within
- * the hierarchy. Supports retrieval from {@link Map} objects, as well as
- * via getXxx() methods, or by direct field retrieval.
- *
- * <p>Note: this will <i>not</i> work if POJOs are contained within a Map.
- */
- private class ComponetizedExtractor implements Extractor {
-
- /**
- * Extractor for each component.
- */
- private final Extractor[] extractors;
-
- /**
- * Constructor.
- *
- * @param clazz the class associated with the object at the root of the
- * hierarchy
- * @param names name associated with each component
- * @throws ExtractorException extractor exception
- */
- public ComponetizedExtractor(Class<?> clazz, String[] names) throws ExtractorException {
- this.extractors = new Extractor[names.length];
-
- Class<?> clz = clazz;
-
- for (int x = 0; x < names.length; ++x) {
- String comp = names[x];
-
- Pair<Extractor, Class<?>> pair = buildExtractor(clz, comp);
-
- extractors[x] = pair.getLeft();
- clz = pair.getRight();
- }
- }
-
- /**
- * Builds an extractor for the given component of an object.
- *
- * @param clazz type of object from which the component will be
- * extracted
- * @param comp name of the component to extract
- * @return a pair containing the extractor and the extracted object's
- * type
- * @throws ExtractorException extrator exception
- */
- private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException {
-
- Pair<Extractor, Class<?>> pair = getMethodExtractor(clazz, comp);
-
- if (pair == null) {
- pair = getFieldExtractor(clazz, comp);
- }
-
- if (pair == null) {
- pair = getMapExtractor(clazz, comp);
- }
-
-
- // didn't find an extractor
- if (pair == null) {
- throw new ExtractorException("class " + clazz + " contains no element " + comp);
- }
-
- return pair;
- }
-
- @Override
- public Object extract(Object object) {
- Object obj = object;
-
- for (Extractor ext : extractors) {
- if (obj == null) {
- break;
- }
-
- obj = ext.extract(obj);
- }
-
- return obj;
- }
-
- /**
- * Gets an extractor that invokes a getXxx() method to retrieve the
- * object.
- *
- * @param clazz container's class
- * @param name name of the property to be retrieved
- * @return a new extractor, or {@code null} if the class does not
- * contain the corresponding getXxx() method
- * @throws ExtractorException if the getXxx() method is inaccessible
- */
- private Pair<Extractor, Class<?>> getMethodExtractor(Class<?> clazz, String name) throws ExtractorException {
- Method meth;
-
- String nm = "get" + StringUtils.capitalize(name);
-
- try {
- meth = clazz.getMethod(nm);
-
- Class<?> retType = meth.getReturnType();
- if (retType == void.class) {
- // it's a void method, thus it won't return an object
- return null;
- }
-
- return Pair.of(new MethodExtractor(meth), retType);
-
- } catch (NoSuchMethodException expected) {
- // no getXxx() method, maybe there's a field by this name
- logger.debug("no method {} in {}", nm, clazz.getName(), expected);
- return null;
-
- } catch (SecurityException e) {
- throw new ExtractorException("inaccessible method " + clazz + "." + nm, e);
- }
- }
-
- /**
- * Gets an extractor for a field within the object.
- *
- * @param clazz container's class
- * @param name name of the field whose value is to be extracted
- * @return a new extractor, or {@code null} if the class does not
- * contain the given field
- * @throws ExtractorException if the field is inaccessible
- */
- private Pair<Extractor, Class<?>> getFieldExtractor(Class<?> clazz, String name) throws ExtractorException {
-
- Field field = getClassField(clazz, name);
- if (field == null) {
- return null;
- }
-
- return Pair.of(new FieldExtractor(field), field.getType());
- }
-
- /**
- * Gets an extractor for an item within a Map object.
- *
- * @param clazz container's class
- * @param key item key within the map
- * @return a new extractor, or {@code null} if the class is not a Map
- * subclass
- */
- private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) {
-
- if (!Map.class.isAssignableFrom(clazz)) {
- return null;
- }
-
- /*
- * Don't know the value's actual type, so we'll assume it's a Map
- * for now. Things should still work OK, as this is only used to
- * direct the constructor on what type of extractor to create next.
- * If the object turns out not to be a map, then the MapExtractor
- * for the next component will just return null.
- */
- return Pair.of(new MapExtractor(key), Map.class);
- }
-
- /**
- * Gets field within a class, examining all super classes and
- * interfaces.
- *
- * @param clazz class whose field is desired
- * @param name name of the desired field
- * @return the field within the class, or {@code null} if the field does
- * not exist
- * @throws ExtractorException if the field is inaccessible
- */
- private Field getClassField(Class<?> clazz, String name) throws ExtractorException {
- if (clazz == null) {
- return null;
- }
-
- try {
- return clazz.getField(name);
-
- } catch (NoSuchFieldException expected) {
- // no field by this name - try super class & interfaces
- logger.debug("no field {} in {}", name, clazz.getName(), expected);
- return null;
-
- } catch (SecurityException e) {
- throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
- }
- }
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to extract an object contained within another object.
- */
-@FunctionalInterface
-public interface Extractor {
-
- /**
- * Extracts an object contained within another object.
- *
- * @param object object from which to extract the contained object
- * @return the extracted value, or {@code null} if it cannot be extracted
- */
- Object extract(Object object);
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Exception generated by extractors.
- */
-public class ExtractorException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public ExtractorException() {
- super();
- }
-
- public ExtractorException(String message) {
- super(message);
- }
-
- public ExtractorException(Throwable cause) {
- super(cause);
- }
-
- public ExtractorException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ExtractorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in one of the container's fields.
- */
-public class FieldExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(FieldExtractor.class);
-
- /**
- * Field containing the object.
- */
- private final Field field;
-
- /**
- * Constructor.
- *
- * @param field field containing the object
- */
- public FieldExtractor(Field field) {
- this.field = field;
- }
-
- @Override
- public Object extract(Object object) {
- try {
- return field.get(object);
-
- } catch (IllegalAccessException | IllegalArgumentException e) {
- logger.warn("cannot get {} from {}", field.getName(), object.getClass(), e);
- return null;
- }
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in a map.
- */
-public class MapExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(MapExtractor.class);
-
- /**
- * Key to the item to extract from the map.
- */
- private final String key;
-
- /**
- * Constructor.
- *
- * @param key key to the item to extract from the map
- */
- public MapExtractor(String key) {
- this.key = key;
- }
-
- @Override
- public Object extract(Object object) {
-
- if (object instanceof Map) {
- Map<?, ?> map = (Map<?, ?>) object;
-
- return map.get(key);
-
- } else {
- logger.warn("expecting a map, instead of {}", object.getClass());
- return null;
- }
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object by invoking a method on the container.
- */
-public class MethodExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(MethodExtractor.class);
-
- /**
- * Method to invoke to extract the contained object.
- */
- private final Method method;
-
- /**
- * Constructor.
- *
- * @param method method to invoke to extract the contained object
- */
- public MethodExtractor(Method method) {
- this.method = method;
- }
-
- @Override
- public Object extract(Object object) {
- try {
- return method.invoke(object);
-
- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- logger.warn("cannot invoke {} on {}", method.getName(), object.getClass(), e);
- return null;
- }
- }
-}
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/**
* Constructor.
- *
+ *
* @param hostArray maps a bucket number (i.e., array index) to a host. All values
* must be non-null
*/
/**
* Gets the leader, which is the host with the minimum UUID.
- *
+ *
* @return the assignment leader
*/
public String getLeader() {
/**
* Determines if a host has an assignment.
- *
+ *
* @param host host to be checked
* @return {@code true} if the host has an assignment, {@code false} otherwise
*/
/**
* Gets all of the hosts that have an assignment.
- *
+ *
* @return all of the hosts that have an assignment
*/
public Set<String> getAllHosts() {
/**
* Gets the host assigned to a given bucket.
- *
+ *
* @param hashCode hash code of the item whose assignment is desired
* @return the assigned host, or {@code null} if the item has no assigned host
*/
return null;
}
- return hostArray[(hashCode & MAX_BUCKETS_MASK) % hostArray.length];
+ return hostArray[(Math.abs(hashCode) & MAX_BUCKETS_MASK) % hostArray.length];
}
/**
* Gets the number of buckets.
- *
+ *
* @return the number of buckets
*/
public int size() {
/**
* Checks the validity of the assignments, verifying that all buckets have been
* assigned to a host.
- *
+ *
* @throws PoolingFeatureException if the assignments are invalid
*/
public void checkValidity() throws PoolingFeatureException {
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.message;
-
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.pooling.PoolingFeatureException;
-
-/**
- * Message to forward an event to another host.
- */
-public class Forward extends Message {
-
- /**
- * Number of hops (i.e., number of times it's been forwarded) so far.
- */
- private int numHops;
-
- /**
- * Time, in milliseconds, at which the message was created.
- */
- private long createTimeMs;
-
- /**
- * Protocol of the receiving topic.
- */
- private CommInfrastructure protocol;
-
- /**
- * Topic on which the event was received.
- */
- private String topic;
-
- /**
- * The event pay load that was received on the topic.
- */
- private String payload;
-
- /**
- * The request id that was extracted from the event.
- */
- private String requestId;
-
- /**
- * Constructor.
- */
- public Forward() {
- super();
- }
-
- /**
- * Constructor.
- *
- * @param source host on which the message originated
- * @param protocol protocol
- * @param topic topic
- * @param payload the actual event data received on the topic
- * @param requestId request id
- */
- public Forward(String source, CommInfrastructure protocol, String topic, String payload, String requestId) {
- super(source);
-
- this.numHops = 0;
- this.createTimeMs = System.currentTimeMillis();
- this.protocol = protocol;
- this.topic = topic;
- this.payload = payload;
- this.requestId = requestId;
- }
-
- /**
- * Increments {@link #numHops}.
- */
- public void bumpNumHops() {
- ++numHops;
- }
-
- public int getNumHops() {
- return numHops;
- }
-
- public void setNumHops(int numHops) {
- this.numHops = numHops;
- }
-
- public long getCreateTimeMs() {
- return createTimeMs;
- }
-
- public void setCreateTimeMs(long createTimeMs) {
- this.createTimeMs = createTimeMs;
- }
-
- public CommInfrastructure getProtocol() {
- return protocol;
- }
-
- public void setProtocol(CommInfrastructure protocol) {
- this.protocol = protocol;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getPayload() {
- return payload;
- }
-
- public void setPayload(String payload) {
- this.payload = payload;
- }
-
- public String getRequestId() {
- return requestId;
- }
-
- public void setRequestId(String requestId) {
- this.requestId = requestId;
- }
-
- public boolean isExpired(long minCreateTimeMs) {
- return (createTimeMs < minCreateTimeMs);
-
- }
-
- @Override
- public void checkValidity() throws PoolingFeatureException {
-
- super.checkValidity();
-
- if (protocol == null) {
- throw new PoolingFeatureException("missing message protocol");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new PoolingFeatureException("missing message topic");
- }
-
- /*
- * Note: an empty pay load is OK, as an empty message could have been
- * received on the topic.
- */
-
- if (payload == null) {
- throw new PoolingFeatureException("missing message payload");
- }
-
- if (requestId == null || requestId.isEmpty()) {
- throw new PoolingFeatureException("missing message requestId");
- }
-
- if (numHops < 0) {
- throw new PoolingFeatureException("invalid message hop count");
- }
- }
-
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.state;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Filter Utilities. These methods create <i>TreeMap</i> objects, because they
- * should only contain a small number of items.
- */
-public class FilterUtils {
- // message element names
- public static final String MSG_CHANNEL = "channel";
- public static final String MSG_TIMESTAMP = "timestampMs";
-
- // json element names
- protected static final String JSON_CLASS = "class";
- protected static final String JSON_FILTERS = "filters";
- protected static final String JSON_FIELD = "field";
- protected static final String JSON_VALUE = "value";
-
- // values to be stuck into the "class" element
- protected static final String CLASS_OR = "Or";
- protected static final String CLASS_AND = "And";
- protected static final String CLASS_EQUALS = "Equals";
-
- /**
- * Constructor.
- */
- private FilterUtils() {
- super();
- }
-
- /**
- * Makes a filter that verifies that a field equals a value.
- *
- * @param field name of the field to check
- * @param value desired value
- * @return a map representing an "equals" filter
- */
- public static Map<String, Object> makeEquals(String field, String value) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_EQUALS);
- map.put(JSON_FIELD, field);
- map.put(JSON_VALUE, value);
-
- return map;
- }
-
- /**
- * Makes an "and" filter, where all of the items must be true.
- *
- * @param items items to be checked
- * @return an "and" filter
- */
- public static Map<String, Object> makeAnd(@SuppressWarnings("unchecked") Map<String, Object>... items) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_AND);
- map.put(JSON_FILTERS, items);
-
- return map;
- }
-
- /**
- * Makes an "or" filter, where at least one of the items must be true.
- *
- * @param items items to be checked
- * @return an "or" filter
- */
- public static Map<String, Object> makeOr(@SuppressWarnings("unchecked") Map<String, Object>... items) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_OR);
- map.put(JSON_FILTERS, items);
-
- return map;
- }
-}
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
package org.onap.policy.drools.pooling.state;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
-import java.util.Map;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.Heartbeat;
-import org.onap.policy.drools.pooling.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Constructor.
- *
+ *
* @param mgr pooling manager
*/
public StartState(PoolingManager mgr) {
/**
* Get Heart beat time stamp in milliseconds.
- *
+ *
* @return the time stamp inserted into the heart beat message
*/
public long getHbTimestampMs() {
return null;
}
-
- @SuppressWarnings("unchecked")
- @Override
- public Map<String, Object> getFilter() {
- // ignore everything except our most recent heart beat message
- return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeAnd(makeEquals(MSG_CHANNEL, getHost()),
- makeEquals(MSG_TIMESTAMP, String.valueOf(hbTimestampMs))));
-
- }
-
}
package org.onap.policy.drools.pooling.state;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
this.mgr = mgr;
}
- /**
- * Gets the server-side filter to use when polling the DMaaP internal topic. The
- * default method returns a filter that accepts messages on the admin channel and on
- * the host's own channel.
- *
- * @return the server-side filter to use.
- */
- @SuppressWarnings("unchecked")
- public Map<String, Object> getFilter() {
- return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
- }
-
/**
* Cancels the timers added by this state.
*/
return mgr.goInactive();
}
- /**
- * Processes a message. The default method passes it to the manager to handle and
- * returns {@code null}.
- *
- * @param msg message to be processed
- * @return the new state, or {@code null} if the state is unchanged
- */
- public State process(Forward msg) {
- if (getHost().equals(msg.getChannel())) {
- logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
- mgr.handle(msg);
-
- } else {
- logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
- getTopic());
- }
-
- return null;
- }
-
/**
* Processes a message. The default method just returns {@code null}.
*
mgr.publishAdmin(msg);
}
- /**
- * Publishes a message on the specified channel.
- *
- * @param channel channel
- * @param msg message to be published
- */
- protected final void publish(String channel, Forward msg) {
- mgr.publish(channel, msg);
- }
-
/**
* Publishes a message on the specified channel.
*
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
private static final String EXPECTED = "expected";
private static final String MY_TOPIC = "my.topic";
private static final String MSG = "a message";
- private static final String FILTER = "a filter";
private TopicListener listener;
- private FilterableTopicSource source;
+ private TopicSource source;
private boolean gotSources;
private TopicSink sink;
private boolean gotSinks;
@Before
public void setUp() throws Exception {
listener = mock(TopicListener.class);
- source = mock(FilterableTopicSource.class);
+ source = mock(TopicSource.class);
gotSources = false;
sink = mock(TopicSink.class);
gotSinks = false;
};
}
- @Test(expected = PoolingFeatureException.class)
- public void testDmaapManager_CannotFilter() throws PoolingFeatureException {
- // force an error when setFilter() is called
- doThrow(new UnsupportedOperationException(EXPECTED)).when(source).setFilter(any());
-
- new DmaapManagerImpl(MY_TOPIC);
- }
-
@Test
public void testGetTopic() {
assertEquals(MY_TOPIC, mgr.getTopic());
}
- @Test(expected = PoolingFeatureException.class)
- public void testFindTopicSource_NotFilterableTopicSource() throws PoolingFeatureException {
-
- // matching topic, but doesn't have the correct interface
- TopicSource source2 = mock(TopicSource.class);
- when(source2.getTopic()).thenReturn(MY_TOPIC);
-
- new DmaapManagerImpl(MY_TOPIC) {
- @Override
- protected List<TopicSource> getTopicSources() {
- return Arrays.asList(source2);
- }
- };
- }
-
@Test(expected = PoolingFeatureException.class)
public void testFindTopicSource_NotFound() throws PoolingFeatureException {
// one item in list, and its topic doesn't match
verify(source).unregister(listener);
}
- @Test
- public void testSetFilter() throws PoolingFeatureException {
- assertThatCode(() -> mgr.setFilter(FILTER)).doesNotThrowAnyException();
- }
-
- @Test(expected = PoolingFeatureException.class)
- public void testSetFilter_Exception() throws PoolingFeatureException {
- // force an error when setFilter() is called
- doThrow(new UnsupportedOperationException(EXPECTED)).when(source).setFilter(any());
-
- mgr.setFilter(FILTER);
- }
-
@Test
public void testPublish() throws PoolingFeatureException {
// cannot publish before starting
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
import org.slf4j.Logger;
* Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
*/
private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
- /**
- * Queue for the external "DMaaP" topic.
- */
- private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
/**
* Counts the number of decode errors.
*/
}
/**
- * Offers an event to the external topic.
+ * Offers an event to the external topic. As each host needs a copy, it is posted
+ * to each Host's queue.
*
* @param event event
*/
public void offerExternal(String event) {
- externalTopic.offer(event);
+ for (Host host : hosts) {
+ host.getExternalTopic().offer(event);
+ }
}
/**
channel2queue.values().forEach(queue -> queue.offer(message));
}
- /**
- * Offers amessage to an internal channel.
- *
- * @param channel channel
- * @param message message
- */
-
- public void offerInternal(String channel, String message) {
- BlockingQueue<String> queue = channel2queue.get(channel);
- if (queue != null) {
- queue.offer(message);
- }
- }
-
/**
* Associates a controller with its drools controller.
*
return drools2policy.get(droolsController);
}
- /**
- * Constructor.
- *
- * @return queue for the external topic
- */
-
- public BlockingQueue<String> getExternalTopic() {
- return externalTopic;
- }
-
/**
* Get decode errors.
*
private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
+ /**
+ * Queue for the external "DMaaP" topic.
+ */
+ @Getter
+ private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
+
/**
* Source that reads from the external topic and posts to the listener.
*/
doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
context.addController(controller, drools);
// arrange to read from the external topic
- externalSource = new TopicSourceImpl(context, false);
+ externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
feature = new PoolingFeatureImpl(context);
}
private static class TopicSinkImpl extends TopicImpl implements TopicSink {
private final Context context;
- /**
- * Used to decode the messages so that the channel can be extracted.
- */
- private final Serializer serializer = new Serializer();
/**
* Constructor.
return false;
}
try {
- Message msg = serializer.decodeMsg(message);
- String channel = msg.getChannel();
- if (Message.ADMIN.equals(channel)) {
- // add to every queue
- context.offerInternal(message);
- } else {
- // add to a specific queue
- context.offerInternal(channel, message);
- }
+ context.offerInternal(message);
return true;
} catch (JsonParseException e) {
logger.warn("could not decode message: {}", message);
* Source implementation that reads from a queue associated with a topic.
*/
- private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
+ private static class TopicSourceImpl extends TopicImpl implements TopicSource {
private final String topic;
/**
/**
* Constructor.
*
- * @param context context
- * @param internal {@code true} if to read from the internal topic, {@code false}
- * to read from the external topic
+ * @param type topic type
+ * @param queue topic from which to read
*/
- public TopicSourceImpl(Context context, boolean internal) {
- if (internal) {
- this.topic = INTERNAL_TOPIC;
- this.queue = context.getCurrentHost().getInternalQueue();
- } else {
- this.topic = EXTERNAL_TOPIC;
- this.queue = context.getExternalTopic();
- }
- }
-
- @Override
- public void setFilter(String filter) {
- logger.info("topic filter set to: {}", filter);
+ public TopicSourceImpl(String type, BlockingQueue<String> queue) {
+ this.topic = type;
+ this.queue = queue;
}
@Override
@Override
protected List<TopicSource> getTopicSources() {
- return Arrays.asList(new TopicSourceImpl(currentContext.get(), true));
+ return Arrays.asList(new TopicSourceImpl(INTERNAL_TOPIC,
+ currentContext.get().getCurrentHost().getInternalQueue()));
}
@Override
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@Test
public void testBeforeOffer() {
assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC1, EVENT1);
+ verify(mgr1).beforeOffer(TOPIC1, EVENT1);
// ensure that the args were captured
pool.beforeInsert(drools1, OBJECT1);
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC1, EVENT1, OBJECT1);
+ verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
// ensure it's still in the map by re-invoking
assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC2, EVENT2);
+ verify(mgr1).beforeOffer(TOPIC2, EVENT2);
// ensure that the new args were captured
pool.beforeInsert(drools1, OBJECT2);
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC2, EVENT2, OBJECT2);
+ verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
public void testBeforeOffer_MgrTrue() {
// manager will return true
- when(mgr1.beforeOffer(any(), any(), any())).thenReturn(true);
+ when(mgr1.beforeOffer(any(), any())).thenReturn(true);
assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC1, EVENT1);
+ verify(mgr1).beforeOffer(TOPIC1, EVENT1);
// ensure it's still in the map by re-invoking
assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC2, EVENT2);
+ verify(mgr1).beforeOffer(TOPIC2, EVENT2);
assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
}
public void testBeforeInsert() {
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC1, EVENT1, OBJECT1);
+ verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
// ensure it's still in the map by re-invoking
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2);
assertFalse(pool.beforeInsert(drools1, OBJECT2));
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC2, EVENT2, OBJECT2);
+ verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2);
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
// call beforeInsert without beforeOffer
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
assertFalse(pool.afterOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2, true));
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.contains;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
private static final String THE_EVENT = "the event";
private static final Object DECODED_EVENT = new Object();
- private static final String REQUEST_ID = "my.request.id";
/**
* Number of dmaap.publish() invocations that should be issued when the manager is
private PoolingProperties poolProps;
private ListeningController controller;
- private ClassExtractors extractors;
private DmaapManager dmaap;
private boolean gotDmaap;
private ScheduledThreadPoolExecutor sched;
ser = new Serializer();
active = new CountDownLatch(1);
- extractors = mock(ClassExtractors.class);
dmaap = mock(DmaapManager.class);
gotDmaap = false;
controller = mock(ListeningController.class);
schedCount = 0;
drools = mock(DroolsController.class);
- when(extractors.extract(DECODED_EVENT)).thenReturn(REQUEST_ID);
-
when(controller.getName()).thenReturn(MY_CONTROLLER);
when(controller.getDrools()).thenReturn(drools);
when(controller.isAlive()).thenReturn(true);
assertEquals(mgr.getHost(), st.getHost());
}
- @Test
- public void testPoolingManagerImpl_ClassEx() {
- /*
- * this controller does not implement TopicListener, which should cause a
- * ClassCastException
- */
- PolicyController ctlr = mock(PolicyController.class);
-
- assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, ctlr, poolProps, active))
- .isInstanceOf(PoolingFeatureRtException.class).hasCauseInstanceOf(ClassCastException.class);
- }
-
@Test
public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
// throw an exception when we try to create the dmaap manager
startMgr();
mgr.startDistributing(makeAssignments(false));
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
- verify(dmaap, times(START_PUB + 1)).publish(any());
+ verify(dmaap, times(START_PUB)).publish(any());
mgr.beforeStop();
verify(dmaap).stopConsumer(mgr);
verify(sched).shutdownNow();
- verify(dmaap, times(START_PUB + 2)).publish(any());
+ verify(dmaap, times(START_PUB + 1)).publish(any());
verify(dmaap).publish(contains("offline"));
assertTrue(mgr.getCurrent() instanceof IdleState);
// verify that next message is handled locally
- mgr.handle(msg);
- verify(dmaap, times(START_PUB + 2)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ verify(dmaap, times(START_PUB + 1)).publish(any());
}
@Test
// start should invoke changeState()
startMgr();
- int ntimes = 0;
-
- // should have set the filter for the StartState
- verify(dmaap, times(++ntimes)).setFilter(any());
-
/*
* now go offline while it's locked
*/
lockMgr();
- // should have set the new filter
- verify(dmaap, times(++ntimes)).setFilter(any());
-
// should have cancelled the timers
assertEquals(2, futures.size());
verify(futures.poll()).cancel(false);
*/
unlockMgr();
- // should have set the new filter
- verify(dmaap, times(++ntimes)).setFilter(any());
-
// new timers should now be active
assertEquals(2, futures.size());
verify(futures.poll(), never()).cancel(false);
verify(futures.poll(), never()).cancel(false);
}
- @Test
- public void testSetFilter() throws Exception {
- // start should cause a filter to be set
- startMgr();
-
- verify(dmaap).setFilter(any());
- }
-
- @Test
- public void testSetFilter_DmaapEx() throws Exception {
-
- // generate an exception
- doThrow(new PoolingFeatureException()).when(dmaap).setFilter(any());
-
- // start should invoke setFilter()
- assertThatCode(() -> startMgr()).doesNotThrowAnyException();
-
- // no exception, means success
- }
-
@Test
public void testSchedule() throws Exception {
// must start the scheduler
}
@Test
- public void testBeforeOffer_Unlocked_NoIntercept() throws Exception {
+ public void testBeforeOffer_Unlocked() throws Exception {
startMgr();
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
- }
-
- @Test
- public void testBeforeOffer_Locked_NoIntercept() throws Exception {
- startMgr();
-
- lockMgr();
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
- public void testBeforeOffer_Locked_Intercept() throws Exception {
+ public void testBeforeOffer_Locked() throws Exception {
startMgr();
lockMgr();
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- final CountDownLatch latch = catchRecursion(false);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
+ assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
- public void testBeforeInsert_Intercept() throws Exception {
+ public void testBeforeInsert() throws Exception {
startMgr();
lockMgr();
// route the message to this host
mgr.startDistributing(makeAssignments(true));
- final CountDownLatch latch = catchRecursion(true);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
- }
-
- @Test
- public void testBeforeInsert_NoIntercept() throws Exception {
- validateUnhandled(CommInfrastructure.UEB);
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
@Test
public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
startMgr();
- assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
}
@Test
public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
- validateUnhandled(CommInfrastructure.UEB);
+ validateUnhandled();
}
@Test
public void testHandleExternalForward_NoAssignments() throws Exception {
- validateUnhandled(CommInfrastructure.UEB);
+ validateUnhandled();
}
@Test
@Test
public void testHandleEvent_NullTarget() throws Exception {
// buckets have null targets
- validateHandled(new BucketAssignments(new String[] {null, null}), START_PUB);
+ validateDiscarded(new BucketAssignments(new String[] {null, null}));
}
@Test
}
@Test
- public void testHandleEvent_DiffHost_TooManyHops() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(false));
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- msg.setNumHops(PoolingManagerImpl.MAX_HOPS + 1);
- mgr.handle(msg);
-
- // shouldn't publish
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testHandleEvent_DiffHost_Forward() throws Exception {
- validateHandled(makeAssignments(false), START_PUB + 1);
- }
-
- @Test
- public void testExtractRequestId_NullEvent() throws Exception {
- startMgr();
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, null));
- }
-
- @Test
- public void testExtractRequestId_NullReqId() throws Exception {
- validateHandleReqId(null);
- }
-
- @Test
- public void testExtractRequestId() throws Exception {
- startMgr();
-
+ public void testHandleEvent_DiffHost() throws Exception {
// route the message to the *OTHER* host
- mgr.startDistributing(makeAssignments(false));
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ validateDiscarded(makeAssignments(false));
}
@Test
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
// route to another host
mgr.startDistributing(makeAssignments(false));
- assertTrue(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
- }
-
- @Test
- public void testMakeForward() throws Exception {
- startMgr();
-
- // route the message to another host
- mgr.startDistributing(makeAssignments(false));
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- verify(dmaap, times(START_PUB + 1)).publish(any());
- }
-
- @Test
- public void testMakeForward_InvalidMsg() throws Exception {
- startMgr();
-
- // route the message to another host
- mgr.startDistributing(makeAssignments(false));
-
- assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- // should not have tried to publish a message
- verify(dmaap, times(START_PUB)).publish(any());
- }
-
- @Test
- public void testHandle_SameHost() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testHandle_DiffHost() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(false));
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB + 1)).publish(any());
- verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testInject() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- final CountDownLatch latch = catchRecursion(true);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
- }
-
- @Test
- public void testInject_Ex() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- // generate RuntimeException when onTopicEvent() is invoked
- doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
-
- final CountDownLatch latch = catchRecursion(true);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
+ assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
// null assignments should cause message to be processed locally
mgr.startDistributing(null);
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
verify(dmaap, times(START_PUB)).publish(any());
- // route the message to this host
+ // message for this host
mgr.startDistributing(makeAssignments(true));
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(dmaap, times(START_PUB)).publish(any());
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
- // route the message to the other host
+ // message for another host
mgr.startDistributing(makeAssignments(false));
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(dmaap, times(START_PUB + 1)).publish(any());
+ assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
@Test
private void validateHandleReqId(String requestId) throws PoolingFeatureException {
startMgr();
- when(extractors.extract(any())).thenReturn(requestId);
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
private void validateNoForward() throws PoolingFeatureException {
// route the message to this host
mgr.startDistributing(makeAssignments(true));
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
verify(dmaap, times(START_PUB)).publish(any());
}
- private void validateHandled(BucketAssignments assignments, int publishCount) throws PoolingFeatureException {
+ private void validateUnhandled() throws PoolingFeatureException {
startMgr();
-
- // route the message to the *OTHER* host
- mgr.startDistributing(assignments);
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- verify(dmaap, times(publishCount)).publish(any());
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
- private void validateUnhandled(CommInfrastructure infra) throws PoolingFeatureException {
+ private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
startMgr();
- assertFalse(mgr.beforeInsert(infra, TOPIC2, THE_EVENT, DECODED_EVENT));
- }
-
- /**
- * Configure the mock controller to act like a real controller, invoking beforeOffer
- * and then beforeInsert, so we can make sure they pass through. We'll keep count to
- * ensure we don't get into infinite recursion.
- *
- * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked,
- * {@code false} if it should be skipped
- *
- * @return a latch that will be counted down if both beforeXxx() methods return false
- */
- private CountDownLatch catchRecursion(boolean invokeBeforeInsert) {
- CountDownLatch recursion = new CountDownLatch(3);
- CountDownLatch latch = new CountDownLatch(1);
-
- doAnswer(args -> {
-
- recursion.countDown();
- if (recursion.getCount() == 0) {
- fail("recursive calls to onTopicEvent");
- }
-
- int iarg = 0;
- CommInfrastructure proto = args.getArgument(iarg++);
- String topic = args.getArgument(iarg++);
- String event = args.getArgument(iarg++);
-
- if (mgr.beforeOffer(proto, topic, event)) {
- return null;
- }
- if (invokeBeforeInsert && mgr.beforeInsert(proto, topic, event, DECODED_EVENT)) {
- return null;
- }
-
- latch.countDown();
-
- return null;
- }).when(controller).onTopicEvent(any(), any(), any());
+ // buckets have null targets
+ mgr.startDistributing(bucketAssignments);
- return latch;
+ assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
/**
* @return a new bucket assignment
*/
private BucketAssignments makeAssignments(boolean sameHost) {
- int slot = REQUEST_ID.hashCode() % 2;
+ int slot = DECODED_EVENT.hashCode() % 2;
// slot numbers are 0 and 1 - reverse them if it's for a different host
if (!sameHost) {
*/
private void lockMgr() {
mgr.beforeLock();
+ when(controller.isLocked()).thenReturn(true);
}
/**
*/
private void unlockMgr() {
mgr.afterUnlock();
+ when(controller.isLocked()).thenReturn(false);
}
/**
super(host, controller, props, activeLatch);
}
- @Override
- protected ClassExtractors makeClassExtractors(Properties props) {
- return extractors;
- }
-
@Override
protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
gotDmaap = true;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
import com.google.gson.JsonParseException;
-import java.util.Map;
-import java.util.TreeMap;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Query;
assertThatCode(() -> new Serializer()).doesNotThrowAnyException();
}
- @Test
- @SuppressWarnings("unchecked")
- public void testEncodeFilter() throws Exception {
- final Serializer ser = new Serializer();
-
- /*
- * Ensure raw maps serialize as expected. Use a TreeMap so the field
- * order is predictable.
- */
- Map<String, Object> top = new TreeMap<>();
- Map<String, Object> inner = new TreeMap<>();
- top.put("abc", 20);
- top.put("def", inner);
- top.put("ghi", true);
- inner.put("xyz", 30);
- assertEquals("{'abc':20,'def':{'xyz':30},'ghi':true}".replace('\'', '"'), ser.encodeFilter(top));
-
- /*
- * Ensure we can encode a complicated filter without throwing an
- * exception
- */
- Map<String, Object> complexFilter = makeAnd(makeEquals("fieldC", "valueC"),
- makeOr(makeEquals("fieldA", "valueA"), makeEquals("fieldB", "valueB")));
- String val = ser.encodeFilter(complexFilter);
- assertFalse(val.isEmpty());
- }
-
@Test
public void testEncodeMsg_testDecodeMsg() throws Exception {
Serializer ser = new Serializer();
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.function.Function;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ClassExtractorsTest {
-
- private static final int NTIMES = 5;
-
- private static final String MY_TYPE = "theType";
- private static final String PROP_PREFIX = "extractor." + MY_TYPE + ".";
-
- private static final String VALUE = "a value";
- private static final Integer INT_VALUE = 10;
- private static final Integer INT_VALUE2 = 20;
-
- private Properties props;
- private ClassExtractors map;
-
- /**
- * Setup.
- *
- */
- @Before
- public void setUp() {
- props = new Properties();
-
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
- props.setProperty(PROP_PREFIX + WithString.class.getName(), "${strValue}");
-
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- }
-
- @Test
- public void testExtract() {
- Simple obj = new Simple();
- assertEquals(INT_VALUE, map.extract(obj));
-
- // string value
- assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
-
- // null object
- assertNull(map.extract(null));
-
- // values from two different kinds of objects
- props = new Properties();
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
- props.setProperty(PROP_PREFIX + WithString.class.getName(), "${strValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(INT_VALUE, map.extract(new Simple()));
- assertEquals(VALUE, map.extract(new Sub()));
-
- // values from a superclass method, but property defined for subclass
- props = new Properties();
- props.setProperty(PROP_PREFIX + Sub.class.getName(), "${strValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(VALUE, map.extract(new Sub()));
-
- // values from a superclass field, but property defined for subclass
- props = new Properties();
- props.setProperty(PROP_PREFIX + Sub.class.getName(), "${intValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(INT_VALUE, map.extract(new Sub()));
-
-
- // prefix includes trailing "."
- props = new Properties();
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
- map = new ClassExtractors(props, PROP_PREFIX.substring(0, PROP_PREFIX.length() - 1), MY_TYPE);
- assertEquals(INT_VALUE, map.extract(new Simple()));
-
-
- // values from an class in a different file
- props = new Properties();
- props.setProperty(PROP_PREFIX + ClassExtractorsTestSupport.class.getName(), "${nested.theValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(ClassExtractorsTestSupport2.NESTED_VALUE, map.extract(new ClassExtractorsTestSupport()));
- }
-
- @Test
- public void testGetExtractor() {
- Simple obj = new Simple();
-
- // repeat - shouldn't re-create the extractor
- for (int x = 0; x < NTIMES; ++x) {
- assertEquals("x=" + x, INT_VALUE, map.extract(obj));
- assertEquals("x=" + x, 1, map.size());
- }
- }
-
- @Test
- public void testBuildExtractorClass_TopLevel() {
- // extractor defined for top-level class
- props = new Properties();
- props.setProperty(PROP_PREFIX + Sub.class.getName(), "${strValue}");
-
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals(VALUE, map.extract(new Sub()));
-
- // one extractor for top-level class
- assertEquals(1, map.size());
- }
-
- @Test
- public void testBuildExtractorClass_SuperClass() {
- // extractor defined for superclass (interface)
- assertEquals(VALUE, map.extract(new Sub()));
-
- // one extractor for top-level class and one for interface
- assertEquals(2, map.size());
- }
-
- @Test
- public void testBuildExtractorClass_NotDefined() {
- // no extractor defined for "this" class
- assertNull(map.extract(this));
-
- // one NULL extractor for top-level class
- assertEquals(1, map.size());
- }
-
- @Test
- public void testBuildExtractorClassString() {
- // no leading "${"
- assertNull(tryIt(Simple.class, "intValue}", xxx -> new Simple()));
-
- // no trailing "}"
- assertNull(tryIt(Simple.class, "${intValue", xxx -> new Simple()));
-
- // leading "."
- assertNull(tryIt(Sub.class, "${.simple.strValue}", xxx -> new Sub()));
-
- // trailing "."
- assertNull(tryIt(Sub.class, "${simple.strValue.}", xxx -> new Sub()));
-
- // one component
- assertEquals(VALUE, tryIt(Sub.class, "${strValue}", xxx -> new Sub()));
-
- // two components
- assertEquals(VALUE, tryIt(Sub.class, "${simple.strValue}", xxx -> new Sub()));
-
- // invalid component
- assertNull(tryIt(Sub.class, "${unknown}", xxx -> new Sub()));
- }
-
- @Test
- public void testGetClassExtractor_InSuper() {
- // field in the superclass
- assertEquals(INT_VALUE, tryIt(Super.class, "${intValue}", xxx -> new Sub()));
- }
-
- @Test
- public void testGetClassExtractor_InInterface() {
- // defined in the interface
- assertEquals(VALUE, map.extract(new Sub()));
- }
-
- @Test
- public void testNullExtractorExtract() {
- // empty properties - should only create NullExtractor
- map = new ClassExtractors(new Properties(), PROP_PREFIX, MY_TYPE);
-
- Simple obj = new Simple();
-
- // repeat - shouldn't re-create the extractor
- for (int x = 0; x < NTIMES; ++x) {
- assertNull("x=" + x, map.extract(obj));
- assertEquals("x=" + x, 1, map.size());
- }
- }
-
- @Test
- public void testComponetizedExtractor() {
- // one component
- assertEquals(VALUE, tryIt(Sub.class, "${strValue}", xxx -> new Sub()));
-
- // three components
- assertEquals(VALUE, tryIt(Sub.class, "${cont.data.strValue}", xxx -> new Sub()));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Method() {
- assertEquals(INT_VALUE, tryIt(Simple.class, "${intValue}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Field() {
- assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Map() {
- Map<String, Object> inner = new TreeMap<>();
- inner.put("inner1", "abc1");
- inner.put("inner2", "abc2");
-
- Map<String, Object> outer = new TreeMap<>();
- outer.put("outer1", "def1");
- outer.put("outer2", inner);
-
- Simple obj = new Simple();
-
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals(null, map.extract(obj));
-
- obj.mapValue = outer;
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals("abc2", map.extract(obj));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Unknown() {
- assertNull(tryIt(Simple.class, "${unknown2}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorExtract_MiddleNull() {
- // data component is null
- assertEquals(null, tryIt(Sub.class, "${cont.data.strValue}", xxx -> {
- Sub obj = new Sub();
- obj.cont.simpleValue = null;
- return obj;
- }));
- }
-
- @Test
- public void testComponetizedExtractorGetMethodExtractor_VoidMethod() {
- // tell it to use getVoidValue()
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${voidValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- Simple obj = new Simple();
- assertNull(map.extract(obj));
-
- assertFalse(obj.voidInvoked);
- }
-
- @Test
- public void testComponetizedExtractorGetMethodExtractor() {
- assertEquals(INT_VALUE, map.extract(new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorGetFieldExtractor() {
- // use a field
- assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorGetMapExtractor() {
- Map<String, Object> inner = new TreeMap<>();
- inner.put("inner1", "abc1");
- inner.put("inner2", "abc2");
-
- Map<String, Object> outer = new TreeMap<>();
- outer.put("outer1", "def1");
- outer.put("outer2", inner);
-
- Simple obj = new Simple();
-
- obj.mapValue = outer;
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals("abc2", map.extract(obj));
- }
-
- @Test
- public void testComponetizedExtractorGetMapExtractor_MapSubclass() {
- Map<String, Object> inner = new TreeMap<>();
- inner.put("inner1", "abc1");
- inner.put("inner2", "abc2");
-
- MapSubclass outer = new MapSubclass();
- outer.put("outer1", "def1");
- outer.put("outer2", inner);
-
- Simple obj = new Simple();
-
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals(null, map.extract(obj));
-
- obj.mapValue = outer;
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals("abc2", map.extract(obj));
- }
-
- /**
- * Sets a property for the given class, makes an object, and then returns
- * the value extracted.
- *
- * @param clazz class whose property is to be set
- * @param propval value to which to set the property
- * @param makeObj function to create the object whose data is to be
- * extracted
- * @return the extracted data, or {@code null} if nothing was extracted
- */
- private Object tryIt(Class<?> clazz, String propval, Function<Void, Object> makeObj) {
- Properties props = new Properties();
- props.setProperty(PROP_PREFIX + clazz.getName(), propval);
-
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- return map.extract(makeObj.apply(null));
- }
-
- /**
- * A Map subclass, used to verify that getMapExtractor() still handles it.
- */
- private static class MapSubclass extends TreeMap<String, Object> {
- private static final long serialVersionUID = 1L;
-
- }
-
- /**
- * A simple class.
- */
- private static class Simple {
-
- /**
- * This will not be used because getIntValue() will override it.
- */
- @SuppressWarnings("unused")
- public final int intValue = INT_VALUE2;
-
- /**
- * Used to verify retrieval via a field name.
- */
- @SuppressWarnings("unused")
- public final String strValue = VALUE;
-
- /**
- * Used to verify retrieval within maps.
- */
- @SuppressWarnings("unused")
- public Map<String, Object> mapValue = null;
-
- /**
- * {@code True} if {@link #getVoidValue()} was invoked, {@code false}
- * otherwise.
- */
- private boolean voidInvoked = false;
-
- /**
- * This function will supercede the value in the "intValue" field.
- *
- * @return INT_VALUE
- */
- @SuppressWarnings("unused")
- public Integer getIntValue() {
- return INT_VALUE;
- }
-
- /**
- * Used to verify that void functions are not invoked.
- */
- @SuppressWarnings("unused")
- public void getVoidValue() {
- voidInvoked = true;
- }
- }
-
- /**
- * Used to verify multi-component retrieval.
- */
- private static class Container {
- public Simple simpleValue = new Simple();
-
- @SuppressWarnings("unused")
- public Simple getData() {
- return simpleValue;
- }
- }
-
- /**
- * Used to verify extraction when the property refers to an interface.
- */
- private static interface WithString {
-
- String getStrValue();
- }
-
- /**
- * Used to verify retrieval within a superclass.
- */
- private static class Super implements WithString {
-
- @SuppressWarnings("unused")
- public final int intValue = INT_VALUE;
-
- @Override
- public String getStrValue() {
- return VALUE;
- }
- }
-
- /**
- * Used to verify retrieval within a subclass.
- */
- private static class Sub extends Super {
-
- @SuppressWarnings("unused")
- public final Simple simple = new Simple();
-
- /**
- * Used to verify multi-component retrieval.
- */
- public final Container cont = new Container();
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to test extractors.
- */
-public class ClassExtractorsTestSupport {
-
- private ClassExtractorsTestSupport2 nested = new ClassExtractorsTestSupport2();
-
- /**
- * Constructor.
- */
- public ClassExtractorsTestSupport() {
- super();
- }
-
- public ClassExtractorsTestSupport2 getNested() {
- return nested;
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to test extractors.
- */
-public class ClassExtractorsTestSupport2 {
-
- public static final int NESTED_VALUE = 30;
-
- public final int theValue = NESTED_VALUE;
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-import org.onap.policy.common.utils.test.ExceptionsTester;
-
-public class ExtractorExceptionTest extends ExceptionsTester {
-
- @Test
- public void test() {
- assertEquals(5, test(ExtractorException.class));
- }
-
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.lang.reflect.Field;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldExtractorTest {
-
- private static final String VALUE = "the value";
- private static final Integer INT_VALUE = 10;
-
- private Field field;
- private FieldExtractor ext;
-
- @Before
- public void setUp() throws Exception {
- field = MyClass.class.getDeclaredField("value");
- ext = new FieldExtractor(field);
- }
-
- @Test
- public void testExtract() throws Exception {
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // repeat
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // null value
- MyClass obj = new MyClass();
- obj.value = null;
- assertEquals(null, ext.extract(obj));
-
- obj.value = VALUE + "X";
- assertEquals(VALUE + "X", ext.extract(obj));
-
- // different value type
- field = MyClass.class.getDeclaredField("value2");
- ext = new FieldExtractor(field);
- assertEquals(INT_VALUE, ext.extract(new MyClass()));
- }
-
- @Test
- public void testExtract_ArgEx() {
- // pass it the wrong class type
- assertNull(ext.extract(this));
- }
-
- private static class MyClass {
- @SuppressWarnings("unused")
- public String value = VALUE;
-
- @SuppressWarnings("unused")
- public int value2 = INT_VALUE;
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MapExtractorTest {
- private static final String KEY = "a.key";
- private static final String VALUE = "a.value";
-
- private MapExtractor ext;
-
- @Before
- public void setUp() {
- ext = new MapExtractor(KEY);
- }
-
- @Test
- public void testExtract_NotAMap() {
-
- // object is not a map (i.e., it's a String)
- assertNull(ext.extract(KEY));
- }
-
- @Test
- public void testExtract_MissingValue() {
-
- Map<String, Object> map = new HashMap<>();
- map.put(KEY + "x", VALUE + "x");
-
- // object is a map, but doesn't have the key
- assertNull(ext.extract(map));
- }
-
- @Test
- public void testExtract() {
-
- Map<String, Object> map = new HashMap<>();
- map.put(KEY + "x", VALUE + "x");
- map.put(KEY, VALUE);
-
- // object is a map and contains the key
- assertEquals(VALUE, ext.extract(map));
-
- // change to value to a different type
- map.put(KEY, 20);
- assertEquals(20, ext.extract(map));
- }
-
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.lang.reflect.Method;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MethodExtractorTest {
-
- private static final String VALUE = "the value";
- private static final Integer INT_VALUE = 10;
-
- private Method meth;
- private MethodExtractor ext;
-
- @Before
- public void setUp() throws Exception {
- meth = MyClass.class.getMethod("getValue");
- ext = new MethodExtractor(meth);
- }
-
- @Test
- public void testExtract() throws Exception {
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // repeat
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // null value
- MyClass obj = new MyClass();
- meth = MyClass.class.getMethod("getNullValue");
- ext = new MethodExtractor(meth);
- assertEquals(null, ext.extract(obj));
-
- // different value type
- meth = MyClass.class.getMethod("getIntValue");
- ext = new MethodExtractor(meth);
- assertEquals(INT_VALUE, ext.extract(new MyClass()));
- }
-
- @Test
- public void testExtract_ArgEx() {
- // pass it the wrong class type
- assertNull(ext.extract(this));
- }
-
- @Test
- public void testExtract_InvokeEx() throws Exception {
- // invoke method that throws an exception
- meth = MyClass.class.getMethod("throwException");
- ext = new MethodExtractor(meth);
- assertEquals(null, ext.extract(new MyClass()));
- }
-
- private static class MyClass {
-
- @SuppressWarnings("unused")
- public String getValue() {
- return VALUE;
- }
-
- @SuppressWarnings("unused")
- public int getIntValue() {
- return INT_VALUE;
- }
-
- @SuppressWarnings("unused")
- public String getNullValue() {
- return null;
- }
-
- @SuppressWarnings("unused")
- public String throwException() {
- throw new IllegalStateException("expected");
- }
- }
-
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.message;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-
-public class ForwardTest extends SupportBasicMessageTester<Forward> {
- // values set by makeValidMessage()
- public static final CommInfrastructure VALID_PROTOCOL = CommInfrastructure.UEB;
- public static final int VALID_HOPS = 0;
- public static final String VALID_TOPIC = "topicA";
- public static final String VALID_PAYLOAD = "payloadA";
- public static final String VALID_REQUEST_ID = "requestIdA";
-
- /**
- * Time, in milliseconds, after which the most recent message was created.
- */
- private static long tcreateMs;
-
- public ForwardTest() {
- super(Forward.class);
- }
-
- @Test
- public void testBumpNumHops() {
- Forward msg = makeValidMessage();
-
- for (int x = 0; x < 3; ++x) {
- assertEquals("x=" + x, x, msg.getNumHops());
- msg.bumpNumHops();
- }
- }
-
- @Test
- public void testGetNumHops_testSetNumHops() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_HOPS, msg.getNumHops());
-
- msg.setNumHops(5);
- assertEquals(5, msg.getNumHops());
-
- msg.setNumHops(7);
- assertEquals(7, msg.getNumHops());
- }
-
- @Test
- public void testGetCreateTimeMs_testSetCreateTimeMs() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertTrue(msg.getCreateTimeMs() >= tcreateMs);
-
- msg.setCreateTimeMs(1000L);
- assertEquals(1000L, msg.getCreateTimeMs());
-
- msg.setCreateTimeMs(2000L);
- assertEquals(2000L, msg.getCreateTimeMs());
- }
-
- @Test
- public void testGetProtocol_testSetProtocol() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(CommInfrastructure.UEB, msg.getProtocol());
-
- msg.setProtocol(CommInfrastructure.DMAAP);
- assertEquals(CommInfrastructure.DMAAP, msg.getProtocol());
-
- msg.setProtocol(CommInfrastructure.UEB);
- assertEquals(CommInfrastructure.UEB, msg.getProtocol());
- }
-
- @Test
- public void testGetTopic_testSetTopic() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_TOPIC, msg.getTopic());
-
- msg.setTopic("topicX");
- assertEquals("topicX", msg.getTopic());
-
- msg.setTopic("topicY");
- assertEquals("topicY", msg.getTopic());
- }
-
- @Test
- public void testGetPayload_testSetPayload() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_PAYLOAD, msg.getPayload());
-
- msg.setPayload("payloadX");
- assertEquals("payloadX", msg.getPayload());
-
- msg.setPayload("payloadY");
- assertEquals("payloadY", msg.getPayload());
- }
-
- @Test
- public void testGetRequestId_testSetRequestId() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_REQUEST_ID, msg.getRequestId());
-
- msg.setRequestId("reqX");
- assertEquals("reqX", msg.getRequestId());
-
- msg.setRequestId("reqY");
- assertEquals("reqY", msg.getRequestId());
- }
-
- @Test
- public void testIsExpired() {
- Forward msg = makeValidMessage();
-
- long tcreate = msg.getCreateTimeMs();
- assertTrue(msg.isExpired(tcreate + 1));
- assertTrue(msg.isExpired(tcreate + 10));
-
- assertFalse(msg.isExpired(tcreate));
- assertFalse(msg.isExpired(tcreate - 1));
- assertFalse(msg.isExpired(tcreate - 10));
- }
-
- @Test
- public void testCheckValidity_InvalidFields() throws Exception {
- // null source (i.e., superclass field)
- expectCheckValidityFailure(msg -> msg.setSource(null));
-
- // null protocol
- expectCheckValidityFailure(msg -> msg.setProtocol(null));
-
- // null or empty topic
- expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setTopic(value));
-
- // null payload
- expectCheckValidityFailure(msg -> msg.setPayload(null));
-
- // empty payload should NOT throw an exception
- Forward forward = makeValidMessage();
- forward.setPayload("");
- forward.checkValidity();
-
- // null or empty requestId
- expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setRequestId(value));
-
- // invalid hop count
- expectCheckValidityFailure(msg -> msg.setNumHops(-1));
- }
-
- @Override
- public Forward makeValidMessage() {
- tcreateMs = System.currentTimeMillis();
-
- Forward msg = new Forward(VALID_HOST, VALID_PROTOCOL, VALID_TOPIC, VALID_PAYLOAD, VALID_REQUEST_ID);
- msg.setChannel(VALID_CHANNEL);
-
- return msg;
- }
-
- @Override
- public void testDefaultConstructorFields(Forward msg) {
- super.testDefaultConstructorFields(msg);
-
- assertEquals(VALID_HOPS, msg.getNumHops());
- assertEquals(0, msg.getCreateTimeMs());
- assertNull(msg.getPayload());
- assertNull(msg.getProtocol());
- assertNull(msg.getRequestId());
- assertNull(msg.getTopic());
- }
-
- @Override
- public void testValidFields(Forward msg) {
- super.testValidFields(msg);
-
- assertEquals(VALID_HOPS, msg.getNumHops());
- assertTrue(msg.getCreateTimeMs() >= tcreateMs);
- assertEquals(VALID_PAYLOAD, msg.getPayload());
- assertEquals(VALID_PROTOCOL, msg.getProtocol());
- assertEquals(VALID_REQUEST_ID, msg.getRequestId());
- assertEquals(VALID_TOPIC, msg.getTopic());
- }
-}
import static org.mockito.Mockito.when;
import java.util.Arrays;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.junit.Before;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
assertEquals(MY_HOST, msg.getRight().getSource());
}
- @Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
@Test
public void testProcessHeartbeat_NullHost() {
assertNull(state.process(new Heartbeat()));
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_AND;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_EQUALS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_OR;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_CLASS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FIELD;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FILTERS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_VALUE;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
-import java.util.Map;
-import org.junit.Test;
-
-public class FilterUtilsTest {
-
- @Test
- public void testMakeEquals() {
- checkEquals("abc", "def", makeEquals("abc", "def"));
- }
-
- @Test
- public void testMakeAnd() {
- @SuppressWarnings("unchecked")
- Map<String, Object> filter =
- makeAnd(makeEquals("an1", "av1"), makeEquals("an2", "av2"), makeEquals("an3", "av3"));
-
- checkArray(CLASS_AND, 3, filter);
- checkEquals("an1", "av1", getItem(filter, 0));
- checkEquals("an2", "av2", getItem(filter, 1));
- checkEquals("an3", "av3", getItem(filter, 2));
- }
-
- @Test
- public void testMakeOr() {
- @SuppressWarnings("unchecked")
- Map<String, Object> filter =
- makeOr(makeEquals("on1", "ov1"), makeEquals("on2", "ov2"), makeEquals("on3", "ov3"));
-
- checkArray(CLASS_OR, 3, filter);
- checkEquals("on1", "ov1", getItem(filter, 0));
- checkEquals("on2", "ov2", getItem(filter, 1));
- checkEquals("on3", "ov3", getItem(filter, 2));
- }
-
- /**
- * Checks that the filter contains an array.
- *
- * @param expectedClassName type of filter this should represent
- * @param expectedCount number of items expected in the array
- * @param filter filter to be examined
- */
- protected void checkArray(String expectedClassName, int expectedCount, Map<String, Object> filter) {
- assertEquals(expectedClassName, filter.get(JSON_CLASS));
-
- Object[] val = (Object[]) filter.get(JSON_FILTERS);
- assertEquals(expectedCount, val.length);
- }
-
- /**
- * Checks that a map represents an "equals".
- *
- * @param name name of the field on the left side of the equals
- * @param value value on the right side of the equals
- * @param map map whose content is to be examined
- */
- protected void checkEquals(String name, String value, Map<String, Object> map) {
- assertEquals(CLASS_EQUALS, map.get(JSON_CLASS));
- assertEquals(name, map.get(JSON_FIELD));
- assertEquals(value, map.get(JSON_VALUE));
- }
-
- /**
- * Gets a particular sub-filter from the array contained within a filter.
- *
- * @param filter containing filter
- * @param index index of the sub-filter of interest
- * @return the sub-filter with the given index
- */
- @SuppressWarnings("unchecked")
- protected Map<String, Object> getItem(Map<String, Object> filter, int index) {
- Object[] val = (Object[]) filter.get(JSON_FILTERS);
-
- return (Map<String, Object>) val[index];
- }
-
-}
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
package org.onap.policy.drools.pooling.state;
import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
state = new IdleState(mgr);
}
- @Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
- public void testProcessForward() {
- Forward msg = new Forward();
- msg.setChannel(MY_HOST);
- assertNull(state.process(msg));
-
- verify(mgr).handle(msg);
- }
-
@Test
public void testProcessHeartbeat() {
assertNull(state.process(new Heartbeat(PREV_HOST, 0L)));
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Query;
public class InactiveStateTest extends SupportBasicStateTester {
state = new InactiveState(mgr);
}
- @Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
@Test
public void testProcessLeader() {
State next = mock(State.class);
import static org.mockito.Mockito.when;
import java.util.Arrays;
-import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
hostBucket = new HostBucket(MY_HOST);
}
- @Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
@Test
public void testProcessQuery() {
State next = mock(State.class);
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
public class QueryStateTest extends SupportBasicStateTester {
state = new QueryState(mgr);
}
- @Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
@Test
public void testGoQuery() {
assertNull(state.goQuery());
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.junit.Before;
import org.junit.Test;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
state = new StartState(mgr);
}
- @Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
-
- // get the sub-filter
- filter = utils.getItem(filter, 1);
-
- utils.checkArray(FilterUtils.CLASS_AND, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_TIMESTAMP, String.valueOf(state.getHbTimestampMs()),
- utils.getItem(filter, 1));
- }
-
@Test
public void testStart() {
state.start();
assertEquals(MY_HOST, state.getHost());
}
- @Test
- public void testProcessForward() {
- Forward msg = new Forward();
- msg.setChannel(MY_HOST);
- assertNull(state.process(msg));
-
- verify(mgr).handle(msg);
- }
-
@Test
public void testProcessHeartbeat() {
Heartbeat msg = new Heartbeat();
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
verify(sched3).cancel();
}
- @Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
@Test
public void testStart() {
assertThatCode(() -> state.start()).doesNotThrowAnyException();
assertEquals(next, next2);
}
- @Test
- public void testProcessForward() {
- Forward msg = new Forward();
- assertNull(state.process(msg));
-
- verify(mgr, never()).handle(msg);
-
- msg.setChannel(MY_HOST);
- assertNull(state.process(msg));
-
- verify(mgr).handle(msg);
- }
-
@Test
public void testProcessHeartbeat() {
assertNull(state.process(new Heartbeat()));
verify(mgr).publishAdmin(msg);
}
- @Test
- public void testPublishStringForward() {
- String chnl = "channelF";
- Forward msg = new Forward();
-
- state.publish(chnl, msg);
-
- verify(mgr).publish(chnl, msg);
- }
-
@Test
public void testPublishStringHeartbeat() {
String chnl = "channelH";
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
- prevState = new State(mgr) {
- @Override
- public Map<String, Object> getFilter() {
- throw new UnsupportedOperationException("cannot filter");
- }
- };
+ prevState = new State(mgr) {};
// capture publish() arguments
doAnswer(invocation -> {