Support for kafka within xacml tutorial
[policy/xacml-pdp.git] / tutorials / tutorial-enforcement / src / main / java / org / onap / policy / tutorial / policyenforcement / App.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
4  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * ============LICENSE_END=========================================================
18  */
19
20 package org.onap.policy.tutorial.policyenforcement;
21
22 import jakarta.ws.rs.client.Entity;
23 import jakarta.ws.rs.core.MediaType;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.Scanner;
30 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
31 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
32 import org.onap.policy.common.endpoints.event.comm.TopicListener;
33 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
34 import org.onap.policy.common.endpoints.http.client.HttpClient;
35 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
36 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
37 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
38 import org.onap.policy.common.endpoints.parameters.TopicParameters;
39 import org.onap.policy.common.utils.coder.CoderException;
40 import org.onap.policy.common.utils.coder.StandardCoder;
41 import org.onap.policy.models.decisions.concepts.DecisionRequest;
42 import org.onap.policy.models.decisions.concepts.DecisionResponse;
43 import org.onap.policy.models.pap.concepts.PolicyNotification;
44 import org.onap.policy.models.pap.concepts.PolicyStatus;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class App extends Thread implements TopicListener {
49     private static final Logger logger = LoggerFactory.getLogger(App.class);
50     private static final String MY_POLICY_TYPE_ID = "onap.policies.monitoring.MyAnalytic";
51     private final String xacmlPdpHost;
52     private final String xacmlPdpPort;
53     private final DecisionRequest decisionRequest = new DecisionRequest();
54     private Integer requestId = 1;
55     private HttpClient client = null;
56
57     /**
58      * Constructor.
59      *
60      * @param args Command line arguments
61      */
62     public App(String[] args) {
63         xacmlPdpHost = args[0];
64         xacmlPdpPort = args[1];
65
66         var params = new TopicParameters();
67         params.setTopicCommInfrastructure("kafka");
68         params.setFetchLimit(1);
69         params.setFetchTimeout(5000);
70         params.setTopic("policy-notification");
71         params.setServers(List.of(args[2] + ":" + args[3]));
72         var topicParams = new TopicParameterGroup();
73         topicParams.setTopicSources(List.of(params));
74
75         TopicEndpointManager.getManager().addTopics(topicParams);
76         TopicEndpointManager.getManager().getKafkaTopicSource("policy-notification").register(this);
77
78         decisionRequest.setOnapComponent("myComponent");
79         decisionRequest.setOnapName("myName");
80         decisionRequest.setOnapInstance("myInstanceId");
81         decisionRequest.setAction("configure");
82         Map<String, Object> resources = new HashMap<>();
83         resources.put("policy-type", MY_POLICY_TYPE_ID);
84         decisionRequest.setResource(resources);
85     }
86
87     /**
88      * Thread run method that creates a connection and gets an initial Decision on which policy(s)
89      * we should be enforcing.
90      * Then sits waiting for the user to enter q or Q from the keyboard to quit. While waiting,
91      * listen on a topic for notification that the policy has changed.
92      */
93     @Override
94     public void run() {
95         logger.info("running - type q to stdin to quit");
96         try {
97             client = HttpClientFactoryInstance.getClientFactory().build(BusTopicParams.builder()
98                 .clientName("myClientName").useHttps(true).allowSelfSignedCerts(true)
99                 .hostname(xacmlPdpHost).port(Integer.parseInt(xacmlPdpPort))
100                 .userName("healthcheck").password("zb!XztG34").basePath("policy/pdpx/v1")
101                 .managed(true)
102                 .serializationProvider("org.onap.policy.common.gson.GsonMessageBodyHandler")
103                 .build());
104         } catch (NumberFormatException | HttpClientConfigException e) {
105             logger.error("Could not create Http client", e);
106             return;
107         }
108
109         Map<String, Object> policies = getDecision(client, this.decisionRequest);
110         if (policies.isEmpty()) {
111             logger.info("Not enforcing any policies to start");
112         }
113         for (Entry<String, Object> entrySet : policies.entrySet()) {
114             logger.info("Enforcing: {}", entrySet.getKey());
115         }
116
117         TopicEndpointManager.getManager().start();
118
119         // never close System.in
120         var input = new Scanner(System.in);
121         while (!Thread.currentThread().isInterrupted()) {
122             String quit = input.nextLine();
123             if ("q".equalsIgnoreCase(quit)) {
124                 logger.info("quiting");
125                 break;
126             }
127         }
128
129         TopicEndpointManager.getManager().shutdown();
130
131     }
132
133     /**
134      * This method is called when a topic event is received.
135      */
136     @Override
137     public void onTopicEvent(CommInfrastructure infra, String topic, String event) {
138         logger.info("onTopicEvent {}", event);
139         if (scanForPolicyType(event)) {
140             Map<String, Object> newPolicies = getDecision(client, this.decisionRequest);
141             if (newPolicies.isEmpty()) {
142                 logger.info("Not enforcing any policies");
143             }
144             for (Entry<String, Object> entrySet : newPolicies.entrySet()) {
145                 logger.info("Now Enforcing: {}", entrySet.getKey());
146             }
147         }
148     }
149
150     /**
151      * Helper method that parses a message event for policy-notification
152      * looking for our supported policy type to enforce.
153      *
154      * @param msg topic message
155      * @return true if MY_POLICY_TYPE_ID is in the message
156      */
157     private boolean scanForPolicyType(String msg) {
158         var gson = new StandardCoder();
159         try {
160             PolicyNotification notification = gson.decode(msg, PolicyNotification.class);
161             for (PolicyStatus added : notification.getAdded()) {
162                 if (MY_POLICY_TYPE_ID.equals(added.getPolicyTypeId())) {
163                     return true;
164                 }
165             }
166             for (PolicyStatus deleted : notification.getDeleted()) {
167                 if (MY_POLICY_TYPE_ID.equals(deleted.getPolicyTypeId())) {
168                     return true;
169                 }
170             }
171         } catch (CoderException e) {
172             logger.error("StandardCoder failed to parse PolicyNotification", e);
173         }
174         return false;
175     }
176
177
178     /**
179      * Helper method that calls the XACML PDP Decision API to get a Decision
180      * as to which policy we should be enforcing.
181      *
182      * @param client          HttpClient to use to make REST call
183      * @param decisionRequest DecisionRequest object to send
184      * @return The Map of policies that was in the DecisionResponse object
185      */
186     private Map<String, Object> getDecision(HttpClient client, DecisionRequest decisionRequest) {
187         decisionRequest.setRequestId(requestId.toString());
188         requestId++;
189
190         Entity<DecisionRequest> entityRequest =
191             Entity.entity(decisionRequest, MediaType.APPLICATION_JSON);
192         var response = client.post("/decision", entityRequest, Collections.emptyMap());
193
194         if (response.getStatus() != 200) {
195             logger.error(
196                 "Decision API failed - is the IP/port correct? {}", response.getStatus());
197             return Collections.emptyMap();
198         }
199
200         var decisionResponse = HttpClient.getBody(response, DecisionResponse.class);
201
202         return decisionResponse.getPolicies();
203     }
204
205     /**
206      * Our Main application entry point.
207      *
208      * @param args command line arguments
209      */
210     public static void main(String[] args) {
211         logger.info("Hello Welcome to ONAP Enforcement Tutorial!");
212
213         var app = new App(args);
214
215         app.start();
216
217         try {
218             app.join();
219         } catch (InterruptedException e) {
220             Thread.currentThread().interrupt();
221             logger.warn("Thread interrupted");
222         }
223
224         logger.info("Tutorial ended");
225     }
226
227 }