54e0bcb2d95e6888f8aef4d902cb0e6104b947da
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.security.GeneralSecurityException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29 import java.util.concurrent.TimeUnit;
30
31 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
32 import org.openecomp.policy.drools.properties.PolicyProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
37 import com.att.nsa.cambria.client.CambriaClientBuilders;
38 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
39 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
40 import com.att.nsa.mr.client.response.MRPublisherResponse;
41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
42 import com.fasterxml.jackson.annotation.JsonIgnore;
43
44 public interface BusPublisher {
45         
46         /**
47          * sends a message
48          * 
49          * @param partition id
50          * @param message the message
51          * @return true if success, false otherwise
52          * @throws IllegalArgumentException if no message provided
53          */
54         public boolean send(String partitionId, String message) throws IllegalArgumentException;
55         
56         /**
57          * closes the publisher
58          */
59         public void close();
60         
61         /**
62          * Cambria based library publisher
63          */
64         public static class CambriaPublisherWrapper implements BusPublisher {
65                 
66                 private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
67
68                 /**
69                  * The actual Cambria publisher
70                  */
71                 @JsonIgnore
72                 protected volatile CambriaBatchingPublisher publisher;
73                 
74                 public CambriaPublisherWrapper(List<String> servers, String topic,
75                                                                String apiKey,
76                                                                String apiSecret, boolean useHttps) throws IllegalArgumentException {
77                         PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
78                 
79
80                         if (useHttps){
81                                 
82                                 builder.usingHosts(servers)
83                                .onTopic(topic)
84                                .usingHttps();
85                         }
86                         else{
87                                 builder.usingHosts(servers)
88                                .onTopic(topic);
89                         }
90                 
91                         
92                         if (apiKey != null && !apiKey.isEmpty() &&
93                                 apiSecret != null && !apiSecret.isEmpty()) {
94                                 builder.authenticatedBy(apiKey, apiSecret);
95                         }
96                         
97                         try {
98                                 this.publisher = builder.build();
99                         } catch (MalformedURLException | GeneralSecurityException e) {
100                                 throw new IllegalArgumentException(e);
101                         }
102                 }
103                 
104                 /**
105                  * {@inheritDoc}
106                  */
107                 @Override
108                 public boolean send(String partitionId, String message) 
109                                 throws IllegalArgumentException {
110                         if (message == null)
111                                 throw new IllegalArgumentException("No message provided");
112                         
113                         try {
114                                 this.publisher.send(partitionId, message);
115                         } catch (Exception e) {
116                                 logger.warn("{}: SEND of {} cannot be performed because of {}",
117                                                     this, message, e.getMessage(), e);
118                                 return false;
119                         }
120                         return true;                    
121                 }
122                 
123                 /**
124                  * {@inheritDoc}
125                  */
126                 @Override
127                 public void close() {
128                         logger.info("{}: CLOSE", this);
129                         
130                         try {
131                                 this.publisher.close();
132                         } catch (Exception e) {
133                                 logger.warn("{}: CLOSE FAILED because of {}", 
134                                                     this, e.getMessage(),e);
135                         }
136                 }
137                 
138                 
139                 @Override
140                 public String toString() {
141                         StringBuilder builder = new StringBuilder();
142                         builder.append("CambriaPublisherWrapper [").
143                         append("publisher.getPendingMessageCount()=").
144                         append(publisher.getPendingMessageCount()).
145                         append("]");
146                         return builder.toString();
147                 }
148                 
149         }
150         
151         /**
152          * DmaapClient library wrapper
153          */
154         public abstract class DmaapPublisherWrapper implements BusPublisher {
155                 
156                 private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
157                 
158                 /**
159                  * MR based Publisher
160                  */             
161                 protected MRSimplerBatchPublisher publisher;
162                 protected Properties props;
163                 
164                 /**
165                  * MR Publisher Wrapper
166                  *
167                  * @param servers messaging bus hosts
168                  * @param topic topic
169                  * @param username AAF or DME2 Login
170                  * @param password AAF or DME2 Password
171                  */
172                 public DmaapPublisherWrapper(ProtocolTypeConstants protocol,
173                                                                          List<String> servers, String topic,
174                                                      String username,
175                                                      String password, boolean useHttps) throws IllegalArgumentException {
176
177                         
178                         if (topic == null || topic.isEmpty())
179                                 throw new IllegalArgumentException("No topic for DMaaP");
180
181                         
182                         if (protocol == ProtocolTypeConstants.AAF_AUTH) {
183                                 if (servers == null || servers.isEmpty())
184                                         throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
185                                 
186                                 ArrayList<String> dmaapServers = new ArrayList<String>();
187                                 if(useHttps){
188                                         for (String server: servers) {
189                                                 dmaapServers.add(server + ":3905");
190                                         }
191                                 
192                                 }
193                                 else{
194                                         for (String server: servers) {
195                                                 dmaapServers.add(server + ":3904");
196                                         }                               
197                                 }
198                                 
199                                                                                 
200                                 this.publisher = 
201                                         new MRSimplerBatchPublisher.Builder().
202                                                                                                 againstUrls(dmaapServers).
203                                                                                                 onTopic(topic).
204                                                                                                 build();
205                                 
206                                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
207                         } else if (protocol == ProtocolTypeConstants.DME2) {
208                                 ArrayList<String> dmaapServers = new ArrayList<String>();
209                                 dmaapServers.add("0.0.0.0:3904");
210                                                 
211                                 this.publisher = 
212                                         new MRSimplerBatchPublisher.Builder().
213                                                                                                 againstUrls(dmaapServers).
214                                                                                                 onTopic(topic).
215                                                                                                 build();
216                                 
217                                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
218                         }
219                         
220                         this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
221                         
222                         this.publisher.setUsername(username);
223                         this.publisher.setPassword(password);  
224                         
225                         props = new Properties();
226                                                 
227                         if (useHttps) {                         
228                                 props.setProperty("Protocol", "https");
229                         } else {                        
230                                 props.setProperty("Protocol", "http");
231                         }
232
233                         props.setProperty("contenttype", "application/json");
234                         props.setProperty("username", username);
235                         props.setProperty("password", password);
236                         
237                         props.setProperty("topic", topic);
238                         
239                         this.publisher.setProps(props);
240                         
241                         if (protocol == ProtocolTypeConstants.AAF_AUTH)
242                                 this.publisher.setHost(servers.get(0));
243                         
244                         logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
245                 }
246
247                 /**
248                  * {@inheritDoc}
249                  */
250                 @Override
251                 public void close() {
252                         logger.info("{}: CLOSE", this);
253                         
254                         try {
255                                 this.publisher.close(1, TimeUnit.SECONDS);
256                         } catch (Exception e) {
257                                 logger.warn("{}: CLOSE FAILED because of {}",
258                                                     this, e.getMessage(), e);
259                         }
260                 }
261                 
262                 /**
263                  * {@inheritDoc}
264                  */
265                 @Override
266                 public boolean send(String partitionId, String message) 
267                                 throws IllegalArgumentException {
268                         if (message == null)
269                                 throw new IllegalArgumentException("No message provided");
270                         
271                         this.publisher.setPubResponse(new MRPublisherResponse());
272                         this.publisher.send(partitionId, message);
273                         MRPublisherResponse response = this.publisher.sendBatchWithResponse();
274                         if (response != null) {
275                                 logger.debug("DMaaP publisher received {} : {}",
276                                                      response.getResponseCode(), 
277                                                      response.getResponseMessage());
278                         }
279
280                         return true;
281                 }
282                 
283                 @Override
284                 public String toString() {
285                         StringBuilder builder = new StringBuilder();
286                         builder.append("DmaapPublisherWrapper [").
287                         append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
288                         append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
289                         append(", publisher.getHost()=").append(publisher.getHost()).
290                         append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
291                         append(", publisher.getUsername()=").append(publisher.getUsername()).
292                         append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()).
293                         append("]");
294                         return builder.toString();
295                 }
296         }
297         
298         /**
299          * DmaapClient library wrapper
300          */
301         public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
302                 /**
303                  * MR based Publisher
304                  */             
305                 protected MRSimplerBatchPublisher publisher;
306                 
307                 public DmaapAafPublisherWrapper(List<String> servers, String topic,
308                                                      String aafLogin,
309                                                      String aafPassword, boolean useHttps) {
310                         
311                         super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
312                 }
313         }
314         
315         public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
316                 public DmaapDmePublisherWrapper(List<String> servers, String topic,
317                                                                                 String username, String password,
318                                                                                 String environment, String aftEnvironment, String dme2Partner,
319                                                                                 String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) {
320                         
321                         super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
322                         
323                         
324                         
325                         
326                         
327                         
328                         String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
329                         
330                          if (environment == null || environment.isEmpty()) {
331                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
332                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
333                         } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
334                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
335                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
336                         } if (latitude == null || latitude.isEmpty()) {
337                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
338                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
339                         } if (longitude == null || longitude.isEmpty()) {
340                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
341                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
342                         }
343                         
344                         if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
345                                 throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
346                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + 
347                                                 PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
348                         }
349                         
350                         String serviceName = servers.get(0);
351                         
352                         /* These are required, no defaults */
353                         props.setProperty("Environment", environment);
354                         props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
355                         
356                         props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
357
358                         if (dme2Partner != null)
359                                 props.setProperty("Partner", dme2Partner);
360                         if (dme2RouteOffer != null)
361                                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
362                         
363                         props.setProperty("Latitude", latitude);
364                         props.setProperty("Longitude", longitude);
365                         
366                         // ServiceName also a default, found in additionalProps
367                         
368                         /* These are optional, will default to these values if not set in optionalProps */
369                         props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
370                         props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
371                         props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
372                         props.setProperty("Version", "1.0");
373                         props.setProperty("SubContextPath", "/");
374                         props.setProperty("sessionstickinessrequired", "no");
375                         
376                         /* These should not change */
377                         props.setProperty("TransportType", "DME2");
378                         props.setProperty("MethodType", "POST");
379                         
380                         for (String key : additionalProps.keySet()) {
381                                 String value = additionalProps.get(key);
382                                 
383                                 if (value != null)
384                                         props.setProperty(key, value);
385                         }
386                         
387                         this.publisher.setProps(props);
388                 }
389         }
390 }