5493468a645400a111d889f6f6bd14976ae775d1
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.util.List;
25 import java.util.UUID;
26
27 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
33  * regardless if it is UEB or DMaaP.
34  *
35  */
36 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
37
38     /**
39      * loggers
40      */
41     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
42     private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
43
44     /**
45      * The partition key to publish to
46      */
47     protected String partitionId;
48
49     /**
50      * message bus publisher
51      */
52     protected BusPublisher publisher;
53
54     /**
55      * constructor for abstract sink
56      * @param busTopicParams contains below listed attributes
57      * servers servers
58      * topic topic
59      * apiKey api secret
60      * apiSecret api secret
61      * partitionId partition id
62      * useHttps does connection use HTTPS?
63      * allowSelfSignedCerts are self-signed certificates allow     *
64      * @throws IllegalArgumentException in invalid parameters are passed in
65      */
66     public InlineBusTopicSink(BusTopicParams busTopicParams) {
67
68         super(busTopicParams);
69
70         if (busTopicParams.isPartitionIdNullOrEmpty()) {
71             this.partitionId = UUID.randomUUID().toString();
72         } else {
73             this.partitionId = busTopicParams.getPartitionId();
74         }
75     }
76
77     /**
78      * Initialize the Bus publisher
79      */
80     public abstract void init();
81
82     /**
83      * {@inheritDoc}
84      */
85     @Override
86     public boolean start() {
87         logger.info("{}: starting", this);
88
89         synchronized (this) {
90
91             if (this.alive) {
92                 return true;
93             }
94
95             if (locked) {
96                 throw new IllegalStateException(this + " is locked.");
97             }
98
99             this.alive = true;
100         }
101
102         this.init();
103         return true;
104     }
105
106     /**
107      * {@inheritDoc}
108      */
109     @Override
110     public boolean stop() {
111
112         BusPublisher publisherCopy;
113         synchronized (this) {
114             this.alive = false;
115             publisherCopy = this.publisher;
116             this.publisher = null;
117         }
118
119         if (publisherCopy != null) {
120             try {
121                 publisherCopy.close();
122             } catch (Exception e) {
123                 logger.warn("{}: cannot stop publisher because of {}", this, e.getMessage(), e);
124             }
125         } else {
126             logger.warn("{}: there is no publisher", this);
127             return false;
128         }
129
130         return true;
131     }
132
133     /**
134      * {@inheritDoc}
135      */
136     @Override
137     public boolean send(String message) {
138
139         if (message == null || message.isEmpty()) {
140             throw new IllegalArgumentException("Message to send is empty");
141         }
142
143         if (!this.alive) {
144             throw new IllegalStateException(this + " is stopped");
145         }
146
147         try {
148             synchronized (this) {
149                 this.recentEvents.add(message);
150             }
151
152             netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(),
153                     message);
154
155             publisher.send(this.partitionId, message);
156             broadcast(message);
157         } catch (Exception e) {
158             logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
159             return false;
160         }
161
162         return true;
163     }
164
165
166     /**
167      * {@inheritDoc}
168      */
169     @Override
170     public void setPartitionKey(String partitionKey) {
171         this.partitionId = partitionKey;
172     }
173
174     /**
175      * {@inheritDoc}
176      */
177     @Override
178     public String getPartitionKey() {
179         return this.partitionId;
180     }
181
182     /**
183      * {@inheritDoc}
184      */
185     @Override
186     public void shutdown() {
187         this.stop();
188     }
189
190     @Override
191     protected boolean anyNullOrEmpty(String... args) {
192         for (String arg : args) {
193             if (arg == null || arg.isEmpty()) {
194                 return true;
195             }
196         }
197
198         return false;
199     }
200
201     @Override
202     protected boolean allNullOrEmpty(String... args) {
203         for (String arg : args) {
204             if (!(arg == null || arg.isEmpty())) {
205                 return false;
206             }
207         }
208
209         return true;
210     }
211
212
213     @Override
214     public String toString() {
215         return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]";
216     }
217 }