1e8ff7072e1a4c2224406f6240ee9cb3ae2112b5
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2021,2024 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.acm.runtime.supervision.comm;
22
23 import jakarta.ws.rs.core.Response.Status;
24 import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
25 import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
26 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage;
27 import org.onap.policy.common.endpoints.event.comm.TopicSink;
28 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
29
30 public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMessage> implements Publisher {
31
32     private TopicSinkClient topicSinkClient;
33     private boolean active = false;
34
35     /**
36      * Method to send Participant message to participants on demand.
37      *
38      * @param participantMessage the Participant message
39      */
40     public void send(final E participantMessage) {
41         if (!active) {
42             throw new AutomationCompositionRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
43         }
44         topicSinkClient.send(participantMessage);
45     }
46
47
48     @Override
49     public void active(TopicSink topicSink) {
50         this.topicSinkClient = new TopicSinkClient(topicSink);
51         active = true;
52     }
53
54     @Override
55     public void stop() {
56         active = false;
57     }
58
59     /**
60      * Is default topic.
61      * @return true if default
62      */
63     @Override
64     public boolean isDefaultTopic() {
65         return true;
66     }
67 }