64037749c3f41f20a46ff7f6829b2cc28793886f
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.util.List;
24 import java.util.UUID;
25
26 import org.openecomp.policy.drools.event.comm.bus.BusTopicSink;
27 import org.slf4j.LoggerFactory;
28 import org.slf4j.Logger;
29
30 /**
31  * Transport Agnostic Bus Topic Sink to carry out the core functionality
32  * to interact with a sink regardless if it is UEB or DMaaP.
33  *
34  */
35 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
36         
37         /**
38          * loggers
39          */
40         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
41         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
42         
43         /**
44          * The partition key to publish to
45          */
46         protected String partitionId;
47         
48         /**
49          * Am I running?
50          * reflects invocation of start()/stop() 
51          * !locked & start() => alive
52          * stop() => !alive
53          */
54         protected volatile boolean alive = false;
55         
56         /**
57          * Am I locked?
58          * reflects invocation of lock()/unlock() operations
59          * locked => !alive (but not in the other direction necessarily)
60          * locked => !offer, !run, !start, !stop (but this last one is obvious
61          *                                        since locked => !alive)
62          */
63         protected volatile boolean locked = false;
64         
65         /**
66          * message bus publisher
67          */
68         protected BusPublisher publisher;
69
70         /**
71          * constructor for abstract sink
72          * 
73          * @param servers servers
74          * @param topic topic
75          * @param apiKey api secret
76          * @param apiSecret api secret
77          * @param partitionId partition id
78          * @param useHttps does connection use HTTPS?
79          * @param allowSelfSignedCerts are self-signed certificates allow
80          * @throws IllegalArgumentException in invalid parameters are passed in
81          */
82         public InlineBusTopicSink(List<String> servers, String topic, 
83                                           String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts)
84                         throws IllegalArgumentException {
85                 
86                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);               
87                 
88                 if (partitionId == null || partitionId.isEmpty()) {
89                         this.partitionId = UUID.randomUUID ().toString();
90                 }
91         }
92         
93         /**
94          * Initialize the Bus publisher
95          */
96         public abstract void init();
97         
98         /**
99          * {@inheritDoc}
100          */
101         @Override
102         public boolean start() throws IllegalStateException {
103                 
104                 logger.info("{}: starting", this);
105                 
106                 synchronized(this) {
107                         
108                         if (this.alive)
109                                 return true;
110                         
111                         if (locked)
112                                 throw new IllegalStateException(this + " is locked.");
113                         
114                         this.alive = true;
115                 }
116                                 
117                 this.init();
118                 return true;
119         }
120         
121         /**
122          * {@inheritDoc}
123          */
124         @Override
125         public boolean stop() {
126                 
127                 BusPublisher publisherCopy;
128                 synchronized(this) {
129                         this.alive = false;
130                         publisherCopy = this.publisher;
131                         this.publisher = null;
132                 }
133                 
134                 if (publisherCopy != null) {
135                         try {
136                                 publisherCopy.close();
137                         } catch (Exception e) {
138                                 logger.warn("{}: cannot stop publisher because of {}", 
139                                                     this, e.getMessage(), e);
140                         }
141                 } else {
142                         logger.warn("{}: there is no publisher", this);
143                         return false;
144                 }
145                 
146                 return true;
147         }
148         
149         /**
150          * {@inheritDoc}
151          */
152         @Override
153         public boolean lock() {
154                 
155                 logger.info("{}: locking", this);       
156                 
157                 synchronized (this) {
158                         if (this.locked)
159                                 return true;
160                         
161                         this.locked = true;
162                 }
163                 
164                 return this.stop();
165         }
166
167         /**
168          * {@inheritDoc}
169          */
170         @Override
171         public boolean unlock() {
172                 
173                 logger.info("{}: unlocking", this);
174                 
175                 synchronized(this) {
176                         if (!this.locked)
177                                 return true;
178                         
179                         this.locked = false;
180                 }
181                 
182                 try {
183                         return this.start();
184                 } catch (Exception e) {
185                         logger.warn("{}: cannot start after unlocking because of {}", 
186                                             this, e.getMessage(), e);
187                         return false;
188                 }
189         }
190
191         /**
192          * {@inheritDoc}
193          */
194         @Override
195         public boolean isLocked() {
196                 return this.locked;
197         }       
198         
199         /**
200          * {@inheritDoc}
201          */
202         @Override
203         public boolean isAlive() {
204                 return this.alive;
205         }       
206         
207         /**
208          * {@inheritDoc}
209          */
210         @Override
211         public boolean send(String message) throws IllegalArgumentException, IllegalStateException {
212                 
213                 if (message == null || message.isEmpty()) {
214                         throw new IllegalArgumentException("Message to send is empty");
215                 }
216
217                 if (!this.alive) {
218                         throw new IllegalStateException(this + " is stopped");
219                 }
220                 
221                 try {
222                         synchronized (this) {
223                                 this.recentEvents.add(message);
224                         }
225                         
226                         netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), 
227                                        this.topic, System.lineSeparator(), message);
228                         
229                         publisher.send(this.partitionId, message);
230                 } catch (Exception e) {
231                         logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
232                         return false;
233                 }
234                 
235                 return true;
236         }
237         
238
239         /**
240          * {@inheritDoc}
241          */
242         @Override
243         public void setPartitionKey(String partitionKey) {
244                 this.partitionId = partitionKey;
245         }
246
247         /**
248          * {@inheritDoc}
249          */
250         @Override
251         public String getPartitionKey() {
252                 return this.partitionId;
253         }
254
255         /**
256          * {@inheritDoc}
257          */
258         @Override
259         public void shutdown() throws IllegalStateException {
260                 this.stop();
261         }
262         
263         /**
264          * {@inheritDoc}
265          */
266         @Override
267         public abstract CommInfrastructure getTopicCommInfrastructure();
268
269 }