/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.openecomp.appc.adapter.messaging.dmaap;

import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.att.nsa.mr.client.MRBatchingPublisher;
import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.MRConsumer;
import com.att.nsa.mr.client.MRPublisher.message;
import org.json.JSONObject;
import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapUtil;

40 *An example of how to use the Java publisher.
42 public class SimpleExamplePublisher
45 public static void main(String []args) throws InterruptedException, Exception{
47 SimpleExamplePublisher publisher = new SimpleExamplePublisher();
51 String topicProducerPropFileName = DmaapUtil.createProducerPropFile("org.openecomp.appc.UNIT-TEST", null);
54 publisher.publishMessage(topicProducerPropFileName,i);
62 public void publishMessage( String producerFilePath,int count ) throws IOException, InterruptedException, Exception
64 // create our publisher
65 final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath);
66 // publish some messages
67 final JSONObject msg1 = new JSONObject ();
68 msg1.put ( "Partition:2", "Message:" +count);
69 //msg1.put ( "greeting", "Hello .." );
71 pub.send ( "2", msg1.toString());
72 // close the publisher to make sure everything's sent before exiting. The batching
73 // publisher interface allows the app to get the set of unsent messages. It could
74 // write them to disk, for example, to try to send them later.
75 final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
76 if ( stuck.size () > 0 )
78 System.err.println ( stuck.size() + " messages unsent" );
82 System.out.println ( "Clean exit; all messages sent." );
87 public static void fetchMessage()
93 String topic = "org.openecomp.appc.UNIT-TEST";
94 Properties props = new Properties();
96 props.put("group", "group1");
97 String topicConsumerPropFileName1 = DmaapUtil.createConsumerPropFile(topic,props);
98 final MRConsumer consumer1 = MRClientFactory.createConsumer ( topicConsumerPropFileName1);
100 props = new Properties();
101 props.put("id", "2");
102 props.put("group", "group2");
103 String topicConsumerPropFileName2 = DmaapUtil.createConsumerPropFile(topic,props);
104 final MRConsumer consumer2 = MRClientFactory.createConsumer ( topicConsumerPropFileName2);
106 for ( String msg : consumer1.fetch () )
109 System.out.println ( "consumer1 "+count + ": " + msg );
111 for ( String msg : consumer2.fetch () )
114 System.out.println ( "consumer1 "+count + ": " + msg );
119 catch ( Exception x )
121 System.out.println("inside cons exc");
122 System.err.println ( x.getClass().getName () + ": " + x.getMessage () );