9b7240729abcbe0bb0c642bf44f59b6b69cb09d1
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
9 * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import java.util.UUID;
27 import lombok.Getter;
28 import lombok.Setter;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
30 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
31 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink.
37  *
38  */
39 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
40
41     /**
42      * Loggers.
43      */
44     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
45
46     /**
47      * The partition key to publish to.
48      */
49     @Getter
50     @Setter
51     protected String partitionKey;
52
53     /**
54      * Message bus publisher.
55      */
56     protected BusPublisher publisher;
57
58     /**
59      * Constructor for abstract sink.
60      * @param busTopicParams contains below listed attributes
61      *     servers servers
62      *     topic topic
63      *     apiKey api secret
64      *     apiSecret api secret
65      *     partitionId partition id
66      *     useHttps does connection use HTTPS?
67      *     allowTracing is tracing allowed?
68      *     allowSelfSignedCerts are self-signed certificates allow     *
69      * @throws IllegalArgumentException if invalid parameters are passed in
70      */
71     protected InlineBusTopicSink(BusTopicParams busTopicParams) {
72
73         super(busTopicParams);
74
75         if (busTopicParams.isPartitionIdInvalid()) {
76             this.partitionKey = UUID.randomUUID().toString();
77         } else {
78             this.partitionKey = busTopicParams.getPartitionId();
79         }
80     }
81
82     /**
83      * Initialize the Bus publisher.
84      */
85     public abstract void init();
86
87     @Override
88     public boolean start() {
89         logger.info("{}: starting", this);
90
91         synchronized (this) {
92             if (!this.alive) {
93                 if (locked) {
94                     throw new IllegalStateException(this + " is locked.");
95                 }
96
97                 this.init();
98                 this.alive = true;
99             }
100         }
101
102         return true;
103     }
104
105     @Override
106     public boolean stop() {
107
108         BusPublisher publisherCopy;
109         synchronized (this) {
110             this.alive = false;
111             publisherCopy = this.publisher;
112             this.publisher = null;
113         }
114
115         if (publisherCopy != null) {
116             try {
117                 publisherCopy.close();
118             } catch (Exception e) {
119                 logger.warn("{}: cannot stop publisher because of {}", this, e.getMessage(), e);
120             }
121         } else {
122             logger.warn("{}: there is no publisher", this);
123             return false;
124         }
125
126         return true;
127     }
128
129     @Override
130     public boolean send(String message) {
131
132         if (message == null || message.isEmpty()) {
133             throw new IllegalArgumentException("Message to send is empty");
134         }
135
136         if (!this.alive) {
137             throw new IllegalStateException(this + " is stopped");
138         }
139
140         try {
141             synchronized (this) {
142                 this.recentEvents.add(message);
143             }
144
145             NetLoggerUtil.log(EventType.OUT, this.getTopicCommInfrastructure(), this.topic, message);
146
147             publisher.send(this.partitionKey, message);
148             broadcast(message);
149         } catch (Exception e) {
150             logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
151             return false;
152         }
153
154         return true;
155     }
156
157     @Override
158     public void shutdown() {
159         this.stop();
160     }
161
162     @Override
163     public String toString() {
164         return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher
165                         + "]";
166     }
167 }