@Mock
private BidirectionalTopicHandler handler2;
- private BidirectionalTopicActor actor;
+ private BidirectionalTopicActor<BidirectionalTopicActorParams> actor;
/**
public void testMakeOperatorParameters() {
BidirectionalTopicActorParams params = makeParams();
- final BidirectionalTopicActor prov = new BidirectionalTopicActor(ACTOR);
+ final BidirectionalTopicActor<BidirectionalTopicActorParams> prov =
+ new BidirectionalTopicActor<>(ACTOR, BidirectionalTopicActorParams.class);
Function<String, Map<String, Object>> maker =
prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params));
new TreeMap<>(maker.apply("operB")).toString());
// with invalid actor parameters
- params.setOperation(null);
- assertThatThrownBy(() -> prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params)))
+ params.setOperations(null);
+ Map<String, Object> map = Util.translateToMap(prov.getName(), params);
+ assertThatThrownBy(() -> prov.makeOperatorParameters(map))
.isInstanceOf(ParameterValidationRuntimeException.class);
}
@Test
public void testMakeTopicHandler() {
// use a real actor
- actor = new BidirectionalTopicActor(ACTOR);
+ actor = new BidirectionalTopicActor<>(ACTOR, BidirectionalTopicActorParams.class);
handler1 = actor.getTopicHandler(MY_SINK, MY_SOURCE1);
handler2 = actor.getTopicHandler(MY_SINK, MY_SOURCE2);
params.setTimeoutSec(TIMEOUT);
// @formatter:off
- params.setOperation(Map.of(
+ params.setOperations(Map.of(
"operA", Map.of(),
"operB", Map.of("sourceTopic", "topicB")));
// @formatter:on
return params;
}
- private class MyActor extends BidirectionalTopicActor {
+ private class MyActor extends BidirectionalTopicActor<BidirectionalTopicActorParams> {
public MyActor() {
- super(ACTOR);
+ super(ACTOR, BidirectionalTopicActorParams.class);
}
@Override