b5595b2d9a420d61ca0ab4f8b7e853fbd140a75b
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.security.GeneralSecurityException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29 import java.util.concurrent.TimeUnit;
30
31 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
32 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
33 import org.openecomp.policy.drools.properties.PolicyProperties;
34 import org.slf4j.LoggerFactory;
35
36 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
37 import com.att.nsa.cambria.client.CambriaClientBuilders;
38 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
39 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
40 import com.att.nsa.mr.client.response.MRPublisherResponse;
41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
42 import com.fasterxml.jackson.annotation.JsonIgnore;
43
44 public interface BusPublisher {
45         
46         /**
47          * sends a message
48          * 
49          * @param partition id
50          * @param message the message
51          * @return true if success, false otherwise
52          * @throws IllegalArgumentException if no message provided
53          */
54         public boolean send(String partitionId, String message) throws IllegalArgumentException;
55         
56         /**
57          * closes the publisher
58          */
59         public void close();
60         
61         /**
62          * Cambria based library publisher
63          */
64         public static class CambriaPublisherWrapper implements BusPublisher {
65
66                 /**
67                  * The actual Cambria publisher
68                  */
69                 @JsonIgnore
70                 protected volatile CambriaBatchingPublisher publisher;
71                 
72                 public CambriaPublisherWrapper(List<String> servers, String topic,
73                                                                String apiKey,
74                                                                String apiSecret, boolean useHttps) throws IllegalArgumentException {
75                         PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
76                 
77
78                         if (useHttps){
79                                 
80                                 builder.usingHosts(servers)
81                                .onTopic(topic)
82                                .usingHttps();
83                         }
84                         else{
85                                 builder.usingHosts(servers)
86                                .onTopic(topic);
87                         }
88                 
89                         
90                         if (apiKey != null && !apiKey.isEmpty() &&
91                                 apiSecret != null && !apiSecret.isEmpty()) {
92                                 builder.authenticatedBy(apiKey, apiSecret);
93                         }
94                         
95                         try {
96                                 this.publisher = builder.build();
97                         } catch (MalformedURLException | GeneralSecurityException e) {
98                                 throw new IllegalArgumentException(e);
99                         }
100                 }
101                 
102                 /**
103                  * {@inheritDoc}
104                  */
105                 @Override
106                 public boolean send(String partitionId, String message) 
107                                 throws IllegalArgumentException {
108                         if (message == null)
109                                 throw new IllegalArgumentException("No message provided");
110                         
111                         try {
112                                 this.publisher.send(partitionId, message);
113                         } catch (Exception e) {
114                                 PolicyLogger.warn(CambriaPublisherWrapper.class.getName(), 
115                                           "SEND of " + message + " IN " +
116                                           this + " cannot be performed because of " + 
117                                                           e.getMessage());
118                                 return false;
119                         }
120                         return true;                    
121                 }
122                 
123                 /**
124                  * {@inheritDoc}
125                  */
126                 @Override
127                 public void close() {
128                         if (PolicyLogger.isInfoEnabled())
129                                 PolicyLogger.info(CambriaPublisherWrapper.class.getName(), 
130                                                   "CREATION: " + this);
131                         
132                         try {
133                                 this.publisher.close();
134                         } catch (Exception e) {
135                                 PolicyLogger.warn(CambriaPublisherWrapper.class.getName(), 
136                                                   "CLOSE on " + this + " FAILED because of " + 
137                                                                   e.getMessage());
138                         }
139                 }
140                 
141                 
142                 @Override
143                 public String toString() {
144                         StringBuilder builder = new StringBuilder();
145                         builder.append("CambriaPublisherWrapper [").
146                         append("publisher.getPendingMessageCount()=").
147                         append(publisher.getPendingMessageCount()).
148                         append("]");
149                         return builder.toString();
150                 }
151                 
152         }
153         
154         /**
155          * DmaapClient library wrapper
156          */
157         public abstract class DmaapPublisherWrapper implements BusPublisher {
158                 /**
159                  * MR based Publisher
160                  */             
161                 protected MRSimplerBatchPublisher publisher;
162                 protected Properties props;
163                 
164                 /**
165                  * MR Publisher Wrapper
166                  *
167                  * @param servers messaging bus hosts
168                  * @param topic topic
169                  * @param username AAF or DME2 Login
170                  * @param password AAF or DME2 Password
171                  */
172                 public DmaapPublisherWrapper(ProtocolTypeConstants protocol,
173                                                                          List<String> servers, String topic,
174                                                      String username,
175                                                      String password, boolean useHttps) throws IllegalArgumentException {
176
177                         
178                         if (topic == null || topic.isEmpty()) {
179                                 throw new IllegalArgumentException("No topic for DMaaP");
180                         }
181                         
182                         if (protocol == ProtocolTypeConstants.AAF_AUTH) {
183                                 if (servers == null || servers.isEmpty())
184                                         throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
185                                 
186                                 ArrayList<String> dmaapServers = new ArrayList<String>();
187                                 if(useHttps){
188                                         for (String server: servers) {
189                                                 dmaapServers.add(server + ":3905");
190                                         }
191                                 
192                                 }
193                                 else{
194                                         for (String server: servers) {
195                                                 dmaapServers.add(server + ":3904");
196                                         }                               
197                                 }
198                                 
199                                                                                 
200                                 this.publisher = 
201                                         new MRSimplerBatchPublisher.Builder().
202                                                                                                 againstUrls(dmaapServers).
203                                                                                                 onTopic(topic).
204                                                                                                 build();
205                                 
206                                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
207                         } else if (protocol == ProtocolTypeConstants.DME2) {
208                                 ArrayList<String> dmaapServers = new ArrayList<String>();
209                                 dmaapServers.add("0.0.0.0:3904");
210                                                 
211                                 this.publisher = 
212                                         new MRSimplerBatchPublisher.Builder().
213                                                                                                 againstUrls(dmaapServers).
214                                                                                                 onTopic(topic).
215                                                                                                 build();
216                                 
217                                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
218                                 
219                         }
220                         
221                         this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
222                         
223                         this.publisher.setUsername(username);
224                         this.publisher.setPassword(password);  
225                         
226                         props = new Properties();
227                                                 
228                         if(useHttps){
229                                 
230                                 props.setProperty("Protocol", "https");
231                         }
232                         else{
233                                 
234                                 props.setProperty("Protocol", "http");
235                                 
236                         }
237
238                         props.setProperty("contenttype", "application/json");
239                         props.setProperty("username", username);
240                         props.setProperty("password", password);
241                         
242                         props.setProperty("topic", topic);
243                         
244                         this.publisher.setProps(props);
245                         
246                         if (protocol == ProtocolTypeConstants.AAF_AUTH)
247                                 this.publisher.setHost(servers.get(0));
248                         
249                         if (PolicyLogger.isInfoEnabled()) {
250                                 PolicyLogger.info(DmaapPublisherWrapper.class.getName(), 
251                                                           "CREATION: " + this);
252                                 PolicyLogger.info(DmaapPublisherWrapper.class.getName(), 
253                                                 "BusPublisher.DmaapPublisherWrapper using Protocol: " + protocol.getValue());
254                         }
255                         
256                 }
257
258                 /**
259                  * {@inheritDoc}
260                  */
261                 @Override
262                 public void close() {
263                         if (PolicyLogger.isInfoEnabled())
264                                 PolicyLogger.info(DmaapPublisherWrapper.class.getName(), 
265                                                   "CREATION: " + this);
266                         
267                         try {
268                                 this.publisher.close(1, TimeUnit.SECONDS);
269                         } catch (Exception e) {
270                                 PolicyLogger.warn(DmaapPublisherWrapper.class.getName(), 
271                                                   "CLOSE: " + this + " because of " + 
272                                                                   e.getMessage());
273                         }
274                 }
275                 
276                 /**
277                  * {@inheritDoc}
278                  */
279                 @Override
280                 public boolean send(String partitionId, String message) 
281                                 throws IllegalArgumentException {
282                         if (message == null)
283                                 throw new IllegalArgumentException("No message provided");
284                         
285                         this.publisher.setPubResponse(new MRPublisherResponse());
286                         this.publisher.send(partitionId, message);
287                         MRPublisherResponse response = this.publisher.sendBatchWithResponse();
288                         if (PolicyLogger.isDebugEnabled() && response != null) {
289                                 PolicyLogger.debug(DmaapPublisherWrapper.class.getName(),
290                                                 "DMaaP publisher received " + response.getResponseCode() + ": "
291                                                 + response.getResponseMessage());
292                                 
293                         }
294
295                         return true;
296                         
297                 }
298                 
299                 @Override
300                 public String toString() {
301                         StringBuilder builder = new StringBuilder();
302                         builder.append("DmaapPublisherWrapper [").
303                         append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
304                         append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
305                         append(", publisher.getHost()=").append(publisher.getHost()).
306                         append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
307                         append(", publisher.getUsername()=").append(publisher.getUsername()).
308                         append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()).
309                         append("]");
310                         return builder.toString();
311                 }
312         }
313         
314         /**
315          * DmaapClient library wrapper
316          */
317         public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
318                 /**
319                  * MR based Publisher
320                  */             
321                 protected MRSimplerBatchPublisher publisher;
322                 
323                 public DmaapAafPublisherWrapper(List<String> servers, String topic,
324                                                      String aafLogin,
325                                                      String aafPassword, boolean useHttps) {
326                         
327                         super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
328                 }
329         }
330         
331         public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
332                 public DmaapDmePublisherWrapper(List<String> servers, String topic,
333                                                                                 String username, String password,
334                                                                                 String environment, String aftEnvironment, String dme2Partner,
335                                                                                 String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) {
336                         
337                         super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
338                         
339                         
340                         
341                         
342                         
343                         
344                         String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
345                         
346                          if (environment == null || environment.isEmpty()) {
347                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
348                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
349                         } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
350                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
351                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
352                         } if (latitude == null || latitude.isEmpty()) {
353                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
354                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
355                         } if (longitude == null || longitude.isEmpty()) {
356                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
357                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
358                         }
359                         
360                         if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
361                                 throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
362                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + 
363                                                 PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
364                         }
365                         
366                         String serviceName = servers.get(0);
367                         
368                         /* These are required, no defaults */
369                         props.setProperty("Environment", environment);
370                         props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
371                         
372                         props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
373
374                         if (dme2Partner != null)
375                                 props.setProperty("Partner", dme2Partner);
376                         if (dme2RouteOffer != null)
377                                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
378                         
379                         props.setProperty("Latitude", latitude);
380                         props.setProperty("Longitude", longitude);
381                         
382                         // ServiceName also a default, found in additionalProps
383                         
384                         /* These are optional, will default to these values if not set in optionalProps */
385                         props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
386                         props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
387                         props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
388                         props.setProperty("Version", "1.0");
389                         props.setProperty("SubContextPath", "/");
390                         props.setProperty("sessionstickinessrequired", "no");
391                         
392                         /* These should not change */
393                         props.setProperty("TransportType", "DME2");
394                         props.setProperty("MethodType", "POST");
395                         
396                         for (String key : additionalProps.keySet()) {
397                                 String value = additionalProps.get(key);
398                                 
399                                 if (value != null)
400                                         props.setProperty(key, value);
401                         }
402                         
403                         this.publisher.setProps(props);
404                 }
405         }
406 }