798bf98938835782046258a94af7934c2c5b487a
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.security.GeneralSecurityException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Properties;
28 import java.util.concurrent.TimeUnit;
29
30 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
31 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
32 import com.att.nsa.cambria.client.CambriaClientBuilders;
33 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
34 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
35 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
36 import com.fasterxml.jackson.annotation.JsonIgnore;
37
38 public interface BusPublisher {
39         
40         /**
41          * sends a message
42          * 
43          * @param partition id
44          * @param message the message
45          * @return true if success, false otherwise
46          * @throws IllegalArgumentException if no message provided
47          */
48         public boolean send(String partitionId, String message) throws IllegalArgumentException;
49         
50         /**
51          * closes the publisher
52          */
53         public void close();
54         
55         /**
56          * Cambria based library publisher
57          */
58         public static class CambriaPublisherWrapper implements BusPublisher {
59
60                 /**
61                  * The actual Cambria publisher
62                  */
63                 @JsonIgnore
64                 protected volatile CambriaBatchingPublisher publisher;
65                 
66                 public CambriaPublisherWrapper(List<String> servers, String topic,
67                                                                String apiKey,
68                                                                String apiSecret) 
69                        throws IllegalArgumentException {
70                         PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
71                         
72                         builder.usingHosts(servers)
73                                .onTopic(topic);
74                         
75                                    // Only supported in 0.2.4 version
76                                // .logSendFailuresAfter(DEFAULT_LOG_SEND_FAILURES_AFTER);
77                         
78                         if (apiKey != null && !apiKey.isEmpty() &&
79                                 apiSecret != null && !apiSecret.isEmpty()) {
80                                 builder.authenticatedBy(apiKey, apiSecret);
81                         }
82                         
83                         try {
84                                 this.publisher = builder.build();
85                         } catch (MalformedURLException | GeneralSecurityException e) {
86                                 throw new IllegalArgumentException(e);
87                         }
88                 }
89                 
90                 /**
91                  * {@inheritDoc}
92                  */
93                 @Override
94                 public boolean send(String partitionId, String message) 
95                                 throws IllegalArgumentException {
96                         if (message == null)
97                                 throw new IllegalArgumentException("No message provided");
98                         
99                         try {
100                                 this.publisher.send(partitionId, message);
101                         } catch (Exception e) {
102                                 PolicyLogger.warn(CambriaPublisherWrapper.class.getName(), 
103                                           "SEND of " + message + " IN " +
104                                           this + " cannot be performed because of " + 
105                                                           e.getMessage());
106                                 return false;
107                         }
108                         return true;                    
109                 }
110                 
111                 /**
112                  * {@inheritDoc}
113                  */
114                 @Override
115                 public void close() {
116                         if (PolicyLogger.isInfoEnabled())
117                                 PolicyLogger.info(CambriaPublisherWrapper.class.getName(), 
118                                                   "CREATION: " + this);
119                         
120                         try {
121                                 this.publisher.close();
122                         } catch (Exception e) {
123                                 PolicyLogger.warn(CambriaPublisherWrapper.class.getName(), 
124                                                   "CLOSE on " + this + " FAILED because of " + 
125                                                                   e.getMessage());
126                         }
127                 }
128                 
129                 
130                 @Override
131                 public String toString() {
132                         StringBuilder builder = new StringBuilder();
133                         builder.append("CambriaPublisherWrapper [").
134                         append("publisher.getPendingMessageCount()=").
135                         append(publisher.getPendingMessageCount()).
136                         append("]");
137                         return builder.toString();
138                 }
139                 
140         }
141         
142         /**
143          * DmaapClient library wrapper
144          */
145         public static class DmaapPublisherWrapper implements BusPublisher {
146                 /**
147                  * MR based Publisher
148                  */             
149                 protected MRSimplerBatchPublisher publisher;
150                 
151                 public DmaapPublisherWrapper(List<String> servers, String topic,
152                                                      String aafLogin,
153                                                      String aafPassword) {
154                         
155                         ArrayList<String> dmaapServers = new ArrayList<String>();
156                         for (String server: servers) {
157                                 dmaapServers.add(server + ":3904");
158                         }
159                                         
160                         this.publisher = 
161                                 new MRSimplerBatchPublisher.Builder().
162                                                         againstUrls(dmaapServers).
163                                                         onTopic(topic).
164                                                         build();
165                         
166                         this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
167                         
168                         this.publisher.setUsername(aafLogin);
169                         this.publisher.setPassword(aafPassword);  
170                         
171                         Properties props = new Properties();
172                         props.setProperty("Protocol", "http");
173                         props.setProperty("contenttype", "application/json");
174                         
175                         this.publisher.setProps(props);
176                         
177                         this.publisher.setHost(servers.get(0));
178                         
179                         if (PolicyLogger.isInfoEnabled())
180                                 PolicyLogger.info(DmaapPublisherWrapper.class.getName(), 
181                                                           "CREATION: " + this);
182                 }
183
184                 /**
185                  * {@inheritDoc}
186                  */
187                 @Override
188                 public void close() {
189                         if (PolicyLogger.isInfoEnabled())
190                                 PolicyLogger.info(DmaapPublisherWrapper.class.getName(), 
191                                                   "CREATION: " + this);
192                         
193                         try {
194                                 this.publisher.close(1, TimeUnit.SECONDS);
195                         } catch (Exception e) {
196                                 PolicyLogger.warn(DmaapPublisherWrapper.class.getName(), 
197                                                   "CLOSE: " + this + " because of " + 
198                                                                   e.getMessage());
199                         }
200                 }
201                 
202                 /**
203                  * {@inheritDoc}
204                  */
205                 @Override
206                 public boolean send(String partitionId, String message) 
207                                 throws IllegalArgumentException {
208                         if (message == null)
209                                 throw new IllegalArgumentException("No message provided");
210                         
211                         this.publisher.send(partitionId, message);
212                         return true;
213                         
214                 }
215                 
216                 @Override
217                 public String toString() {
218                         StringBuilder builder = new StringBuilder();
219                         builder.append("DmaapPublisherWrapper [").
220                         append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
221                         append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
222                         append(", publisher.getHost()=").append(publisher.getHost()).
223                         append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
224                         append(", publisher.getUsername()=").append(publisher.getUsername()).
225                         append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()).
226                         append("]");
227                         return builder.toString();
228                 }
229         }
230
231 }