1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
23 package com.att.sa.cambria.testClient;
25 import kafka.api.FetchRequest;
26 import kafka.api.FetchRequestBuilder;
27 import kafka.api.PartitionOffsetRequestInfo;
28 import kafka.cluster.Broker;
29 import kafka.common.ErrorMapping;
30 import kafka.common.TopicAndPartition;
31 import kafka.javaapi.*;
32 import kafka.javaapi.consumer.SimpleConsumer;
33 import kafka.message.MessageAndOffset;
35 import java.io.IOException;
36 import java.nio.ByteBuffer;
37 import java.util.ArrayList;
38 import java.util.HashMap;
39 import java.util.LinkedList;
40 import java.util.List;
// NOTE(review): every member of this class body is commented out in this
// listing -- as visible here the class compiles to an empty type.  The dead
// code is a copy of the classic Kafka 0.8 low-level consumer example
// (find the partition leader via topic metadata, fetch from an explicit
// offset, re-discover the leader on broker failure).  The API it uses
// (kafka.javaapi.consumer.SimpleConsumer / FetchRequestBuilder) was
// deprecated long ago in favor of org.apache.kafka.clients.consumer
// .KafkaConsumer; if this example is ever revived it should be rewritten
// against the modern client.  Otherwise, best practice is to delete the
// commented-out code rather than keep it in the tree.
// NOTE(review): the numeric prefix on each line below is an artifact of a
// line-numbered dump, and the gaps in that numbering indicate that blank
// and brace-only lines were stripped -- do not assume the braces visible
// here are complete.
43 public class SimpleExample
// Entry point of the original example: parses <maxReads> <topic>
// <partition> <host> <port> from argv and drives run().
45 //	public static void main ( String args[] )
47 //		if ( args.length < 5 )
49 //			System.err.println ( "usage: SimpleExample <maxReads> <topic> <partition> <host> <port>" );
53 //			final long maxReads = Long.parseLong ( args[0] );
54 //			final String topic = args[1];
55 //			final int partition = Integer.parseInt ( args[2] );
57 //			final int port = Integer.parseInt ( args[4] );
58 //			final hostPort hp = new hostPort ( args[3], port );
59 //			final LinkedList<hostPort> seeds = new LinkedList<hostPort> ();
64 //			final SimpleExample example = new SimpleExample ();
65 //			example.run ( maxReads, topic, partition, seeds );
67 //		catch ( Exception e )
// NOTE(review): broad catch(Exception) + printStackTrace is acceptable only
// because this was a throwaway test client.
69 //			System.out.println ( "Oops:" + e );
70 //			e.printStackTrace ();
74 //	public SimpleExample ()
76 //		fReplicaBrokers = new ArrayList<hostPort> ();
// Main consume loop: locates the partition leader, then fetches batches
// starting from the earliest offset until remainingAllowedReads messages
// have been printed.
79 //	public void run ( long remainingAllowedReads, String a_topic, int a_partition, List<hostPort> seedHosts ) throws IOException
81 //		// find the meta data about the topic and partition we are interested in
83 //		hostPort leadBroker = findLeader ( seedHosts, a_topic, a_partition );
84 //		if ( leadBroker == null )
86 //			System.out.println ( "Can't find leader for Topic and Partition. Exiting" );
90 //		final String clientName = "Client_" + a_topic + "_" + a_partition;
92 //		SimpleConsumer consumer = new SimpleConsumer ( leadBroker.fHost, leadBroker.fPort, 100000, 64 * 1024, clientName );
93 //		long readOffset = getLastOffset ( consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime (), clientName );
96 //		while ( remainingAllowedReads > 0 )
98 //			if ( consumer == null )
100 //				consumer = new SimpleConsumer ( leadBroker.fHost, leadBroker.fPort, 100000, 64 * 1024, clientName );
103 //			final FetchRequest req = new FetchRequestBuilder ()
104 //				.clientId ( clientName )
105 //				.addFetch ( a_topic, a_partition, readOffset, 100000 ).build ();
106 //			final FetchResponse fetchResponse = consumer.fetch ( req );
108 //			if ( fetchResponse.hasError () )
112 //				// Something went wrong!
113 //				final short code = fetchResponse.errorCode ( a_topic, a_partition );
114 //				System.out.println ( "Error fetching data from the Broker:" + leadBroker + " Reason: " + code );
// NOTE(review): "numErrors" is referenced here but its declaration and
// increment are not visible in this listing -- presumably on the stripped
// lines; verify before reviving this code.
115 //				if ( numErrors > 5 )
118 //				if ( code == ErrorMapping.OffsetOutOfRangeCode () )
120 //					// We asked for an invalid offset. For simple case ask for
121 //					// the last element to reset
122 //					readOffset = getLastOffset ( consumer, a_topic,
123 //						a_partition, kafka.api.OffsetRequest.LatestTime (),
// On any other fetch error: drop the connection and re-elect a leader.
128 //				consumer.close ();
131 //				leadBroker = findNewLeader ( leadBroker, a_topic, a_partition );
137 //			for ( MessageAndOffset messageAndOffset : fetchResponse.messageSet ( a_topic, a_partition ) )
139 //				long currentOffset = messageAndOffset.offset ();
// Kafka can legitimately re-deliver compressed batches from before the
// requested offset; skip those rather than double-printing.
140 //				if ( currentOffset < readOffset )
142 //					System.out.println ( "Found an old offset: "
143 //						+ currentOffset + " Expecting: " + readOffset );
146 //				readOffset = messageAndOffset.nextOffset ();
147 //				ByteBuffer payload = messageAndOffset.message ().payload ();
149 //				byte[] bytes = new byte [payload.limit ()];
150 //				payload.get ( bytes );
151 //				System.out.println ( String.valueOf ( messageAndOffset.offset () ) + ": " + new String ( bytes, "UTF-8" ) );
153 //				remainingAllowedReads--;
// NOTE(review): "numRead" is also referenced without a visible declaration
// -- same caveat as numErrors above.
156 //			if ( numRead == 0 )
160 //					Thread.sleep ( 1000 );
// NOTE(review): the InterruptedException handler body is not visible here;
// if it is empty, a revived version should re-interrupt the thread
// (Thread.currentThread().interrupt()) instead of swallowing it.
162 //				catch ( InterruptedException ie )
168 //		if ( consumer != null )
170 //			consumer.close ();
// Asks the broker for the partition's offset at "whichTime" (earliest or
// latest) and returns the first offset in the response, or falls through
// on error (return path for the error case is on a stripped line).
174 //	public static long getLastOffset ( SimpleConsumer consumer, String topic,
175 //		int partition, long whichTime, String clientName )
177 //		TopicAndPartition topicAndPartition = new TopicAndPartition ( topic,
179 //		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo> ();
180 //		requestInfo.put ( topicAndPartition, new PartitionOffsetRequestInfo (
182 //		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest (
183 //			requestInfo, kafka.api.OffsetRequest.CurrentVersion (), clientName );
184 //		OffsetResponse response = consumer.getOffsetsBefore ( request );
186 //		if ( response.hasError () )
188 //			System.out.println ( "Error fetching data Offset Data the Broker. Reason: "
189 //				+ response.errorCode ( topic, partition ) );
193 //		final long[] offsets = response.offsets ( topic, partition );
194 //		return offsets[0];
198 //	 * Find a new leader for a topic/partition, including a pause for the coordinator to
199 //	 * find a new leader, as needed.
201 //	 * @param oldLeader
203 //	 * @param partition
205 //	 * @throws IOException
// Retries leader lookup up to 3 times, sleeping 1s between attempts so
// ZooKeeper has time to elect; throws IOException if all attempts fail.
207 //	private hostPort findNewLeader ( hostPort oldLeader, String topic, int partition ) throws IOException
211 //		int attemptsLeft = 3;
212 //		boolean haveSlept = false;
214 //		while ( attemptsLeft-- > 0 )
216 //			System.out.println ( "" + attemptsLeft + " attempts Left" );	// FIXME: make sure it's 3 attempts!
218 //			// ask the brokers for a leader
219 //			final hostPort newLeader = findLeader ( fReplicaBrokers, topic, partition );
220 //			if ( newLeader != null )
222 //				// we can use this leader if it's different (i.e. a new leader has been elected)
223 //				// or it's the same leader, but we waited to allow ZK to get a new one, and
224 //				// the original recovered
225 //				if ( !oldLeader.equals ( newLeader ) || haveSlept )
233 //					Thread.sleep ( 1000 );
// NOTE(review): handler body not visible; same re-interrupt caveat as the
// InterruptedException catch in run().
236 //				catch ( InterruptedException x )
241 //		System.out.println ( "Unable to find new leader after Broker failure. Exiting" );
242 //		throw new IOException ( "Unable to find new leader after Broker failure. Exiting" );
246 //	 * Given one or more seed brokers, find the leader for a given topic/partition
249 //	 * @param partition
250 //	 * @return partition metadata, or null
// Walks the seed brokers, pulls topic metadata from each, and returns the
// leader for the requested partition, refreshing fReplicaBrokers as a side
// effect.  The null-return path is on stripped lines.
252 //	private hostPort findLeader ( List<hostPort> seeds, String topic, int partition )
254 //		final List<String> topics = new ArrayList<String> ();
255 //		topics.add ( topic );
257 //		for ( hostPort seed : seeds )
259 //			final SimpleConsumer consumer = new SimpleConsumer ( seed.fHost, seed.fPort, 100000, 64 * 1024, "leaderLookup" );
260 //			final TopicMetadataRequest req = new TopicMetadataRequest ( topics );
261 //			final TopicMetadataResponse resp = consumer.send ( req );
262 //			consumer.close ();
264 //			final List<TopicMetadata> metaData = resp.topicsMetadata ();
265 //			for ( TopicMetadata item : metaData )
267 //				for ( PartitionMetadata part : item.partitionsMetadata () )
269 //					if ( part.partitionId () == partition )
271 //						// found our partition. load the details, then return it
272 //						fReplicaBrokers.clear ();
273 //						for ( kafka.cluster.Broker replica : part.replicas () )
275 //							fReplicaBrokers.add ( new hostPort ( replica.host (), replica.port () ) );
277 //						return new hostPort ( part.leader () );
// Simple immutable host:port value holder with equals/hashCode.
// NOTE(review): if revived, rename to HostPort (UpperCamelCase) and make
// fields private with accessors, or use a record on a modern JDK.
286 //	private static class hostPort
288 //		public hostPort ( String host, int port ) { fHost = host; fPort = port; }
290 //		public hostPort ( Broker leader )
292 //			fHost = leader.host ();
293 //			fPort = leader.port ();
297 //		public final String fHost;
298 //		public final int fPort;
// Standard prime-31 hash over host and port; consistent with equals below.
301 //		public int hashCode ()
303 //			final int prime = 31;
305 //			result = prime * result
306 //				+ ( ( fHost == null ) ? 0 : fHost.hashCode () );
307 //			result = prime * result + fPort;
312 //		public boolean equals ( Object obj )
314 //			if ( this == obj )
316 //			if ( obj == null )
318 //			if ( getClass () != obj.getClass () )
320 //			hostPort other = (hostPort) obj;
321 //			if ( fHost == null )
323 //				if ( other.fHost != null )
326 //			else if ( !fHost.equals ( other.fHost ) )
328 //			if ( fPort != other.fPort )
334 //	private List<hostPort> fReplicaBrokers;