public class DmaapConfig {
private String server;
- private String topic;
+ private String listenerTopic;
+
+ private String publisherTopic;
private Integer fetchTimeout;
@Data
public class ElementConfig {
- private ToscaConceptIdentifier elementId;
+ private ToscaConceptIdentifier receiverId;
private ElementType elementType;
- private Integer timerSec;
+ private Integer timerMs;
private DmaapConfig topicParameterGroup;
}
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.main.parameters;
+
+import java.util.List;
+import lombok.Data;
+import org.onap.policy.clamp.models.acm.messages.rest.element.DmaapConfig;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+
+@Data
+public class ElementTopicParameters extends TopicParameters {
+
+ /**
+ * Constructor.
+ * @param parameters DmaapConfig
+ */
+ public ElementTopicParameters(DmaapConfig parameters) {
+ super();
+ this.setTopic(parameters.getListenerTopic());
+ this.setServers(List.of(parameters.getServer()));
+ this.setFetchTimeout(parameters.getFetchTimeout());
+ this.setTopicCommInfrastructure(parameters.getTopicCommInfrastructure());
+ this.setUseHttps(parameters.isUseHttps());
+ }
+
+}
@Override
public void active(ElementConfig elementConfig) {
- receiver = elementConfig.getElementId();
+ receiver = elementConfig.getReceiverId();
}
}
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.element.handler.MessageActivator;
import org.onap.policy.clamp.acm.element.handler.MessageHandler;
+import org.onap.policy.clamp.acm.element.main.parameters.ElementTopicParameters;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
* @param elementConfig the configuration
*/
public void activateElement(@NonNull ElementConfig elementConfig) {
- var topicParameters = new TopicParameters();
- topicParameters.setTopic(elementConfig.getTopicParameterGroup().getTopic());
- topicParameters.setServers(List.of(elementConfig.getTopicParameterGroup().getServer()));
- topicParameters.setFetchTimeout(elementConfig.getTopicParameterGroup().getFetchTimeout());
- topicParameters.setTopicCommInfrastructure(elementConfig.getTopicParameterGroup().getTopicCommInfrastructure());
- topicParameters.setUseHttps(elementConfig.getTopicParameterGroup().isUseHttps());
+ var listenerTopicParameters = new ElementTopicParameters(elementConfig.getTopicParameterGroup());
+
+ var publisherTopicParameters = new ElementTopicParameters(elementConfig.getTopicParameterGroup());
+ publisherTopicParameters.setTopic(elementConfig.getTopicParameterGroup().getPublisherTopic());
var parameters = new TopicParameterGroup();
- parameters.setTopicSinks(List.of(topicParameters));
- parameters.setTopicSources(List.of(topicParameters));
+ parameters.setTopicSinks(List.of(publisherTopicParameters));
+ parameters.setTopicSources(List.of(listenerTopicParameters));
if (!parameters.isValid()) {
throw new AutomationCompositionRuntimeException(Response.Status.BAD_REQUEST,
if (timerPool != null) {
throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService alredy actived!");
}
- receiver = elementConfig.getElementId();
+ receiver = elementConfig.getReceiverId();
timerPool = new ScheduledThreadPoolExecutor(1);
timerPool.setRemoveOnCancelPolicy(true);
- future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
- elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+ future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerMs(),
+ elementConfig.getTimerMs(), TimeUnit.MILLISECONDS);
}
private void sendMessage() {
if (future != null) {
future.cancel(true);
}
- future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
- elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+ future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerMs(),
+ elementConfig.getTimerMs(), TimeUnit.MILLISECONDS);
}
@Override
assertThat(bridgeService.getType()).isEqualTo(ElementType.BRIDGE);
var elementConfig = new ElementConfig();
- elementConfig.setElementId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
+ elementConfig.setReceiverId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
bridgeService.active(elementConfig);
bridgeService.handleMessage(new ElementStatus());
var elementConfig = new ElementConfig();
elementConfig.setTopicParameterGroup(new DmaapConfig());
elementConfig.getTopicParameterGroup().setTopicCommInfrastructure("dmaap");
- elementConfig.getTopicParameterGroup().setTopic("topic");
+ elementConfig.getTopicParameterGroup().setListenerTopic("topic");
+ elementConfig.getTopicParameterGroup().setPublisherTopic("topic");
elementConfig.getTopicParameterGroup().setServer("localhost");
elementConfig.getTopicParameterGroup().setFetchTimeout(1000);
assertThat(starterService.getType()).isEqualTo(ElementType.STARTER);
var elementConfig = new ElementConfig();
- elementConfig.setTimerSec(100);
- elementConfig.setElementId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
+ elementConfig.setTimerMs(100);
+ elementConfig.setReceiverId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
starterService.active(elementConfig);
verify(messagePublisher, timeout(200).atLeastOnce()).publishMsg(any(ElementMessage.class));
starterService.deactivate();