[POLICY-52] pdp-d: PolicyEngine junits
[policy/drools-pdp.git] / policy-endpoints / src / main / java / org / onap / policy / drools / event / comm / bus / internal / InlineBusTopicSink.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.event.comm.bus.internal;
22
23 import java.util.List;
24 import java.util.UUID;
25
26 import org.onap.policy.drools.event.comm.bus.BusTopicSink;
27 import org.slf4j.LoggerFactory;
28 import org.slf4j.Logger;
29
30 /**
31  * Transport Agnostic Bus Topic Sink to carry out the core functionality
32  * to interact with a sink regardless if it is UEB or DMaaP.
33  *
34  */
35 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
36         
37         /**
38          * loggers
39          */
40         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
41         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
42         
43         /**
44          * The partition key to publish to
45          */
46         protected String partitionId;
47         
48         /**
49          * message bus publisher
50          */
51         protected BusPublisher publisher;
52
53         /**
54          * constructor for abstract sink
55          * 
56          * @param servers servers
57          * @param topic topic
58          * @param apiKey api secret
59          * @param apiSecret api secret
60          * @param partitionId partition id
61          * @param useHttps does connection use HTTPS?
62          * @param allowSelfSignedCerts are self-signed certificates allow
63          * @throws IllegalArgumentException in invalid parameters are passed in
64          */
65         public InlineBusTopicSink(List<String> servers, String topic, 
66                                           String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts)
67                         throws IllegalArgumentException {
68                 
69                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);               
70                 
71                 if (partitionId == null || partitionId.isEmpty()) {
72                         this.partitionId = UUID.randomUUID ().toString();
73                 }
74         }
75         
76         /**
77          * Initialize the Bus publisher
78          */
79         public abstract void init();
80         
81         /**
82          * {@inheritDoc}
83          */
84         @Override
85         public boolean start() throws IllegalStateException {           
86                 logger.info("{}: starting", this);
87                 
88                 synchronized(this) {
89                         
90                         if (this.alive)
91                                 return true;
92                         
93                         if (locked)
94                                 throw new IllegalStateException(this + " is locked.");
95                         
96                         this.alive = true;
97                 }
98                                 
99                 this.init();
100                 return true;
101         }
102         
103         /**
104          * {@inheritDoc}
105          */
106         @Override
107         public boolean stop() {
108                 
109                 BusPublisher publisherCopy;
110                 synchronized(this) {
111                         this.alive = false;
112                         publisherCopy = this.publisher;
113                         this.publisher = null;
114                 }
115                 
116                 if (publisherCopy != null) {
117                         try {
118                                 publisherCopy.close();
119                         } catch (Exception e) {
120                                 logger.warn("{}: cannot stop publisher because of {}", 
121                                                     this, e.getMessage(), e);
122                         }
123                 } else {
124                         logger.warn("{}: there is no publisher", this);
125                         return false;
126                 }
127                 
128                 return true;
129         }       
130         
131         /**
132          * {@inheritDoc}
133          */
134         @Override
135         public boolean send(String message) throws IllegalArgumentException, IllegalStateException {
136                 
137                 if (message == null || message.isEmpty()) {
138                         throw new IllegalArgumentException("Message to send is empty");
139                 }
140
141                 if (!this.alive) {
142                         throw new IllegalStateException(this + " is stopped");
143                 }
144                 
145                 try {
146                         synchronized (this) {
147                                 this.recentEvents.add(message);
148                         }
149                         
150                         netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), 
151                                        this.topic, System.lineSeparator(), message);
152                         
153                         publisher.send(this.partitionId, message);                      
154                         broadcast(message);
155                 } catch (Exception e) {
156                         logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
157                         return false;
158                 }
159                 
160                 return true;
161         }
162         
163
164         /**
165          * {@inheritDoc}
166          */
167         @Override
168         public void setPartitionKey(String partitionKey) {
169                 this.partitionId = partitionKey;
170         }
171
172         /**
173          * {@inheritDoc}
174          */
175         @Override
176         public String getPartitionKey() {
177                 return this.partitionId;
178         }
179
180         /**
181          * {@inheritDoc}
182          */
183         @Override
184         public void shutdown() throws IllegalStateException {
185                 this.stop();
186         }
187         
188
189         @Override
190         public String toString() {
191                 StringBuilder builder = new StringBuilder();
192                 builder.append("InlineBusTopicSink [partitionId=").append(partitionId).append(", alive=").append(alive)
193                                 .append(", publisher=").append(publisher).append("]");
194                 return builder.toString();
195         }
196 }