Made changes to pooling code to address new sonar issues.
Add comments to awaitActive() methods.
Change-Id: I390173de00135a0a5fe50af82ed4ba780df9df80
Issue-ID: POLICY-728
Signed-off-by: Jim Hahn <jrh3@att.com>
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import org.onap.policy.common.utils.properties.exception.PropertyException;
import org.onap.policy.drools.controller.DroolsController;
/**
* Maps a controller name to its associated manager.
*/
- private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+ private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+
+ /**
+ * Decremented each time a manager enters the Active state. Used by junit tests.
+ */
+ private final CountDownLatch activeLatch = new CountDownLatch(1);
/**
* Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
return host;
}
+ /**
+ * @return a latch that will be decremented when a manager enters the active state
+ */
+ protected CountDownLatch getActiveLatch() {
+ return activeLatch;
+ }
+
@Override
public int getSequenceNumber() {
return 0;
PoolingProperties props = new PoolingProperties(name, featProps);
logger.info("pooling enabled for {}", name);
- ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props));
+ ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props, activeLatch));
} catch (PropertyException e) {
logger.error("pooling disabled due to exception for {}", name, e);
* @param host name/uuid of this host
* @param controller
* @param props properties to use to configure the manager
+ * @param activeLatch decremented when the manager goes Active
* @return a new pooling manager
*/
- public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props) {
- return new PoolingManagerImpl(host, controller, props);
+ public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+ return new PoolingManagerImpl(host, controller, props, activeLatch);
}
/**
*/
private final TopicListener listener;
+ /**
+ * Decremented each time the manager enters the Active state. Used by junit tests.
+ */
+ private final CountDownLatch activeLatch;
+
/**
* Used to encode & decode request objects received from & sent to a rule engine.
*/
/**
* Queue used when no bucket assignments are available.
*/
- private EventQueue eventq;
+ private final EventQueue eventq;
/**
* {@code True} if events offered by the controller should be intercepted,
* @param host name/uuid of this host
* @param controller controller with which this is associated
* @param props feature properties specific to the controller
+ * @param activeLatch latch to be decremented each time the manager enters the Active
+ * state
*/
- public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props) {
+ public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
this.host = host;
this.controller = controller;
this.props = props;
+ this.activeLatch = activeLatch;
try {
this.listener = (TopicListener) controller;
* @return DMaaP properties
*/
private Properties makeDmaapProps(PolicyController controller, Properties source) {
- SpecProperties props = new SpecProperties("", "controller." + controller.getName(), source);
+ SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source);
// could be UEB or DMAAP, so add both
- addDmaapConsumerProps(props, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
- addDmaapConsumerProps(props, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
+ addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
+ addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
- return props;
+ return specProps;
}
/**
*/
private void addDmaapConsumerProps(SpecProperties props, String prefix) {
String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
-
+
props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
}
ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
// wrap the future in a "CancellableScheduledTask"
- return new CancellableScheduledTask() {
- @Override
- public void cancel() {
- fut.cancel(false);
- }
- };
+ return () -> fut.cancel(false);
}
@Override
TimeUnit.MILLISECONDS);
// wrap the future in a "CancellableScheduledTask"
- return new CancellableScheduledTask() {
- @Override
- public void cancel() {
- fut.cancel(false);
- }
- };
+ return () -> fut.cancel(false);
}
@Override
topic);
} else {
- logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
+ logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
event.bumpNumHops();
publish(target, event);
}
@Override
public State goActive() {
+ activeLatch.countDown();
return new ActiveState(this);
}
@Override
public final int hashCode() {
- throw new UnsupportedOperationException("HostBucket cannot be hashed");
+ throw new UnsupportedOperationException("SpecProperties cannot be hashed");
}
@Override
public final boolean equals(Object obj) {
- throw new UnsupportedOperationException("cannot compare HostBuckets");
+ throw new UnsupportedOperationException("cannot compare SpecProperties");
}
}
@Override
public void start() {
super.start();
- schedule(getProperties().getReactivateMs(), () -> goStart());
+ schedule(getProperties().getReactivateMs(), this::goStart);
}
@Override
protected State goActive(BucketAssignments asgn) {
startDistributing(asgn);
- if (asgn.hasAssignment(getHost())) {
+ if (asgn != null && asgn.hasAssignment(getHost())) {
return mgr.goActive();
} else {
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
long minms = 2000L;
// tell the publisher to stop in minms + additional time
- Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L));
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread thread = new Thread(() -> {
+ latch.countDown();
+ mgr.stopPublisher(minms + 3000L);
+ });
thread.start();
- // give the thread a chance to start
- Thread.sleep(50L);
+ // wait for the thread to start
+ latch.await();
// interrupt it - it should immediately finish its work
thread.interrupt();
}
ctx.startHosts();
-
- ctx.awaitEvents(STD_IDENTIFICATION_MS * 2, TimeUnit.MILLISECONDS);
+ ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
for (int x = 0; x < nmessages; ++x) {
ctx.offerExternal(makeMessage(x));
public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
return eventCounter.await(time, units);
}
+
+ /**
+ * Waits, for a period of time, for all hosts to enter the Active state.
+ *
+ * @param timeMs maximum time to wait, in milliseconds
+ * @throws InterruptedException
+ */
+ public void awaitAllActive(long timeMs) throws InterruptedException {
+ long tend = timeMs + System.currentTimeMillis();
+
+ for (Host host : hosts) {
+ long tremain = Math.max(0, tend - System.currentTimeMillis());
+ assertTrue(host.awaitActive(tremain));
+ }
+ }
}
/**
context.addController(controller, drools);
}
+ /**
+ * Waits, for a period of time, for the host to enter the Active state.
+ *
+ * @param timeMs time to wait, in milliseconds
+ * @return {@code true} if the host entered the Active state within the given
+ * amount of time, {@code false} otherwise
+ * @throws InterruptedException
+ */
+ public boolean awaitActive(long timeMs) throws InterruptedException {
+ return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
+ }
+
/**
* Starts threads for the host so that it begins consuming from both the external
* "DMaaP" topic and its own internal "DMaaP" topic.
when(factory.getController(drools2)).thenReturn(controller2);
when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled);
- when(factory.makeManager(any(), any(), any())).thenAnswer(args -> {
+ when(factory.makeManager(any(), any(), any(), any())).thenAnswer(args -> {
PoolingProperties props = args.getArgument(2);
PoolingManagerImpl mgr = mock(PoolingManagerImpl.class);
private DroolsController drools;
private Serializer ser;
private Factory factory;
+ private CountDownLatch active;
private PoolingManagerImpl mgr;
futures = new LinkedList<>();
ser = new Serializer();
+ active = new CountDownLatch(1);
factory = mock(Factory.class);
eventQueue = mock(EventQueue.class);
PoolingManagerImpl.setFactory(factory);
- mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps);
+ mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps, active);
}
@Test
PolicyController ctlr = mock(PolicyController.class);
PoolingFeatureRtException ex = expectException(PoolingFeatureRtException.class,
- () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps));
+ () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps, active));
assertNotNull(ex.getCause());
assertTrue(ex.getCause() instanceof ClassCastException);
}
when(factory.makeDmaapManager(any(), any())).thenThrow(ex);
PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class,
- () -> new PoolingManagerImpl(MY_HOST, controller, poolProps));
+ () -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active));
assertEquals(ex, ex2.getCause());
}
public void testGetHost() {
assertEquals(MY_HOST, mgr.getHost());
- mgr = new PoolingManagerImpl(HOST2, controller, poolProps);
+ mgr = new PoolingManagerImpl(HOST2, controller, poolProps, active);
assertEquals(HOST2, mgr.getHost());
}
// route the messages to this host
CountDownLatch latch = mgr.startDistributing(makeAssignments(true));
- assertNotNull(latch);
assertTrue(latch.await(2, TimeUnit.SECONDS));
// all of the events should have been processed locally
when(eventQueue.poll()).thenAnswer(args -> lst.poll());
// route the messages to the OTHER host
- assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS));
+ CountDownLatch latch = mgr.startDistributing(makeAssignments(false));
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
// all of the events should have been forwarded
verify(dmaap, times(4)).publish(any());
assertTrue(st instanceof ActiveState);
assertEquals(mgr.getHost(), st.getHost());
assertEquals(asgn, mgr.getAssignments());
+ assertEquals(0, active.getCount());
}
@Test
State st = mgr.goInactive();
assertTrue(st instanceof InactiveState);
assertEquals(mgr.getHost(), st.getHost());
+ assertEquals(1, active.getCount());
}
@Test
verify(mgr).startDistributing(asgn);
}
+ @Test
+ public void testGoActive_NullAssignment() {
+ State act = mock(State.class);
+ State inact = mock(State.class);
+
+ when(mgr.goActive()).thenReturn(act);
+ when(mgr.goInactive()).thenReturn(inact);
+
+ assertEquals(inact, state.goActive(null));
+
+ verify(mgr, never()).startDistributing(any());
+ }
+
@Test
public void testGoInactive() {
State next = mock(State.class);