a78de7162ea6050e11e71ce7ca46eefa8b518f3c
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.util.List;
24 import java.util.UUID;
25
26 import org.apache.log4j.Logger;
27
28 import org.openecomp.policy.drools.event.comm.bus.BusTopicSink;
29 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
30 import org.openecomp.policy.common.logging.eelf.MessageCodes;
31
32 /**
33  * Transport Agnostic Bus Topic Sink to carry out the core functionality
34  * to interact with a sink regardless if it is UEB or DMaaP.
35  *
36  */
37 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
38         
39         /**
40          * logger 
41          */
42         private static org.openecomp.policy.common.logging.flexlogger.Logger logger = 
43                                                                                 FlexLogger.getLogger(InlineBusTopicSink.class);
44         
45         /**
46          * Not to be converted to PolicyLogger.
47          * This will contain all in/out traffic and only that in a single file in a concise format.
48          */
49         protected static final Logger networkLogger = Logger.getLogger(NETWORK_LOGGER);
50         
51         /**
52          * The partition key to publish to
53          */
54         protected String partitionId;
55         
56         /**
57          * Am I running?
58          * reflects invocation of start()/stop() 
59          * !locked & start() => alive
60          * stop() => !alive
61          */
62         protected volatile boolean alive = false;
63         
64         /**
65          * Am I locked?
66          * reflects invocation of lock()/unlock() operations
67          * locked => !alive (but not in the other direction necessarily)
68          * locked => !offer, !run, !start, !stop (but this last one is obvious
69          *                                        since locked => !alive)
70          */
71         protected volatile boolean locked = false;
72         
73         /**
74          * message bus publisher
75          */
76         protected BusPublisher publisher;
77
78         /**
79          * constructor for abstract sink
80          * 
81          * @param servers servers
82          * @param topic topic
83          * @param apiKey api secret
84          * @param apiSecret api secret
85          * @param partitionId partition id
86          * @param useHttps does connection use HTTPS?
87          * @param allowSelfSignedCerts are self-signed certificates allow
88          * @throws IllegalArgumentException in invalid parameters are passed in
89          */
90         public InlineBusTopicSink(List<String> servers, String topic, 
91                                           String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts)
92                         throws IllegalArgumentException {
93                 
94                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);               
95                 
96                 if (partitionId == null || partitionId.isEmpty()) {
97                         this.partitionId = UUID.randomUUID ().toString();
98                 }
99         }
100         
101         /**
102          * Initialize the Bus publisher
103          */
104         public abstract void init();
105         
106         /**
107          * {@inheritDoc}
108          */
109         @Override
110         public boolean start() throws IllegalStateException {
111                 
112                 if (logger.isInfoEnabled())
113                         logger.info("START: " + this);
114                 
115                 synchronized(this) {
116                         
117                         if (this.alive)
118                                 return true;
119                         
120                         if (locked)
121                                 throw new IllegalStateException(this + " is locked.");
122                         
123                         this.alive = true;
124                 }
125                                 
126                 this.init();
127                 return true;
128         }
129         
130         /**
131          * {@inheritDoc}
132          */
133         @Override
134         public boolean stop() {
135                 
136                 BusPublisher publisherCopy;
137                 synchronized(this) {
138                         this.alive = false;
139                         publisherCopy = this.publisher;
140                         this.publisher = null;
141                 }
142                 
143                 if (publisherCopy != null) {
144                         try {
145                                 publisherCopy.close();
146                         } catch (Exception e) {
147                                 logger.warn(MessageCodes.EXCEPTION_ERROR, e, "PUBLISHER.CLOSE", this.toString());
148                                 e.printStackTrace();
149                         }
150                 } else {
151                         logger.warn("No publisher to close: " + this);
152                         return false;
153                 }
154                 
155                 return true;
156         }
157         
158         /**
159          * {@inheritDoc}
160          */
161         @Override
162         public boolean lock() {
163                 
164                 if (logger.isInfoEnabled())
165                         logger.info("LOCK: " + this);   
166                 
167                 synchronized (this) {
168                         if (this.locked)
169                                 return true;
170                         
171                         this.locked = true;
172                 }
173                 
174                 return this.stop();
175         }
176
177         /**
178          * {@inheritDoc}
179          */
180         @Override
181         public boolean unlock() {
182                 
183                 if (logger.isInfoEnabled())
184                         logger.info("UNLOCK: " + this);
185                 
186                 synchronized(this) {
187                         if (!this.locked)
188                                 return true;
189                         
190                         this.locked = false;
191                 }
192                 
193                 try {
194                         return this.start();
195                 } catch (Exception e) {
196                         logger.warn("can't start after unlocking " + this + 
197                                              " because of " + e.getMessage());
198                         e.printStackTrace();
199                         return false;
200                 }
201         }
202
203         /**
204          * {@inheritDoc}
205          */
206         @Override
207         public boolean isLocked() {
208                 return this.locked;
209         }       
210         
211         /**
212          * {@inheritDoc}
213          */
214         @Override
215         public boolean isAlive() {
216                 return this.alive;
217         }       
218         
219         /**
220          * {@inheritDoc}
221          */
222         @Override
223         public boolean send(String message) throws IllegalArgumentException, IllegalStateException {
224                 
225                 if (message == null || message.isEmpty()) {
226                         throw new IllegalArgumentException("Message to send is empty");
227                 }
228
229                 if (!this.alive) {
230                         throw new IllegalStateException(this + " is stopped");
231                 }
232                 
233                 try {
234                         synchronized (this) {
235                                 this.recentEvents.add(message);
236                         }
237                         
238                         if (networkLogger.isInfoEnabled()) {
239                                 networkLogger.info("[OUT|" + this.getTopicCommInfrastructure() + "|" + 
240                                                this.topic + "]:" + 
241                                                message);
242                         }
243                         
244                         publisher.send(this.partitionId, message);
245                 } catch (Exception e) {
246                         logger.error("can't start after unlocking " + this + 
247                                          " because of " + e.getMessage());
248                         e.printStackTrace();
249                         return false;
250                 }
251                 
252                 return true;
253         }
254         
255
256         /**
257          * {@inheritDoc}
258          */
259         @Override
260         public void setPartitionKey(String partitionKey) {
261                 this.partitionId = partitionKey;
262         }
263
264         /**
265          * {@inheritDoc}
266          */
267         @Override
268         public String getPartitionKey() {
269                 return this.partitionId;
270         }
271
272         /**
273          * {@inheritDoc}
274          */
275         @Override
276         public void shutdown() throws IllegalStateException {
277                 this.stop();
278         }
279         
280         /**
281          * {@inheritDoc}
282          */
283         @Override
284         public abstract CommInfrastructure getTopicCommInfrastructure();
285
286 }