Merge "Close old UEB/DMaaP consumer"
[policy/drools-pdp.git] / policy-endpoints / src / main / java / org / onap / policy / drools / event / comm / bus / internal / InlineBusTopicSink.java
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.event.comm.bus.internal;
22
23 import java.util.List;
24 import java.util.UUID;
25
26 import org.onap.policy.drools.event.comm.bus.BusTopicSink;
27 import org.slf4j.LoggerFactory;
28 import org.slf4j.Logger;
29
30 /**
31  * Transport Agnostic Bus Topic Sink to carry out the core functionality
32  * to interact with a sink regardless if it is UEB or DMaaP.
33  *
34  */
35 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
36         
37         /**
38          * loggers
39          */
40         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
41         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
42         
43         /**
44          * The partition key to publish to
45          */
46         protected String partitionId;
47         
48         /**
49          * message bus publisher
50          */
51         protected BusPublisher publisher;
52
53         /**
54          * constructor for abstract sink
55          * 
56          * @param servers servers
57          * @param topic topic
58          * @param apiKey api secret
59          * @param apiSecret api secret
60          * @param partitionId partition id
61          * @param useHttps does connection use HTTPS?
62          * @param allowSelfSignedCerts are self-signed certificates allow
63          * @throws IllegalArgumentException in invalid parameters are passed in
64          */
65         public InlineBusTopicSink(List<String> servers, String topic, 
66                                           String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts) {
67                 
68                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);               
69                 
70                 if (partitionId == null || partitionId.isEmpty()) {
71                         this.partitionId = UUID.randomUUID ().toString();
72                 }
73         }
74         
75         /**
76          * Initialize the Bus publisher
77          */
78         public abstract void init();
79         
80         /**
81          * {@inheritDoc}
82          */
83         @Override
84         public boolean start() {                
85                 logger.info("{}: starting", this);
86                 
87                 synchronized(this) {
88                         
89                         if (this.alive)
90                                 return true;
91                         
92                         if (locked)
93                                 throw new IllegalStateException(this + " is locked.");
94                         
95                         this.alive = true;
96                 }
97                                 
98                 this.init();
99                 return true;
100         }
101         
102         /**
103          * {@inheritDoc}
104          */
105         @Override
106         public boolean stop() {
107                 
108                 BusPublisher publisherCopy;
109                 synchronized(this) {
110                         this.alive = false;
111                         publisherCopy = this.publisher;
112                         this.publisher = null;
113                 }
114                 
115                 if (publisherCopy != null) {
116                         try {
117                                 publisherCopy.close();
118                         } catch (Exception e) {
119                                 logger.warn("{}: cannot stop publisher because of {}", 
120                                                     this, e.getMessage(), e);
121                         }
122                 } else {
123                         logger.warn("{}: there is no publisher", this);
124                         return false;
125                 }
126                 
127                 return true;
128         }       
129         
130         /**
131          * {@inheritDoc}
132          */
133         @Override
134         public boolean send(String message) {
135                 
136                 if (message == null || message.isEmpty()) {
137                         throw new IllegalArgumentException("Message to send is empty");
138                 }
139
140                 if (!this.alive) {
141                         throw new IllegalStateException(this + " is stopped");
142                 }
143                 
144                 try {
145                         synchronized (this) {
146                                 this.recentEvents.add(message);
147                         }
148                         
149                         netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), 
150                                        this.topic, System.lineSeparator(), message);
151                         
152                         publisher.send(this.partitionId, message);                      
153                         broadcast(message);
154                 } catch (Exception e) {
155                         logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
156                         return false;
157                 }
158                 
159                 return true;
160         }
161         
162
163         /**
164          * {@inheritDoc}
165          */
166         @Override
167         public void setPartitionKey(String partitionKey) {
168                 this.partitionId = partitionKey;
169         }
170
171         /**
172          * {@inheritDoc}
173          */
174         @Override
175         public String getPartitionKey() {
176                 return this.partitionId;
177         }
178
179         /**
180          * {@inheritDoc}
181          */
182         @Override
183         public void shutdown() {
184                 this.stop();
185         }
186
187     protected boolean anyNullOrEmpty(String... args) {
188         for (String arg : args) {
189             if (arg == null || arg.isEmpty()) {
190                 return true;
191             }
192         }
193
194         return false;
195     }
196
197     protected boolean allNullOrEmpty(String... args) {
198         for (String arg : args) {
199             if (!(arg == null || arg.isEmpty())) {
200                 return false;
201             }
202         }
203
204         return true;
205     }
206         
207
208         @Override
209         public String toString() {
210         return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]";
211         }
212 }