\r
package org.onap.dcaegen2.services.pmmapper.messagerouter;\r
\r
+import java.nio.charset.StandardCharsets;\r
+import java.util.Base64;\r
import java.util.List;\r
\r
import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;\r
private void publish(String ves) {\r
try {\r
String topicUrl = config.getPublisherTopicUrl();\r
- sender.send("POST", topicUrl, ves);\r
+ ves = ves.replaceAll("\n", "");\r
+ String userCredentials = topicUrl.startsWith("https") ? Base64.getEncoder()\r
+ .encodeToString((this.config.getPublisherUserName() + ":" +\r
+ this.config.getPublisherPassword())\r
+ .getBytes(StandardCharsets.UTF_8))\r
+ : "";\r
+ sender.send("POST", topicUrl, ves, userCredentials);\r
} catch (Exception e) {\r
throw new MRPublisherException(e.getMessage(), e);\r
}\r
@GSONRequired
@SerializedName("dmaap_info")
DmaapInfo dmaapInfo;
+
+ @SerializedName("aaf_username")
+ private String aafUsername;
+
+ @SerializedName("aaf_password")
+ private String aafPassword;
}
@Getter
@SerializedName("subscriber_id")
private String subscriberId;
- @SerializedName("aaf_username")
- private String aafUsername;
-
- @SerializedName("aaf_password")
- private String aafPassword;
-
@SerializedName("client_role")
private String clientRole;
private String topicUrl;
}
+ public String getPublisherUserName() {
+ return this.getStreamsPublishes().getDmaapPublisher().getAafUsername();
+ }
+
+ public String getPublisherPassword() {
+ return this.getStreamsPublishes().getDmaapPublisher().getAafPassword();
+ }
+
@Override
public void reconfigure(MapperConfig mapperConfig) {
if(!this.equals(mapperConfig)) {
import java.net.HttpURLConnection;\r
import java.net.URL;\r
import java.nio.charset.StandardCharsets;\r
+import java.util.Base64;\r
+import java.util.Optional;\r
import java.util.UUID;\r
import java.util.stream.Collectors;\r
\r
\r
import javax.net.ssl.HttpsURLConnection;\r
import javax.net.ssl.SSLContext;\r
+import org.jboss.logging.MDC;\r
\r
public class RequestSender {\r
private static final int MAX_RETRIES = 5;\r
return send(method,urlString,"");\r
}\r
\r
+ /**\r
+ * Works just like {@link RequestSender#send(method,urlString,body, basicAuth)}, except {@code basicAuth }\r
+ * is set to empty String by default.\r
+ * @see RequestSender#send(String,String,String,String)\r
+ */\r
+ public String send(String method, final String urlString, final String body) throws Exception {\r
+ return send(method,urlString,body,"");\r
+ }\r
+\r
/**\r
* Sends an http request to a given endpoint.\r
* @param method of the outbound request\r
* @param urlString representing given endpoint\r
* @param body of the request as json\r
+ * @param encodedCredentials base64-encoded username password credentials\r
* @return http response body\r
* @throws Exception\r
*/\r
- public String send(String method, final String urlString, final String body) throws Exception {\r
+ public String send(String method, final String urlString, final String body, final String encodedCredentials) throws Exception {\r
final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);\r
- final UUID requestID = UUID.randomUUID();\r
+ String requestID = Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.REQUEST_ID))\r
+ .orElse( UUID.randomUUID().toString());\r
String result = "";\r
\r
for (int i = 1; i <= MAX_RETRIES; i++) {\r
HttpsURLConnection.setDefaultSSLSocketFactory(SSLContext.getDefault().getSocketFactory());\r
}\r
\r
+ if(!encodedCredentials.isEmpty()) {\r
+ connection.setRequestProperty("Authorization", "Basic " + encodedCredentials);\r
+ }\r
+\r
if(!body.isEmpty()) {\r
setMessageBody(connection, body);\r
}\r
return result;\r
}\r
\r
- private HttpURLConnection getHttpURLConnection(String method, URL url, UUID invocationID, UUID requestID) throws IOException {\r
+ private HttpURLConnection getHttpURLConnection(String method, URL url, UUID invocationID, String requestID) throws IOException {\r
HttpURLConnection connection = (HttpURLConnection) url.openConnection();\r
connection.setReadTimeout(DEFAULT_READ_TIMEOUT);\r
- connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());\r
+ connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID);\r
connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());\r
connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);\r
connection.setRequestMethod(method);\r
\r
Flux<Event> flux = sut.publish(events);\r
\r
- verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString());\r
+ verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString(), Mockito.anyString());\r
StepVerifier.create(flux)\r
.expectNextMatches(event::equals)\r
.expectComplete()\r
Event event = mock(Event.class);\r
List<Event> events = Arrays.asList(event,event,event);\r
when(event.getVes()).thenReturn(ves);\r
- when(sender.send("POST",topicURL,ves)).thenThrow(Exception.class);\r
+ when(sender.send("POST",topicURL,ves,"base64encoded")).thenThrow(Exception.class);\r
\r
Flux<Event> flux = sut.publish(events);\r
\r
StepVerifier.create(flux)\r
+ .expectNext(events.get(0))\r
.verifyComplete();\r
}\r
}\r