<artifactId>policy-models-errors</artifactId>
<version>${policy.models.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.onap.policy.common</groupId>
+ <artifactId>message-bus</artifactId>
+ <version>${policy.common.version}</version>
+ </dependency>
<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2025 Nordix Foundation.
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.clamp.acm.runtime.main.utils;
+package org.onap.policy.clamp.common.acm.utils;
import lombok.Getter;
import org.onap.policy.common.message.bus.event.Topic;
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.common.acm.utils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.onap.policy.common.message.bus.event.Topic;
+
+class NetLoggerUtilTest {
+
+ @Test
+ void testLog() {
+ NetLoggerUtil.log(NetLoggerUtil.EventType.IN, Topic.CommInfrastructure.KAFKA, "someTopic", "message1");
+
+ NetLoggerUtil.log(NetLoggerUtil.EventType.OUT, Topic.CommInfrastructure.REST, null, "message2");
+
+ assertEquals("acm-network", NetLoggerUtil.getNetworkLogger().getName());
+ }
+}
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation.
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.util.function.UnaryOperator;
import lombok.Data;
import lombok.NoArgsConstructor;
-import lombok.ToString;
import org.onap.policy.models.base.PfUtils;
/**
*/
@NoArgsConstructor
@Data
-@ToString
public class AutomationCompositionElementInfo {
private UUID automationCompositionElementId;
this.useState = otherElement.useState;
this.outProperties = PfUtils.mapMap(otherElement.outProperties, UnaryOperator.identity());
}
+
+ @Override
+ public String toString() {
+ return "AutomationCompositionElementInfo{"
+ + "automationCompositionElementId=" + automationCompositionElementId
+ + ", deployState=" + deployState
+ + ", lockState=" + lockState
+ + ", operationalState='" + operationalState + '\''
+ + ", useState='" + useState + '\''
+ + '}';
+ }
}
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021,2024 Nordix Foundation.
+ * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
import java.util.function.Consumer;
import org.onap.policy.clamp.acm.participant.intermediary.handler.Listener;
import org.onap.policy.clamp.acm.participant.intermediary.handler.ParticipantHandler;
+import org.onap.policy.clamp.common.acm.utils.NetLoggerUtil;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage;
import org.onap.policy.common.endpoints.listeners.ScoListener;
import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
@Override
public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) {
if (participantHandler.appliesTo(message)) {
+ NetLoggerUtil.log(NetLoggerUtil.EventType.IN, infra, topic,
+ String.format("{\"type\":\"IN\", \"topic\":\"%s\", \"message\":%s}", topic, message));
consumer.accept(message);
}
}
import lombok.Getter;
import org.onap.policy.clamp.acm.participant.intermediary.handler.Publisher;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
+import org.onap.policy.clamp.common.acm.utils.NetLoggerUtil;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
import org.onap.policy.common.message.bus.event.TopicSink;
import org.onap.policy.common.message.bus.event.client.TopicSinkClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
*/
@Component
public class ParticipantMessagePublisher implements Publisher {
- private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantMessagePublisher.class);
private static final String NOT_ACTIVE_TEXT = "Not Active!";
@Getter
public void sendParticipantReqSync(final ParticipantReqSync participantReqSync) {
validate();
topicSinkClient.send(participantReqSync);
- LOGGER.info("Sent Participant Request Sync to CLAMP - {}", participantReqSync);
+ NetLoggerUtil.log(NetLoggerUtil.EventType.OUT, topicSinkClient.getSink().getTopicCommInfrastructure(),
+ topicSinkClient.getTopic(), "Sent Participant Request Sync to CLAMP - "
+ + participantReqSync.toString());
}
/**
public void sendParticipantStatus(final ParticipantStatus participantStatus) {
validate();
topicSinkClient.send(participantStatus);
- LOGGER.info("Sent Participant Status message to CLAMP - {}", participantStatus);
+ NetLoggerUtil.log(NetLoggerUtil.EventType.OUT, topicSinkClient.getSink().getTopicCommInfrastructure(),
+ topicSinkClient.getTopic(), "Sent Participant Status message to CLAMP - "
+ + participantStatus.toString());
}
/**
public void sendParticipantRegister(final ParticipantRegister participantRegister) {
validate();
topicSinkClient.send(participantRegister);
- LOGGER.info("Sent Participant Register message to CLAMP - {}", participantRegister);
+ NetLoggerUtil.log(NetLoggerUtil.EventType.OUT, topicSinkClient.getSink().getTopicCommInfrastructure(),
+ topicSinkClient.getTopic(), "Sent Participant Register message to CLAMP - "
+ + participantRegister.toString());
}
/**
public void sendParticipantDeregister(final ParticipantDeregister participantDeregister) {
validate();
topicSinkClient.send(participantDeregister);
- LOGGER.debug("Sent Participant Deregister message to CLAMP - {}", participantDeregister);
+ NetLoggerUtil.log(NetLoggerUtil.EventType.OUT, topicSinkClient.getSink().getTopicCommInfrastructure(),
+ topicSinkClient.getTopic(), "Sent Participant Deregister message to CLAMP - "
+ + participantDeregister.toString());
}
/**
public void sendParticipantPrimeAck(final ParticipantPrimeAck participantPrimeAck) {
validate();
topicSinkClient.send(participantPrimeAck);
- LOGGER.debug("Sent Participant Prime Ack message to CLAMP - {}", participantPrimeAck);
+ NetLoggerUtil.log(NetLoggerUtil.EventType.OUT, topicSinkClient.getSink().getTopicCommInfrastructure(),
+ topicSinkClient.getTopic(), "Sent Participant Prime Ack message to CLAMP - "
+ + participantPrimeAck.toString());
}
/**
description = "AUTOMATION_COMPOSITION_UPDATE_ACK/AUTOMATION_COMPOSITION_STATECHANGE_ACK messages published")
public void sendAutomationCompositionAck(final AutomationCompositionDeployAck automationCompositionAck) {
validate();
- topicSinkClient.send(automationCompositionAck);
- LOGGER.debug("Sent AutomationComposition Update/StateChange Ack to runtime - {}", automationCompositionAck);
+ NetLoggerUtil.log(NetLoggerUtil.EventType.OUT, topicSinkClient.getSink().getTopicCommInfrastructure(),
+ topicSinkClient.getTopic(), "Sent AutomationComposition Update/StateChange Ack to runtime - "
+ + automationCompositionAck.toString());
}
private void validate() {
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2024 Nordix Foundation.
+ * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import java.util.Collections;
import java.util.List;
+import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.policy.clamp.acm.participant.intermediary.handler.ParticipantHandler;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
+import org.onap.policy.common.message.bus.event.Topic;
import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
class ParticipantCommTest {
assertFalse(messageSender.makeTimerPool().isTerminated());
messageSender.close();
}
+
+ @Test
+ void testOnTopicEvent() {
+ ParticipantHandler handler = Mockito.mock(ParticipantHandler.class);
+ Consumer<ParticipantMessage> consumer = Mockito.mock(Consumer.class);
+ ParticipantMessage message = Mockito.mock(ParticipantMessage.class);
+
+ Mockito.when(handler.appliesTo(message)).thenReturn(true);
+
+ ParticipantListener<ParticipantMessage> listener =
+ new ParticipantListener<>(ParticipantMessage.class, handler, consumer) {
+ @Override
+ public String getType() {
+ return "";
+ }
+ };
+ assertNotNull(listener);
+ listener.onTopicEvent(Mockito.mock(Topic.CommInfrastructure.class),
+ "topic", Mockito.mock(StandardCoderObject.class), message);
+ Mockito.verify(handler).appliesTo(message);
+ }
}
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021,2025 Nordix Foundation.
+ * Copyright (C) 2021,2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import jakarta.ws.rs.core.Response.Status;
import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
-import org.onap.policy.clamp.acm.runtime.main.utils.NetLoggerUtil;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
+import org.onap.policy.clamp.common.acm.utils.NetLoggerUtil;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage;
import org.onap.policy.common.message.bus.event.TopicSink;
import org.onap.policy.common.message.bus.event.client.TopicSinkClient;
package org.onap.policy.clamp.acm.runtime.supervision.comm;
import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
-import org.onap.policy.clamp.acm.runtime.main.utils.NetLoggerUtil;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
+import org.onap.policy.clamp.common.acm.utils.NetLoggerUtil;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.common.endpoints.listeners.ScoListener;
package org.onap.policy.clamp.acm.runtime.supervision.comm;
import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
-import org.onap.policy.clamp.acm.runtime.main.utils.NetLoggerUtil;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
+import org.onap.policy.clamp.common.acm.utils.NetLoggerUtil;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.common.endpoints.listeners.ScoListener;
package org.onap.policy.clamp.acm.runtime.supervision.comm;
import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
-import org.onap.policy.clamp.acm.runtime.main.utils.NetLoggerUtil;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler;
+import org.onap.policy.clamp.common.acm.utils.NetLoggerUtil;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.common.endpoints.listeners.ScoListener;
package org.onap.policy.clamp.acm.runtime.supervision.comm;
import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
-import org.onap.policy.clamp.acm.runtime.main.utils.NetLoggerUtil;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler;
+import org.onap.policy.clamp.common.acm.utils.NetLoggerUtil;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
import org.onap.policy.common.endpoints.listeners.ScoListener;
package org.onap.policy.clamp.acm.runtime.supervision.comm;
import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
-import org.onap.policy.clamp.acm.runtime.main.utils.NetLoggerUtil;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler;
+import org.onap.policy.clamp.common.acm.utils.NetLoggerUtil;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
import org.onap.policy.common.endpoints.listeners.ScoListener;
package org.onap.policy.clamp.acm.runtime.supervision.comm;
import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
-import org.onap.policy.clamp.acm.runtime.main.utils.NetLoggerUtil;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler;
+import org.onap.policy.clamp.common.acm.utils.NetLoggerUtil;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
import org.onap.policy.common.endpoints.listeners.ScoListener;
package org.onap.policy.clamp.acm.runtime.supervision.comm;
import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
-import org.onap.policy.clamp.acm.runtime.main.utils.NetLoggerUtil;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler;
+import org.onap.policy.clamp.common.acm.utils.NetLoggerUtil;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
import org.onap.policy.common.endpoints.listeners.ScoListener;