import org.openecomp.policy.common.logging.eelf.PolicyLogger;
import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.pdp.rest.api.services.NotificationService;
import org.openecomp.policy.rest.XACMLRestProperties;
import org.openecomp.policy.utils.BusPublisher;
import org.openecomp.policy.xacml.api.XACMLErrorConstants;
@ServerEndpoint(value = "/notifications")
public class NotificationServer {
private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class);
- private static Queue<Session> queue = new ConcurrentLinkedQueue<Session>();
+ private static Queue<Session> queue = new ConcurrentLinkedQueue<>();
private static String update = null;
private static String hosts = null;
private static URL aURL = null;
session.getBasicRemote().sendText(update);
session.close();
} catch (IOException e) {
- LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
}
}
LOGGER.debug("Notification set to " + propNotificationType);
if (propNotificationType.equals("ueb")){
-
+
String topic = null;
try {
aURL = new URL(pdpURL);
hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY);
String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET);
-
+
LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
CambriaBatchingPublisher pub = null;
try {
LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
}
-
+
hosts.trim();
topic.trim();
apiKey.trim();
.onTopic ( topic )
.authenticatedBy ( apiKey, apiSecret )
.build ()
- ;
-
+ ;
+
} catch (MalformedURLException e1) {
LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
} catch (GeneralSecurityException e1) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage() +e1);
}
+ if(pub != null){
+ try {
+ pub.send( "MyPartitionKey", notification );
+ } catch (IOException e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
+ }
+ // close the publisher. The batching publisher does not send events
+ // immediately, so you MUST use close to send any remaining messages.
+ // You provide the amount of time you're willing to wait for the sends
+ // to succeed before giving up. If any messages are unsent after that time,
+ // they're returned to your app. You could, for example, persist to disk
+ // and try again later.
+ final List<?> stuck = pub.close ( 20, TimeUnit.SECONDS );
- try {
- pub.send( "MyPartitionKey", notification );
- } catch (IOException e) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage());
- }
-
- // close the publisher. The batching publisher does not send events
- // immediately, so you MUST use close to send any remaining messages.
- // You provide the amount of time you're willing to wait for the sends
- // to succeed before giving up. If any messages are unsent after that time,
- // they're returned to your app. You could, for example, persist to disk
- // and try again later.
- final List<?> stuck = pub.close ( 20, TimeUnit.SECONDS );
-
- if ( stuck.size () > 0 ){
- LOGGER.error( stuck.size() + " messages unsent" );
- }else{
- LOGGER.info( "Clean exit; all messages sent: " + notification );
+ if (!stuck.isEmpty()){
+ LOGGER.error( stuck.size() + " messages unsent" );
+ }else{
+ LOGGER.info( "Clean exit; all messages sent: " + notification );
+ }
}
-
} else if (propNotificationType.equals("dmaap")) {
// Setting up the Publisher for DMaaP MR
List<String> dmaapList = null;
if(dmaapServers.contains(",")) {
- dmaapList = new ArrayList<String>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
+ dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
} else {
dmaapList = new ArrayList<>();
dmaapList.add(dmaapServers);
publisher.close();
} catch (Exception e) {
- LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage());
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
}
}
try {
session.getBasicRemote().sendText(notification);
} catch (IOException e) {
- LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
}
}
+ NotificationService.sendNotification(notification);
}
-
+
public static void setUpdate(String update) {
NotificationServer.update = update;
}
+
}