import com.att.nsa.cambria.client.CambriaPublisher;
import com.att.research.xacml.util.XACMLProperties;
+@SuppressWarnings("deprecation")
public class ManualNotificationUpdateThread implements Runnable {
private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
} catch (MalformedURLException e) {
LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
}
-
- String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
- SendMessage(consumerTopic, "Starting-Topic");
- final LinkedList<String> urlList = new LinkedList<String> ();
- for ( String u : clusterList.split ( "," ) ){
- urlList.add ( u );
- }
-
- try {
- CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
- } catch (MalformedURLException | GeneralSecurityException e1) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
- }
+ if(aURL != null){
+ String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
+ SendMessage(consumerTopic, "Starting-Topic");
+ final LinkedList<String> urlList = new LinkedList<> ();
+ for ( String u : clusterList.split ( "," ) ){
+ urlList.add ( u );
+ }
- while (this.isRunning()) {
- LOGGER.debug("While loop test _ take out ");
try {
- for ( String msg : CConsumer.fetch () ){
- LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
- returnTopic = processMessage(msg);
- if(returnTopic != null){
- SendMessage(returnTopic, update);
+ CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
+ } catch (MalformedURLException | GeneralSecurityException e1) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
+ }
+
+ while (this.isRunning()) {
+ LOGGER.debug("While loop test _ take out ");
+ try {
+ for ( String msg : CConsumer.fetch () ){
+ LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
+ returnTopic = processMessage(msg);
+ if(returnTopic != null){
+ SendMessage(returnTopic, update);
+ }
}
+ } catch (IOException e) {
+ LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e);
}
- } catch (IOException e) {
- LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
}
+ LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
}
- LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
} else if ("dmaap".equals(propNotificationType)) {
String dmaapServers = null;
try {
try {
throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOGGER.error(e);
}
}
String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
SendMessage(consumerTopic, "Starting-Topic");
- dmaapList = new ArrayList<String>();
+ dmaapList = new ArrayList<>();
for ( String u : dmaapServers.split ( "," ) ){
dmaapList.add ( u );
}
} catch (Exception e) {
LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
}
-
- try {
- pub.send( "pdpReturnMessage", message );
- LOGGER.debug("Sending to Message to tpoic" + topic);
- } catch (IOException e) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
- }
- pub.close();
+ if(pub != null){
+ try {
+ pub.send( "pdpReturnMessage", message );
+ LOGGER.debug("Sending to Message to tpoic" + topic);
+ pub.close();
+ } catch (IOException e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update" +e);
+ }
+ }
}
private String processMessage(String msg) {