* ONAP
* ================================================================================
* Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
/**
* Manager for the internal DMaaP topic.
*/
- private final DmaapManager dmaapMgr;
+ private final TopicMessageManager topicMessageManager;
/**
* Lock used while updating {@link #current}. In general, public methods must use
private ScheduledThreadPoolExecutor scheduler = null;
/**
- * Constructs the manager, initializing all of the data structures.
+ * Constructs the manager, initializing all the data structures.
*
* @param host name/uuid of this host
* @param controller controller with which this is associated
try {
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
- this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
+ this.topicMessageManager = makeTopicMessagesManager(props.getPoolingTopic());
this.current = new IdleState(this);
logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
public void beforeStart() {
synchronized (curLocker) {
if (scheduler == null) {
- dmaapMgr.startPublisher();
+ topicMessageManager.startPublisher();
logger.debug("make scheduler thread for topic {}", getTopic());
scheduler = makeScheduler();
public void afterStart() {
synchronized (curLocker) {
if (current instanceof IdleState) {
- dmaapMgr.startConsumer(this);
+ topicMessageManager.startConsumer(this);
changeState(new StartState(this));
}
}
if (!(current instanceof IdleState)) {
changeState(new IdleState(this));
- dmaapMgr.stopConsumer(this);
+ topicMessageManager.stopConsumer(this);
publishAdmin(new Offline(getHost()));
}
* stop the publisher, but allow time for any Offline message to be
* transmitted
*/
- dmaapMgr.stopPublisher(properties.getOfflinePubWaitMs());
+ topicMessageManager.stopPublisher(properties.getOfflinePubWaitMs());
}
}
msg.checkValidity();
String txt = serializer.encodeMsg(msg);
- dmaapMgr.publish(txt);
+ topicMessageManager.publish(txt);
} catch (JsonParseException e) {
logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
* need to be forwarded, thus in that case, they are decoded and forwarded.
*
* <p>On the other hand, if the controller is not locked, then we just return immediately
- * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
+ * and let {@link #beforeInsert(String, Object) beforeInsert()} handle
* it instead, as it already has the decoded message.
*
* @param topic2 topic
* Creates a DMaaP manager.
*
* @param topic name of the internal DMaaP topic
- * @return a new DMaaP manager
+ * @return a new topic messages manager
* @throws PoolingFeatureException if an error occurs
*/
- protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
- return new DmaapManager(topic);
+ protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+ return new TopicMessageManager(topic);
}
/**
* @param event event text to be decoded
* @return the decoded event
* @throws IllegalArgumentException illegal argument
- * @throw UnsupportedOperationException unsupported operation
+ * @throws UnsupportedOperationException unsupported operation
* @throws IllegalStateException illegal state
*/
protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {