bump the version
[dmaap/messagerouter/msgrtr.git] / src / test / java / com / att / sa / cambria / testClient / SimpleExample.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22
23 package com.att.sa.cambria.testClient;
24
25 import kafka.api.FetchRequest;
26 import kafka.api.FetchRequestBuilder;
27 import kafka.api.PartitionOffsetRequestInfo;
28 import kafka.cluster.Broker;
29 import kafka.common.ErrorMapping;
30 import kafka.common.TopicAndPartition;
31 import kafka.javaapi.*;
32 import kafka.javaapi.consumer.SimpleConsumer;
33 import kafka.message.MessageAndOffset;
34
35 import java.io.IOException;
36 import java.nio.ByteBuffer;
37 import java.util.ArrayList;
38 import java.util.HashMap;
39 import java.util.LinkedList;
40 import java.util.List;
41 import java.util.Map;
42
43 public class SimpleExample
44 {
45 //      public static void main ( String args[] )
46 //      {
47 //              if ( args.length < 5 )
48 //              {
49 //                      System.err.println ( "usage: SimpleExample <maxReads> <topic> <partition> <host> <port>" );
50 //                      return;
51 //              }
52 //              
53 //              final long maxReads = Long.parseLong ( args[0] );
54 //              final String topic = args[1];
55 //              final int partition = Integer.parseInt ( args[2] );
56 //
57 //              final int port = Integer.parseInt ( args[4] );
58 //              final hostPort hp = new hostPort ( args[3], port );
59 //              final LinkedList<hostPort> seeds = new LinkedList<hostPort> ();
60 //              seeds.add ( hp );
61 //
62 //              try
63 //              {
64 //                      final SimpleExample example = new SimpleExample ();
65 //                      example.run ( maxReads, topic, partition, seeds );
66 //              }
67 //              catch ( Exception e )
68 //              {
69 //                      System.out.println ( "Oops:" + e );
70 //                      e.printStackTrace ();
71 //              }
72 //      }
73 //
74 //      public SimpleExample ()
75 //      {
76 //              fReplicaBrokers = new ArrayList<hostPort> ();
77 //      }
78 //
79 //      public void run ( long remainingAllowedReads, String a_topic, int a_partition, List<hostPort> seedHosts ) throws IOException
80 //      {
81 //              // find the meta data about the topic and partition we are interested in
82 //
83 //              hostPort leadBroker = findLeader ( seedHosts, a_topic, a_partition );
84 //              if ( leadBroker == null )
85 //              {
86 //                      System.out.println ( "Can't find leader for Topic and Partition. Exiting" );
87 //                      return;
88 //              }
89 //
90 //              final String clientName = "Client_" + a_topic + "_" + a_partition;
91 //
92 //              SimpleConsumer consumer = new SimpleConsumer ( leadBroker.fHost, leadBroker.fPort, 100000, 64 * 1024, clientName );
93 //              long readOffset = getLastOffset ( consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime (), clientName );
94 //
95 //              int numErrors = 0;
96 //              while ( remainingAllowedReads > 0 )
97 //              {
98 //                      if ( consumer == null )
99 //                      {
100 //                              consumer = new SimpleConsumer ( leadBroker.fHost, leadBroker.fPort, 100000, 64 * 1024, clientName );
101 //                      }
102 //
103 //                      final FetchRequest req = new FetchRequestBuilder ()
104 //                              .clientId ( clientName )
105 //                              .addFetch ( a_topic, a_partition, readOffset, 100000 ).build ();
106 //                      final FetchResponse fetchResponse = consumer.fetch ( req );
107 //
108 //                      if ( fetchResponse.hasError () )
109 //                      {
110 //                              numErrors++;
111 //
112 //                              // Something went wrong!
113 //                              final short code = fetchResponse.errorCode ( a_topic, a_partition );
114 //                              System.out.println ( "Error fetching data from the Broker:" + leadBroker + " Reason: " + code );
115 //                              if ( numErrors > 5 )
116 //                                      break;
117 //
118 //                              if ( code == ErrorMapping.OffsetOutOfRangeCode () )
119 //                              {
120 //                                      // We asked for an invalid offset. For simple case ask for
121 //                                      // the last element to reset
122 //                                      readOffset = getLastOffset ( consumer, a_topic,
123 //                                              a_partition, kafka.api.OffsetRequest.LatestTime (),
124 //                                              clientName );
125 //                                      continue;
126 //                              }
127 //
128 //                              consumer.close ();
129 //                              consumer = null;
130 //
131 //                              leadBroker = findNewLeader ( leadBroker, a_topic, a_partition );
132 //                              continue;
133 //                      }
134 //                      numErrors = 0;
135 //
136 //                      long numRead = 0;
137 //                      for ( MessageAndOffset messageAndOffset : fetchResponse.messageSet ( a_topic, a_partition ) )
138 //                      {
139 //                              long currentOffset = messageAndOffset.offset ();
140 //                              if ( currentOffset < readOffset )
141 //                              {
142 //                                      System.out.println ( "Found an old offset: "
143 //                                              + currentOffset + " Expecting: " + readOffset );
144 //                                      continue;
145 //                              }
146 //                              readOffset = messageAndOffset.nextOffset ();
147 //                              ByteBuffer payload = messageAndOffset.message ().payload ();
148 //
149 //                              byte[] bytes = new byte [payload.limit ()];
150 //                              payload.get ( bytes );
151 //                              System.out.println ( String.valueOf ( messageAndOffset.offset () ) + ": " + new String ( bytes, "UTF-8" ) );
152 //                              numRead++;
153 //                              remainingAllowedReads--;
154 //                      }
155 //
156 //                      if ( numRead == 0 )
157 //                      {
158 //                              try
159 //                              {
160 //                                      Thread.sleep ( 1000 );
161 //                              }
162 //                              catch ( InterruptedException ie )
163 //                              {
164 //                              }
165 //                      }
166 //              }
167 //
168 //              if ( consumer != null )
169 //              {
170 //                      consumer.close ();
171 //              }
172 //      }
173 //
174 //      public static long getLastOffset ( SimpleConsumer consumer, String topic,
175 //              int partition, long whichTime, String clientName )
176 //      {
177 //              TopicAndPartition topicAndPartition = new TopicAndPartition ( topic,
178 //                      partition );
179 //              Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo> ();
180 //              requestInfo.put ( topicAndPartition, new PartitionOffsetRequestInfo (
181 //                      whichTime, 1 ) );
182 //              kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest (
183 //                      requestInfo, kafka.api.OffsetRequest.CurrentVersion (), clientName );
184 //              OffsetResponse response = consumer.getOffsetsBefore ( request );
185 //
186 //              if ( response.hasError () )
187 //              {
188 //                      System.out.println ( "Error fetching data Offset Data the Broker. Reason: "
189 //                                      + response.errorCode ( topic, partition ) );
190 //                      return 0;
191 //              }
192 //
193 //              final long[] offsets = response.offsets ( topic, partition );
194 //              return offsets[0];
195 //      }
196 //
197 //      /**
198 //       * Find a new leader for a topic/partition, including a pause for the coordinator to 
199 //       * find a new leader, as needed.
200 //       * 
201 //       * @param oldLeader
202 //       * @param topic
203 //       * @param partition
204 //       * @return
205 //       * @throws IOException
206 //       */
207 //      private hostPort findNewLeader ( hostPort oldLeader, String topic, int partition ) throws IOException
208 //      {
209 //              try
210 //              {
211 //                      int attemptsLeft = 3;
212 //                      boolean haveSlept = false;
213 //
214 //                      while ( attemptsLeft-- > 0 )
215 //                      {
216 //                              System.out.println ( "" + attemptsLeft + " attempts Left" );    // FIXME: make sure it's 3 attempts!
217 //
218 //                              // ask the brokers for a leader
219 //                              final hostPort newLeader = findLeader ( fReplicaBrokers, topic, partition );
220 //                              if ( newLeader != null )
221 //                              {
222 //                                      // we can use this leader if it's different (i.e. a new leader has been elected)
223 //                                      // or it's the same leader, but we waited to allow ZK to get a new one, and
224 //                                      // the original recovered
225 //                                      if ( !oldLeader.equals ( newLeader ) || haveSlept )
226 //                                      {
227 //                                              return newLeader;
228 //                                      }
229 //                              }
230 //
231 //                              // sleep
232 //                              haveSlept = true;
233 //                              Thread.sleep ( 1000 );
234 //                      }
235 //              }
236 //              catch ( InterruptedException x )
237 //              {
238 //                      // just give up
239 //              }
240 //
241 //              System.out.println ( "Unable to find new leader after Broker failure. Exiting" );
242 //              throw new IOException ( "Unable to find new leader after Broker failure. Exiting" );
243 //      }
244 //
245 //      /**
246 //       * Given one or more seed brokers, find the leader for a given topic/partition
247 //       * @param seeds
248 //       * @param topic
249 //       * @param partition
250 //       * @return partition metadata, or null
251 //       */
252 //      private hostPort findLeader ( List<hostPort> seeds, String topic, int partition )
253 //      {
254 //              final List<String> topics = new ArrayList<String> ();
255 //              topics.add ( topic );
256 //
257 //              for ( hostPort seed : seeds )
258 //              {
259 //                      final SimpleConsumer consumer = new SimpleConsumer ( seed.fHost, seed.fPort, 100000, 64 * 1024, "leaderLookup" );
260 //                      final TopicMetadataRequest req = new TopicMetadataRequest ( topics );
261 //                      final TopicMetadataResponse resp = consumer.send ( req );
262 //                      consumer.close ();
263 //
264 //                      final List<TopicMetadata> metaData = resp.topicsMetadata ();
265 //                      for ( TopicMetadata item : metaData )
266 //                      {
267 //                              for ( PartitionMetadata part : item.partitionsMetadata () )
268 //                              {
269 //                                      if ( part.partitionId () == partition )
270 //                                      {
271 //                                              // found our partition. load the details, then return it
272 //                                              fReplicaBrokers.clear ();
273 //                                              for ( kafka.cluster.Broker replica : part.replicas () )
274 //                                              {
275 //                                                      fReplicaBrokers.add ( new hostPort ( replica.host (), replica.port () ) );
276 //                                              }
277 //                                              return new hostPort ( part.leader () );
278 //                                      }
279 //                              }
280 //                      }
281 //              }
282 //
283 //              return null;
284 //      }
285 //
286 //      private static class hostPort
287 //      {
288 //              public hostPort ( String host, int port ) { fHost = host; fPort = port; }
289 //
290 //              public hostPort ( Broker leader )
291 //              {
292 //                      fHost = leader.host ();
293 //                      fPort = leader.port ();
294 //              }
295 //
296 //              
297 //              public final String fHost;
298 //              public final int fPort;
299 //
300 //              @Override
301 //              public int hashCode ()
302 //              {
303 //                      final int prime = 31;
304 //                      int result = 1;
305 //                      result = prime * result
306 //                              + ( ( fHost == null ) ? 0 : fHost.hashCode () );
307 //                      result = prime * result + fPort;
308 //                      return result;
309 //              }
310 //
311 //              @Override
312 //              public boolean equals ( Object obj )
313 //              {
314 //                      if ( this == obj )
315 //                              return true;
316 //                      if ( obj == null )
317 //                              return false;
318 //                      if ( getClass () != obj.getClass () )
319 //                              return false;
320 //                      hostPort other = (hostPort) obj;
321 //                      if ( fHost == null )
322 //                      {
323 //                              if ( other.fHost != null )
324 //                                      return false;
325 //                      }
326 //                      else if ( !fHost.equals ( other.fHost ) )
327 //                              return false;
328 //                      if ( fPort != other.fPort )
329 //                              return false;
330 //                      return true;
331 //              }
332 //      }
333 //      
334 //      private List<hostPort> fReplicaBrokers;
335 }