- public static String getRemoteAddress(DMaaPContext ctx)
- {
- String reqAddr = ctx.getRequest().getRemoteAddr();
- String fwdHeader = getFirstHeader("X-Forwarded-For",ctx);
- return ((fwdHeader != null) ? fwdHeader : reqAddr);
- }
- public static String getFirstHeader(String h,DMaaPContext ctx)
- {
- List l = getHeader(h,ctx);
- return ((l.size() > 0) ? (String)l.iterator().next() : null);
- }
- public static List<String> getHeader(String h,DMaaPContext ctx)
- {
- LinkedList list = new LinkedList();
- Enumeration e = ctx.getRequest().getHeaders(h);
- while (e.hasMoreElements())
- {
- list.add(e.nextElement().toString());
- }
- return list;
- }
-
- public static String getKafkaproperty(){
- InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties");
- Properties props = new Properties();
- try {
- props.load(input);
- } catch (IOException e) {
- log.error("failed to read kafka.properties");
- }
- return props.getProperty("key");
-
-
- }
-
- public static boolean isCadiEnabled(){
- boolean enableCadi=false;
- if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){
- enableCadi=true;
- }
-
- return enableCadi;
- }
-
+ public static String getRemoteAddress(DMaaPContext ctx)
+ {
+ String reqAddr = ctx.getRequest().getRemoteAddr();
+ String fwdHeader = getFirstHeader("X-Forwarded-For",ctx);
+ return ((fwdHeader != null) ? fwdHeader : reqAddr);
+ }
+ public static String getFirstHeader(String h,DMaaPContext ctx)
+ {
+ List l = getHeader(h,ctx);
+ return ((l.size() > 0) ? (String)l.iterator().next() : null);
+ }
+ public static List<String> getHeader(String h,DMaaPContext ctx)
+ {
+ LinkedList list = new LinkedList();
+ Enumeration e = ctx.getRequest().getHeaders(h);
+ while (e.hasMoreElements())
+ {
+ list.add(e.nextElement().toString());
+ }
+ return list;
+ }
+
+ public static String getKafkaproperty(){
+ InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties");
+ Properties props = new Properties();
+ try {
+ props.load(input);
+ } catch (IOException e) {
+ log.error("failed to read kafka.properties");
+ }
+ return props.getProperty("key");
+
+
+ }
+
+ public static boolean isCadiEnabled(){
+ boolean enableCadi=false;
+ if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){
+ enableCadi=true;
+ }
+
+ return enableCadi;
+ }
+
+ public static Properties addSaslProps(){
+ Properties props = new Properties();
+ String saslMech = System.getenv("SASLMECH");
+ if (saslMech != null && saslMech.equals("scram-sha-512")) {
+ props.put("sasl.jaas.config", System.getenv("JAASLOGIN"));
+ props.put(SASL_MECH, saslMech.toUpperCase());
+ }
+ else {
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='" + getKafkaproperty() + "';");
+ props.put(SASL_MECH, "PLAIN");
+ }
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ log.info("KafkaAdmin sasl.mechanism set to " + props.getProperty(SASL_MECH));
+ return props;
+
+ }