bd88818b2e77c3617d5fd08c92d4ecff784c52d0
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.util.List;
24 import java.util.UUID;
25
26 import org.apache.log4j.Logger;
27
28 import org.openecomp.policy.drools.event.comm.bus.BusTopicSink;
29 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
30 import org.openecomp.policy.common.logging.eelf.MessageCodes;
31
32 /**
33  * Transport Agnostic Bus Topic Sink to carry out the core functionality
34  * to interact with a sink regardless if it is UEB or DMaaP.
35  *
36  */
37 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
38         
39         /**
40          * logger 
41          */
42         private static org.openecomp.policy.common.logging.flexlogger.Logger logger = 
43                                                                                 FlexLogger.getLogger(InlineBusTopicSink.class);
44         
45         /**
46          * Not to be converted to PolicyLogger.
47          * This will contain all in/out traffic and only that in a single file in a concise format.
48          */
49         protected static final Logger networkLogger = Logger.getLogger(NETWORK_LOGGER);
50         
51         /**
52          * The partition key to publish to
53          */
54         protected String partitionId;
55         
56         /**
57          * Am I running?
58          * reflects invocation of start()/stop() 
59          * !locked & start() => alive
60          * stop() => !alive
61          */
62         protected volatile boolean alive = false;
63         
64         /**
65          * Am I locked?
66          * reflects invocation of lock()/unlock() operations
67          * locked => !alive (but not in the other direction necessarily)
68          * locked => !offer, !run, !start, !stop (but this last one is obvious
69          *                                        since locked => !alive)
70          */
71         protected volatile boolean locked = false;
72         
73         /**
74          * message bus publisher
75          */
76         protected BusPublisher publisher;
77
78         /**
79          * constructor for abstract sink
80          * 
81          * @param servers servers
82          * @param topic topic
83          * @param apiKey api secret
84          * @param apiSecret api secret
85          * @param partitionId partition id
86          * @throws IllegalArgumentException in invalid parameters are passed in
87          */
88         public InlineBusTopicSink(List<String> servers, String topic, 
89                                           String apiKey, String apiSecret, String partitionId)
90                         throws IllegalArgumentException {
91                 
92                 super(servers, topic, apiKey, apiSecret);               
93                 
94                 if (partitionId == null || partitionId.isEmpty()) {
95                         this.partitionId = UUID.randomUUID ().toString();
96                 }
97         }
98         
99         /**
100          * Initialize the Bus publisher
101          */
102         public abstract void init();
103         
104         /**
105          * {@inheritDoc}
106          */
107         @Override
108         public boolean start() throws IllegalStateException {
109                 
110                 if (logger.isInfoEnabled())
111                         logger.info("START: " + this);
112                 
113                 synchronized(this) {
114                         
115                         if (this.alive)
116                                 return true;
117                         
118                         if (locked)
119                                 throw new IllegalStateException(this + " is locked.");
120                         
121                         this.alive = true;
122                 }
123                                 
124                 this.init();
125                 return true;
126         }
127         
128         /**
129          * {@inheritDoc}
130          */
131         @Override
132         public boolean stop() {
133                 
134                 BusPublisher publisherCopy;
135                 synchronized(this) {
136                         this.alive = false;
137                         publisherCopy = this.publisher;
138                         this.publisher = null;
139                 }
140                 
141                 if (publisherCopy != null) {
142                         try {
143                                 publisherCopy.close();
144                         } catch (Exception e) {
145                                 logger.warn(MessageCodes.EXCEPTION_ERROR, e, "PUBLISHER.CLOSE", this.toString());
146                                 e.printStackTrace();
147                         }
148                 } else {
149                         logger.warn("No publisher to close: " + this);
150                         return false;
151                 }
152                 
153                 return true;
154         }
155         
156         /**
157          * {@inheritDoc}
158          */
159         @Override
160         public boolean lock() {
161                 
162                 if (logger.isInfoEnabled())
163                         logger.info("LOCK: " + this);   
164                 
165                 synchronized (this) {
166                         if (this.locked)
167                                 return true;
168                         
169                         this.locked = true;
170                 }
171                 
172                 return this.stop();
173         }
174
175         /**
176          * {@inheritDoc}
177          */
178         @Override
179         public boolean unlock() {
180                 
181                 if (logger.isInfoEnabled())
182                         logger.info("UNLOCK: " + this);
183                 
184                 synchronized(this) {
185                         if (!this.locked)
186                                 return true;
187                         
188                         this.locked = false;
189                 }
190                 
191                 try {
192                         return this.start();
193                 } catch (Exception e) {
194                         logger.warn("can't start after unlocking " + this + 
195                                              " because of " + e.getMessage());
196                         e.printStackTrace();
197                         return false;
198                 }
199         }
200
201         /**
202          * {@inheritDoc}
203          */
204         @Override
205         public boolean isLocked() {
206                 return this.locked;
207         }       
208         
209         /**
210          * {@inheritDoc}
211          */
212         @Override
213         public boolean isAlive() {
214                 return this.alive;
215         }       
216         
217         /**
218          * {@inheritDoc}
219          */
220         @Override
221         public boolean send(String message) throws IllegalArgumentException, IllegalStateException {
222                 
223                 if (message == null || message.isEmpty()) {
224                         throw new IllegalArgumentException("Message to send is empty");
225                 }
226
227                 if (!this.alive) {
228                         throw new IllegalStateException(this + " is stopped");
229                 }
230                 
231                 try {
232                         synchronized (this) {
233                                 this.recentEvents.add(message);
234                         }
235                         
236                         if (networkLogger.isInfoEnabled()) {
237                                 networkLogger.info("[OUT|" + this.getTopicCommInfrastructure() + "|" + 
238                                                this.topic + "]:" + 
239                                                message);
240                         }
241                         
242                         publisher.send(this.partitionId, message);
243                 } catch (Exception e) {
244                         logger.error("can't start after unlocking " + this + 
245                                          " because of " + e.getMessage());
246                         e.printStackTrace();
247                         return false;
248                 }
249                 
250                 return true;
251         }
252         
253
254         /**
255          * {@inheritDoc}
256          */
257         @Override
258         public void setPartitionKey(String partitionKey) {
259                 this.partitionId = partitionKey;
260         }
261
262         /**
263          * {@inheritDoc}
264          */
265         @Override
266         public String getPartitionKey() {
267                 return this.partitionId;
268         }
269
270         /**
271          * {@inheritDoc}
272          */
273         @Override
274         public void shutdown() throws IllegalStateException {
275                 this.stop();
276         }
277         
278         /**
279          * {@inheritDoc}
280          */
281         @Override
282         public abstract CommInfrastructure getTopicCommInfrastructure();
283
284 }