Support for kafka within xacml tutorial
[policy/xacml-pdp.git] / tutorials / tutorial-enforcement / src / main / java / org / onap / policy / tutorial / policyenforcement / App.java
index 78d0684..0697e82 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,9 +21,9 @@ package org.onap.policy.tutorial.policyenforcement;
 
 import jakarta.ws.rs.client.Entity;
 import jakarta.ws.rs.core.MediaType;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Scanner;
@@ -46,11 +46,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class App extends Thread implements TopicListener {
-    private static Logger logger                           = LoggerFactory.getLogger(App.class);
-    private static final String MY_POLICYTYPEID = "onap.policies.monitoring.MyAnalytic";
-    private String xacmlPdpHost;
-    private String xacmlPdpPort;
-    private DecisionRequest decisionRequest = new DecisionRequest();
+    private static final Logger logger = LoggerFactory.getLogger(App.class);
+    private static final String MY_POLICY_TYPE_ID = "onap.policies.monitoring.MyAnalytic";
+    private final String xacmlPdpHost;
+    private final String xacmlPdpPort;
+    private final DecisionRequest decisionRequest = new DecisionRequest();
     private Integer requestId = 1;
     private HttpClient client = null;
 
@@ -64,23 +64,23 @@ public class App extends Thread implements TopicListener {
         xacmlPdpPort = args[1];
 
         var params = new TopicParameters();
-        params.setTopicCommInfrastructure("dmaap");
+        params.setTopicCommInfrastructure("kafka");
         params.setFetchLimit(1);
         params.setFetchTimeout(5000);
-        params.setTopic("POLICY-NOTIFICATION");
-        params.setServers(Arrays.asList(args[2] + ":" + args[3]));
+        params.setTopic("policy-notification");
+        params.setServers(List.of(args[2] + ":" + args[3]));
         var topicParams = new TopicParameterGroup();
-        topicParams.setTopicSources(Arrays.asList(params));
+        topicParams.setTopicSources(List.of(params));
 
         TopicEndpointManager.getManager().addTopics(topicParams);
-        TopicEndpointManager.getManager().getDmaapTopicSource("POLICY-NOTIFICATION").register(this);
+        TopicEndpointManager.getManager().getKafkaTopicSource("policy-notification").register(this);
 
         decisionRequest.setOnapComponent("myComponent");
         decisionRequest.setOnapName("myName");
         decisionRequest.setOnapInstance("myInstanceId");
         decisionRequest.setAction("configure");
         Map<String, Object> resources = new HashMap<>();
-        resources.put("policy-type", MY_POLICYTYPEID);
+        resources.put("policy-type", MY_POLICY_TYPE_ID);
         decisionRequest.setResource(resources);
     }
 
@@ -88,19 +88,19 @@ public class App extends Thread implements TopicListener {
      * Thread run method that creates a connection and gets an initial Decision on which policy(s)
      * we should be enforcing.
      * Then sits waiting for the user to enter q or Q from the keyboard to quit. While waiting,
-     * listen on Dmaap topic for notification that the policy has changed.
+     * listen on a topic for notification that the policy has changed.
      */
     @Override
     public void run() {
         logger.info("running - type q to stdin to quit");
         try {
             client = HttpClientFactoryInstance.getClientFactory().build(BusTopicParams.builder()
-                    .clientName("myClientName").useHttps(true).allowSelfSignedCerts(true)
-                    .hostname(xacmlPdpHost).port(Integer.parseInt(xacmlPdpPort))
-                    .userName("healthcheck").password("zb!XztG34").basePath("policy/pdpx/v1")
-                    .managed(true)
-                    .serializationProvider("org.onap.policy.common.gson.GsonMessageBodyHandler")
-                    .build());
+                .clientName("myClientName").useHttps(true).allowSelfSignedCerts(true)
+                .hostname(xacmlPdpHost).port(Integer.parseInt(xacmlPdpPort))
+                .userName("healthcheck").password("zb!XztG34").basePath("policy/pdpx/v1")
+                .managed(true)
+                .serializationProvider("org.onap.policy.common.gson.GsonMessageBodyHandler")
+                .build());
         } catch (NumberFormatException | HttpClientConfigException e) {
             logger.error("Could not create Http client", e);
             return;
@@ -116,7 +116,7 @@ public class App extends Thread implements TopicListener {
 
         TopicEndpointManager.getManager().start();
 
-        @SuppressWarnings("resource") // never close System.in
+        // never close System.in
         var input = new Scanner(System.in);
         while (!Thread.currentThread().isInterrupted()) {
             String quit = input.nextLine();
@@ -148,23 +148,23 @@ public class App extends Thread implements TopicListener {
     }
 
     /**
-     * Helper method that parses a DMaap message event for POLICY-NOTIFICATION
+     * Helper method that parses a message event for policy-notification
      * looking for our supported policy type to enforce.
      *
-     * @param msg Dmaap Message
-     * @return true if MY_POLICYTYPEID is in the message
+     * @param msg topic message
+     * @return true if MY_POLICY_TYPE_ID is in the message
      */
     private boolean scanForPolicyType(String msg) {
         var gson = new StandardCoder();
         try {
             PolicyNotification notification = gson.decode(msg, PolicyNotification.class);
             for (PolicyStatus added : notification.getAdded()) {
-                if (MY_POLICYTYPEID.equals(added.getPolicyTypeId())) {
+                if (MY_POLICY_TYPE_ID.equals(added.getPolicyTypeId())) {
                     return true;
                 }
             }
             for (PolicyStatus deleted : notification.getDeleted()) {
-                if (MY_POLICYTYPEID.equals(deleted.getPolicyTypeId())) {
+                if (MY_POLICY_TYPE_ID.equals(deleted.getPolicyTypeId())) {
                     return true;
                 }
             }
@@ -179,7 +179,7 @@ public class App extends Thread implements TopicListener {
      * Helper method that calls the XACML PDP Decision API to get a Decision
      * as to which policy we should be enforcing.
      *
-     * @param client HttpClient to use to make REST call
+     * @param client          HttpClient to use to make REST call
      * @param decisionRequest DecisionRequest object to send
      * @return The Map of policies that was in the DecisionResponse object
      */
@@ -188,12 +188,12 @@ public class App extends Thread implements TopicListener {
         requestId++;
 
         Entity<DecisionRequest> entityRequest =
-                Entity.entity(decisionRequest, MediaType.APPLICATION_JSON);
+            Entity.entity(decisionRequest, MediaType.APPLICATION_JSON);
         var response = client.post("/decision", entityRequest, Collections.emptyMap());
 
         if (response.getStatus() != 200) {
             logger.error(
-                    "Decision API failed - is the IP/port correct? {}", response.getStatus());
+                "Decision API failed - is the IP/port correct? {}", response.getStatus());
             return Collections.emptyMap();
         }