938968983e7738ed6a43e99db450c441a74fc7bb
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7 * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.util.UUID;
25
26 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
27 import org.onap.policy.common.utils.slf4j.LoggerFactoryWrapper;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
33  * regardless if it is UEB or DMaaP.
34  *
35  */
36 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
37
38     /**
39      * Loggers.
40      */
41     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
42     private static final Logger netLogger = LoggerFactoryWrapper.getNetworkLogger();
43
44     /**
45      * The partition key to publish to.
46      */
47     protected String partitionId;
48
49     /**
50      * Message bus publisher.
51      */
52     protected BusPublisher publisher;
53
54     /**
55      * Constructor for abstract sink.
56      * @param busTopicParams contains below listed attributes
57      *     servers servers
58      *     topic topic
59      *     apiKey api secret
60      *     apiSecret api secret
61      *     partitionId partition id
62      *     useHttps does connection use HTTPS?
63      *     allowSelfSignedCerts are self-signed certificates allow     *
64      * @throws IllegalArgumentException in invalid parameters are passed in
65      */
66     public InlineBusTopicSink(BusTopicParams busTopicParams) {
67
68         super(busTopicParams);
69
70         if (busTopicParams.isPartitionIdInvalid()) {
71             this.partitionId = UUID.randomUUID().toString();
72         } else {
73             this.partitionId = busTopicParams.getPartitionId();
74         }
75     }
76
77     /**
78      * Initialize the Bus publisher.
79      */
80     public abstract void init();
81
82     @Override
83     public boolean start() {
84         logger.info("{}: starting", this);
85
86         synchronized (this) {
87
88             if (this.alive) {
89                 return true;
90             }
91
92             if (locked) {
93                 throw new IllegalStateException(this + " is locked.");
94             }
95
96             this.alive = true;
97         }
98
99         this.init();
100         return true;
101     }
102
103     @Override
104     public boolean stop() {
105
106         BusPublisher publisherCopy;
107         synchronized (this) {
108             this.alive = false;
109             publisherCopy = this.publisher;
110             this.publisher = null;
111         }
112
113         if (publisherCopy != null) {
114             try {
115                 publisherCopy.close();
116             } catch (Exception e) {
117                 logger.warn("{}: cannot stop publisher because of {}", this, e.getMessage(), e);
118             }
119         } else {
120             logger.warn("{}: there is no publisher", this);
121             return false;
122         }
123
124         return true;
125     }
126
127     @Override
128     public boolean send(String message) {
129
130         if (message == null || message.isEmpty()) {
131             throw new IllegalArgumentException("Message to send is empty");
132         }
133
134         if (!this.alive) {
135             throw new IllegalStateException(this + " is stopped");
136         }
137
138         try {
139             synchronized (this) {
140                 this.recentEvents.add(message);
141             }
142
143             netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(),
144                     message);
145
146             publisher.send(this.partitionId, message);
147             broadcast(message);
148         } catch (Exception e) {
149             logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
150             return false;
151         }
152
153         return true;
154     }
155
156     @Override
157     public void setPartitionKey(String partitionKey) {
158         this.partitionId = partitionKey;
159     }
160
161     @Override
162     public String getPartitionKey() {
163         return this.partitionId;
164     }
165
166     @Override
167     public void shutdown() {
168         this.stop();
169     }
170
171     @Override
172     protected boolean anyNullOrEmpty(String... args) {
173         for (String arg : args) {
174             if (arg == null || arg.isEmpty()) {
175                 return true;
176             }
177         }
178
179         return false;
180     }
181
182     @Override
183     protected boolean allNullOrEmpty(String... args) {
184         for (String arg : args) {
185             if (!(arg == null || arg.isEmpty())) {
186                 return false;
187             }
188         }
189
190         return true;
191     }
192
193     @Override
194     public String toString() {
195         return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]";
196     }
197 }