2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.pdp.rest.notifications;
23 import java.io.IOException;
24 import java.net.MalformedURLException;
26 import java.security.GeneralSecurityException;
27 import java.util.LinkedList;
28 import java.util.UUID;
30 import org.openecomp.policy.rest.XACMLRestProperties;
32 import com.att.nsa.cambria.client.CambriaClientFactory;
33 import com.att.nsa.cambria.client.CambriaConsumer;
34 import com.att.nsa.cambria.client.CambriaPublisher;
35 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
36 import com.att.research.xacml.util.XACMLProperties;
38 import org.openecomp.policy.common.logging.flexlogger.*;
40 public class ManualNotificationUpdateThread implements Runnable {
41 private static final Logger logger = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
42 // private static List<String> uebURLList = null;
43 private static String topic = null;
44 private static CambriaConsumer CConsumer = null;
45 // private static Collection<String> clusterList = null;
46 private static String clusterList = null;
47 // private Collection<String> urlList = null;
48 private static String update = null;
50 public volatile boolean isRunning = false;
52 public synchronized boolean isRunning() {
53 return this.isRunning;
56 public synchronized void terminate() {
57 this.isRunning = false;
62 * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests
68 this.isRunning = true;
71 String group = UUID.randomUUID ().toString ();
73 String returnTopic = null;
75 ManualNotificationUpdateThread.clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER);
76 String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
78 topic = aURL.getHost() + aURL.getPort();
79 } catch (NumberFormatException e) {
80 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
81 this.isRunning = false;
82 } catch (MalformedURLException e) {
83 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
85 String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
86 SendMessage(consumerTopic, "Starting-Topic");
87 final LinkedList<String> urlList = new LinkedList<String> ();
88 for ( String u : clusterList.split ( "," ) ){
93 CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
94 } catch (MalformedURLException | GeneralSecurityException e1) {
95 // TODO Auto-generated catch block
100 while (this.isRunning()) {
101 logger.debug("While loop test _ take out ");
103 for ( String msg : CConsumer.fetch () ){
104 logger.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
105 returnTopic = processMessage(msg);
106 if(returnTopic != null){
107 SendMessage(returnTopic, update);
110 } catch (IOException e) {
111 logger.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
114 logger.debug("Stopping UEB Consuer loop will not logger fetch messages from the cluser");
118 private void SendMessage( String topic, String message) {
119 CambriaPublisher pub = null;
121 pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
122 } catch (MalformedURLException e1) {
123 // TODO Auto-generated catch block
124 e1.printStackTrace();
125 } catch (GeneralSecurityException e1) {
126 // TODO Auto-generated catch block
127 e1.printStackTrace();
130 pub.send( "pdpReturnMessage", message );
131 logger.debug("Sending to Message to tpoic" + topic);
132 } catch (IOException e) {
133 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
138 private String processMessage(String msg) {
139 logger.debug("notification message: " + msg);
140 String[] UID = msg.split("=")[1].split("\"");
141 String returnTopic = topic + UID[0];
142 if(msg.contains("Starting-Topic")){
147 public static void setUpdate(String update) {
148 ManualNotificationUpdateThread.update = update;