- try {
- pub = new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
- final JSONObject msg1 = new JSONObject ();
- msg1.put ( "JSON", "DMaaP Update Request UID=" + uniqueID);
- pub.send ( "MyPartitionKey", msg1.toString () );
- } catch (Exception e) {
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Publisher: ", e);
- }
- if(pub != null){
- pub.close ();
- }
- }
+ try {
+ pub = new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
+ final JSONObject msg1 = new JSONObject ();
+ msg1.put ( "JSON", "DMaaP Update Request UID=" + uniqueID);
+ pub.send ( "MyPartitionKey", msg1.toString () );
+ } catch (Exception e) {
+ logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Publisher: ", e);
+ }
+ if(pub != null){
+ pub.close ();
+ }
+ }
+
+ //NOTE: should be able to remove this for DMAAP since we will not be creating topics dynamically
+ public static void createTopic (String topic, String uniquID, List<String> dmaapList, String aafLogin, String aafPassword){
+ ManualClientEndDMAAP.topic = topic;
+ publishMessage(topic, uniquID, dmaapList, aafLogin, aafPassword);
+ }
+
+
+ public static void start(List<String> dmaapList, String topic, String aafLogin, String aafPassword, String uniqueID) {
+
+ ManualClientEndDMAAP.uniquID = uniqueID;
+ ManualClientEndDMAAP.topic = topic;
+
+ String id = "0";
+
+ try {
+ dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, topic, aafLogin, aafPassword, "clientGroup", id, 15*1000, 1000);
+ } catch (Exception e) {
+ logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e);
+ }
+
+ int retries = 1;
+ boolean isSuccess = false;
+ while (retries < RETRY_LIMIT && !isSuccess) {
+ isSuccess = publishMessageAndSetNotification(dmaapList, topic, aafLogin, aafPassword);
+ retries++;
+ }
+ }