Changes to support Kafka over SSL in APEX 60/121160/1
authora.sreekumar <ajith.sreekumar@bell.ca>
Wed, 21 Apr 2021 14:01:30 +0000 (15:01 +0100)
committerAjith Sreekumar <ajith.sreekumar@bell.ca>
Fri, 7 May 2021 10:59:26 +0000 (10:59 +0000)
The Kafka plugin in APEX-PDP already has a field "kafkaProperties" that
can take in any properties required for the Kafka client. This is apt to
store the SSL related properties as they could change based on the
server implementation.

To make the communication possible, the only change required in APEX is
to allow a property to have an empty value.
For e.g., to disable the server host name verification, ssl.endpoint.identification.algorithm must to be explicitly set to an empty string on the client.

Change-Id: If3adaeaf11e5fbbd2aff582b5d4ba7824017ea8a
Issue-ID: POLICY-3194
Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
(cherry picked from commit bd21c78ed6953de4e8f9fd43311fd22bee7a5472)

plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java

index b37455d..eb1f15c 100644 (file)
@@ -2,6 +2,7 @@
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
  *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -281,9 +282,10 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
                         ENTRY + i + " invalid, key is null or blank");
             }
 
-            if (StringUtils.isBlank(kafkaProperties[i][1])) {
+            // the value of a property has to be specified as empty in some cases, but should never be null.
+            if (null == kafkaProperties[i][1]) {
                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
-                        ENTRY + i + " invalid, value is null or blank");
+                        ENTRY + i + " invalid, value is null");
             }
         }
     }
index 9287f52..14f13bc 100644 (file)
@@ -2,6 +2,7 @@
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Samsung. All rights reserved.
  *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -275,6 +276,18 @@ public class KafkaCarrierTechnologyParametersTest {
 
         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        // @formatter:off
+        String[][] kafkaPropertiesWithEmptyValue = {
+            {
+                "Property1", ""
+            }
+        };
+        // @formatter:on
+
+        kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaPropertiesWithEmptyValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());