02626d34efa46c738f5f7183c04fa58ec745e397
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2023 Nordix Foundation.
9 * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import java.util.UUID;
27 import lombok.Getter;
28 import lombok.Setter;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
30 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
31 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
37  * regardless if it is UEB or DMaaP.
38  *
39  */
40 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
41
42     /**
43      * Loggers.
44      */
45     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
46
47     /**
48      * The partition key to publish to.
49      */
50     @Getter
51     @Setter
52     protected String partitionKey;
53
54     /**
55      * Message bus publisher.
56      */
57     protected BusPublisher publisher;
58
59     /**
60      * Constructor for abstract sink.
61      * @param busTopicParams contains below listed attributes
62      *     servers servers
63      *     topic topic
64      *     apiKey api secret
65      *     apiSecret api secret
66      *     partitionId partition id
67      *     useHttps does connection use HTTPS?
68      *     allowSelfSignedCerts are self-signed certificates allow     *
69      * @throws IllegalArgumentException if invalid parameters are passed in
70      */
71     protected InlineBusTopicSink(BusTopicParams busTopicParams) {
72
73         super(busTopicParams);
74
75         if (busTopicParams.isPartitionIdInvalid()) {
76             this.partitionKey = UUID.randomUUID().toString();
77         } else {
78             this.partitionKey = busTopicParams.getPartitionId();
79         }
80     }
81
82     /**
83      * Initialize the Bus publisher.
84      */
85     public abstract void init();
86
87     @Override
88     public boolean start() {
89         logger.info("{}: starting", this);
90
91         synchronized (this) {
92             if (!this.alive) {
93                 if (locked) {
94                     throw new IllegalStateException(this + " is locked.");
95                 }
96
97                 this.init();
98                 this.alive = true;
99             }
100         }
101
102         return true;
103     }
104
105     @Override
106     public boolean stop() {
107
108         BusPublisher publisherCopy;
109         synchronized (this) {
110             this.alive = false;
111             publisherCopy = this.publisher;
112             this.publisher = null;
113         }
114
115         if (publisherCopy != null) {
116             try {
117                 publisherCopy.close();
118             } catch (Exception e) {
119                 logger.warn("{}: cannot stop publisher because of {}", this, e.getMessage(), e);
120             }
121         } else {
122             logger.warn("{}: there is no publisher", this);
123             return false;
124         }
125
126         return true;
127     }
128
129     @Override
130     public boolean send(String message) {
131
132         if (message == null || message.isEmpty()) {
133             throw new IllegalArgumentException("Message to send is empty");
134         }
135
136         if (!this.alive) {
137             throw new IllegalStateException(this + " is stopped");
138         }
139
140         try {
141             synchronized (this) {
142                 this.recentEvents.add(message);
143             }
144
145             NetLoggerUtil.log(EventType.OUT, this.getTopicCommInfrastructure(), this.topic, message);
146
147             publisher.send(this.partitionKey, message);
148             broadcast(message);
149         } catch (Exception e) {
150             logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
151             return false;
152         }
153
154         return true;
155     }
156
157     @Override
158     public void shutdown() {
159         this.stop();
160     }
161
162     @Override
163     public String toString() {
164         return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher
165                         + "]";
166     }
167 }