2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.dmaap.dbcapi.client;
23 import java.io.BufferedReader;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.InputStreamReader;
27 import java.io.OutputStream;
28 import java.net.ProtocolException;
30 import java.net.HttpURLConnection;
32 import javax.net.ssl.HttpsURLConnection;
33 import javax.net.ssl.SSLException;
35 import org.apache.commons.codec.binary.Base64;
36 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
37 import org.onap.dmaap.dbcapi.model.ApiError;
38 import org.onap.dmaap.dbcapi.model.MR_Cluster;
39 import org.onap.dmaap.dbcapi.util.DmaapConfig;
41 public class MrTopicConnection extends BaseLoggingClass {
42 private String topicURL;
44 private HttpURLConnection uc;
47 private String mmProvCred;
48 private String unit_test;
49 private boolean useAAF;
52 public MrTopicConnection(String user, String pwd ) {
53 mmProvCred = new String( user + ":" + pwd );
54 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
55 unit_test = p.getProperty( "UnitTest", "No" );
56 useAAF= "true".equalsIgnoreCase(p.getProperty("UseAAF", "false"));
59 public boolean makeTopicConnection( MR_Cluster cluster, String topic, String overrideFqdn ) {
60 String fqdn = overrideFqdn != null ? overrideFqdn : cluster.getFqdn();
61 logger.info( "connect to cluster: " + fqdn + " for topic: " + topic );
64 topicURL = cluster.getTopicProtocol() + "://" + fqdn + ":" + cluster.getTopicPort() + "/events/" + topic ;
66 if ( cluster.getTopicProtocol().equals( "https")) {
67 return makeSecureConnection( topicURL );
69 return makeConnection( topicURL );
72 private boolean makeSecureConnection( String pURL ) {
73 logger.info( "makeConnection to " + pURL );
76 URL u = new URL( pURL );
77 uc = (HttpsURLConnection) u.openConnection();
78 uc.setInstanceFollowRedirects(false);
79 logger.info( "open connection to " + pURL );
81 } catch (Exception e) {
82 logger.error("Unexpected error during openConnection of " + pURL );
83 logger.error("Error", e);;
88 private boolean makeConnection( String pURL ) {
89 logger.info( "makeConnection to " + pURL );
92 URL u = new URL( pURL );
93 uc = (HttpURLConnection) u.openConnection();
94 uc.setInstanceFollowRedirects(false);
95 logger.info( "open connection to " + pURL );
97 } catch (Exception e) {
98 logger.error("Unexpected error during openConnection of " + pURL );
99 logger.error("error", e);
105 static String bodyToString( InputStream is ) {
106 StringBuilder sb = new StringBuilder();
107 BufferedReader br = new BufferedReader( new InputStreamReader(is));
110 while ((line = br.readLine()) != null ) {
113 } catch (IOException ex ) {
114 errorLogger.error( "IOexception:" + ex);
117 return sb.toString();
120 public ApiError doPostMessage( String postMessage ) {
121 ApiError response = new ApiError();
122 String auth = "Basic " + Base64.encodeBase64String(mmProvCred.getBytes());
127 byte[] postData = postMessage.getBytes();
128 logger.info( "post fields=" + postMessage );
130 uc.setRequestProperty("Authorization", auth);
131 logger.info( "Authenticating with " + auth );
133 uc.setRequestMethod("POST");
134 uc.setRequestProperty("Content-Type", "application/json");
135 uc.setRequestProperty( "charset", "utf-8");
136 uc.setRequestProperty( "Content-Length", Integer.toString( postData.length ));
137 uc.setUseCaches(false);
138 uc.setDoOutput(true);
139 OutputStream os = null;
144 os = uc.getOutputStream();
145 os.write( postData );
147 } catch (ProtocolException pe) {
148 // Rcvd error instead of 100-Continue
149 callSetDoOutputOnError();
151 } catch ( SSLException se ) {
152 logger.error("Error", se);
153 response.setCode(500);
154 response.setMessage( se.getMessage());
158 response.setCode( uc.getResponseCode());
159 logger.info( "http response code:" + response.getCode());
160 response.setMessage( uc.getResponseMessage() );
161 logger.info( "response message=" + response.getMessage() );
164 if ( response.getMessage() == null) {
165 // work around for glitch in Java 1.7.0.21 and likely others
166 // When Expect: 100 is set and a non-100 response is received, the response message is not set but the response code is
167 String h0 = uc.getHeaderField(0);
169 int i = h0.indexOf(' ');
170 int j = h0.indexOf(' ', i + 1);
171 if (i != -1 && j != -1) {
172 response.setMessage( h0.substring(j + 1) );
176 if ( response.is2xx() ) {
177 response.setFields( bodyToString( uc.getInputStream() ) );
178 logger.info( "responseBody=" + response.getFields() );
183 } catch (Exception e) {
184 if ( unit_test.equals( "Yes" ) ) {
185 response.setCode(200);
186 response.setMessage( "simulated response");
187 logger.info( "artificial 200 response from doPostMessage because unit_test =" + unit_test );
190 response.setCode(500);
191 response.setMessage( "Unable to read response");
192 logger.warn( response.getMessage() );
193 logger.error("Error", e);
199 } catch ( Exception e ) {
200 logger.error("Error", e);
207 public void callSetDoOutputOnError() {
209 // work around glitch in Java 1.7.0.21 and likely others
210 // without this, Java will connect multiple times to the server to run the same request
211 uc.setDoOutput(false);
212 } catch (Exception e) {
213 logger.error("Error", e);