7d047437e1c2d3703d70570bf255fb796e078fcd
[appc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22
23 package org.openecomp.appc.adapter.messaging.dmaap;
24
25 import java.io.*;
26 import java.util.List;
27 import java.util.Properties;
28 import java.util.UUID;
29 import java.util.concurrent.TimeUnit;
30
31 import com.att.nsa.mr.client.MRConsumer;
32 import org.json.JSONObject;
33 import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapUtil;
34
35 import com.att.nsa.mr.client.MRBatchingPublisher;
36 import com.att.nsa.mr.client.MRClientFactory;
37 import com.att.nsa.mr.client.MRPublisher.message;
38
39
40 /**
41  *An example of how to use the Java publisher.
42  */
43 public class SimpleExamplePublisher
44 {
45
46     public static void main(String []args) throws InterruptedException, Exception{
47         int msgCount = 1;
48         SimpleExamplePublisher publisher = new SimpleExamplePublisher();
49
50         int i=0;
51
52         String topicProducerPropFileName = DmaapUtil.createProducerPropFile("org.openecomp.appc.UNIT-TEST", null);
53         while (i< msgCount)
54         {
55             publisher.publishMessage(topicProducerPropFileName,i);
56             i++;
57         }
58
59         fetchMessage();
60     }
61
62
63     public void publishMessage( String producerFilePath,int count  ) throws IOException, InterruptedException, Exception
64     {
65         // create our publisher
66         final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath);
67         // publish some messages
68         final JSONObject msg1 = new JSONObject ();
69         msg1.put ( "Partition:2", "Message:" +count);
70         //msg1.put ( "greeting", "Hello  .." );
71
72         pub.send ( "2", msg1.toString());
73         // close the publisher to make sure everything's sent before exiting. The batching
74         // publisher interface allows the app to get the set of unsent messages. It could
75         // write them to disk, for example, to try to send them later.
76         final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
77         if ( stuck.size () > 0 )
78         {
79             System.err.println ( stuck.size() + " messages unsent" );
80         }
81         else
82         {
83             System.out.println ( "Clean exit; all messages sent." );
84         }
85     }
86
87
88     public static void fetchMessage()
89     {
90         int count = 0;
91
92         try
93         {
94             String topic = "org.openecomp.appc.UNIT-TEST";
95             Properties props = new Properties();
96             props.put("id", "1");
97             props.put("group", "group1");
98             String topicConsumerPropFileName1 = DmaapUtil.createConsumerPropFile(topic,props);
99             final MRConsumer consumer1 = MRClientFactory.createConsumer ( topicConsumerPropFileName1);
100
101             props = new Properties();
102             props.put("id", "2");
103             props.put("group", "group2");
104             String topicConsumerPropFileName2 = DmaapUtil.createConsumerPropFile(topic,props);
105             final MRConsumer consumer2 = MRClientFactory.createConsumer ( topicConsumerPropFileName2);
106
107             for ( String msg : consumer1.fetch () )
108             {
109                 count++;
110                 System.out.println ( "consumer1 "+count + ": " + msg );
111             }
112             for ( String msg : consumer2.fetch () )
113             {
114                 count++;
115                 System.out.println ( "consumer1 "+count + ": " + msg );
116             }
117
118
119         }
120         catch ( Exception x )
121         {
122             System.out.println("inside cons exc");
123             System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
124         }
125     }
126 }
127
128
129
130
131
132
133
134
135