dfdc7b3cc65dcfff0f6b5cccee8324f98643f81f
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
9 * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import java.util.UUID;
27 import lombok.Getter;
28 import lombok.Setter;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
30 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
31 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
37  * regardless if it is UEB or Kafka.
38  *
39  */
40 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
41
42     /**
43      * Loggers.
44      */
45     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
46
47     /**
48      * The partition key to publish to.
49      */
50     @Getter
51     @Setter
52     protected String partitionKey;
53
54     /**
55      * Message bus publisher.
56      */
57     protected BusPublisher publisher;
58
59     /**
60      * Constructor for abstract sink.
61      * @param busTopicParams contains below listed attributes
62      *     servers servers
63      *     topic topic
64      *     apiKey api secret
65      *     apiSecret api secret
66      *     partitionId partition id
67      *     useHttps does connection use HTTPS?
68      *     allowTracing is tracing allowed?
69      *     allowSelfSignedCerts are self-signed certificates allow     *
70      * @throws IllegalArgumentException if invalid parameters are passed in
71      */
72     protected InlineBusTopicSink(BusTopicParams busTopicParams) {
73
74         super(busTopicParams);
75
76         if (busTopicParams.isPartitionIdInvalid()) {
77             this.partitionKey = UUID.randomUUID().toString();
78         } else {
79             this.partitionKey = busTopicParams.getPartitionId();
80         }
81     }
82
83     /**
84      * Initialize the Bus publisher.
85      */
86     public abstract void init();
87
88     @Override
89     public boolean start() {
90         logger.info("{}: starting", this);
91
92         synchronized (this) {
93             if (!this.alive) {
94                 if (locked) {
95                     throw new IllegalStateException(this + " is locked.");
96                 }
97
98                 this.init();
99                 this.alive = true;
100             }
101         }
102
103         return true;
104     }
105
106     @Override
107     public boolean stop() {
108
109         BusPublisher publisherCopy;
110         synchronized (this) {
111             this.alive = false;
112             publisherCopy = this.publisher;
113             this.publisher = null;
114         }
115
116         if (publisherCopy != null) {
117             try {
118                 publisherCopy.close();
119             } catch (Exception e) {
120                 logger.warn("{}: cannot stop publisher because of {}", this, e.getMessage(), e);
121             }
122         } else {
123             logger.warn("{}: there is no publisher", this);
124             return false;
125         }
126
127         return true;
128     }
129
130     @Override
131     public boolean send(String message) {
132
133         if (message == null || message.isEmpty()) {
134             throw new IllegalArgumentException("Message to send is empty");
135         }
136
137         if (!this.alive) {
138             throw new IllegalStateException(this + " is stopped");
139         }
140
141         try {
142             synchronized (this) {
143                 this.recentEvents.add(message);
144             }
145
146             NetLoggerUtil.log(EventType.OUT, this.getTopicCommInfrastructure(), this.topic, message);
147
148             publisher.send(this.partitionKey, message);
149             broadcast(message);
150         } catch (Exception e) {
151             logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
152             return false;
153         }
154
155         return true;
156     }
157
158     @Override
159     public void shutdown() {
160         this.stop();
161     }
162
163     @Override
164     public String toString() {
165         return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher
166                         + "]";
167     }
168 }