d71182c02e1023b3080769942ce549d48a758d19
[dmaap/messagerouter/messageservice.git] / src / main / java / com / att / nsa / dmaap / tools / ConfigTool.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.dmaap.tools;
23
24 import java.io.IOException;
25 import java.io.PrintStream;
26 import java.security.NoSuchAlgorithmException;
27 import java.util.Date;
28 import java.util.LinkedList;
29 import java.util.Map.Entry;
30
31 import org.json.JSONException;
32
33 import com.att.nsa.apiServer.CommonServlet;
34 import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
35 import com.att.nsa.cambria.metabroker.Topic;
36 import com.att.nsa.cmdtool.Command;
37 import com.att.nsa.cmdtool.CommandLineTool;
38 import com.att.nsa.cmdtool.CommandNotReadyException;
39 import com.att.nsa.configs.ConfigDb;
40 import com.att.nsa.configs.ConfigDbException;
41 import com.att.nsa.configs.ConfigPath;
42 import com.att.nsa.configs.confimpl.EncryptingLayer;
43 import com.att.nsa.configs.confimpl.ZkConfigDb;
44 import com.att.nsa.drumlin.till.data.rrConvertor;
45 import com.att.nsa.drumlin.till.data.uniqueStringGenerator;
46 import com.att.nsa.drumlin.till.nv.impl.nvWriteableTable;
47 import com.att.nsa.security.db.BaseNsaApiDbImpl;
48 import com.att.nsa.security.db.EncryptingApiDbImpl;
49 import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
50 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
51 import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
52 import com.att.nsa.util.NsaClock;
53
54 public class ConfigTool extends CommandLineTool<ConfigToolContext>
55 {
56         protected ConfigTool ()
57         {
58                 super ( "Cambria API Config Tool", "cambriaConfig> " );
59
60                 super.registerCommand ( new ListTopicCommand  () );
61                 super.registerCommand ( new WriteTopicCommand  () );
62                 super.registerCommand ( new ReadTopicCommand  () );
63                 super.registerCommand ( new SetTopicOwnerCommand () );
64                 super.registerCommand ( new InitSecureTopicCommand () );
65                 super.registerCommand ( new ListApiKeysCommand () );
66                 super.registerCommand ( new PutApiCommand () );
67                 super.registerCommand ( new writeApiKeyCommand () );
68                 super.registerCommand ( new EncryptApiKeysCommand () );
69                 super.registerCommand ( new DecryptApiKeysCommand () );
70                 super.registerCommand ( new NodeFetchCommand () );
71                 super.registerCommand ( new DropOldConsumerGroupsCommand () );
72         }
73
74         public static void main ( String[] args ) throws IOException
75         {
76                 final String connStr = args.length>0 ? args[0] : "localhost:2181"; 
77                 final ConfigDb db = new ZkConfigDb (
78                         connStr,
79                         args.length>1 ? args[1] : CommonServlet.getDefaultZkRoot ( "cambria" )
80                 );
81
82                 final ConfigToolContext context = new ConfigToolContext ( db, connStr, new nvWriteableTable() );
83                 final ConfigTool ct = new ConfigTool ();
84                 ct.runFromMain ( args, context );
85         }
86
87         private static class ListTopicCommand implements Command<ConfigToolContext>
88         {
89                 @Override
90                 public String[] getMatches ()
91                 {
92                         return new String[] { "topics", "list (\\S*)" };
93                 }
94
95                 @Override
96                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
97                 {
98                 }
99
100                 @Override
101                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
102                 {
103                         try
104                         {
105                                 final ConfigDb db = context.getDb();
106                                 final ConfigPath base = db.parse ( "/topics" );
107                                 
108                                 if ( parts.length > 0 )
109                                 {
110                                         final ConfigPath myTopic = base.getChild ( parts[0] );
111                                         final String data = db.load ( myTopic );
112                                         if ( data != null )
113                                         {
114                                                 out.println ( data );
115                                         }
116                                         else
117                                         {
118                                                 out.println ( "No topic [" + parts[0] + "]" );
119                                         }
120                                 }
121                                 else
122                                 {
123                                         for ( ConfigPath child : db.loadChildrenNames ( base ) )
124                                         {
125                                                 out.println ( child.getName () );
126                                         }
127                                 }
128                         }
129                         catch ( ConfigDbException e )
130                         {
131                                 out.println ( "Command failed: " + e);
132                         }
133                 }
134
135                 @Override
136                 public void displayHelp ( PrintStream out )
137                 {
138                         out.println ( "topics" );
139                         out.println ( "list <topic>" );
140                 }
141         }
142
143         private static class WriteTopicCommand implements Command<ConfigToolContext>
144         {
145                 @Override
146                 public String[] getMatches ()
147                 {
148                         return new String[] { "write (\\S*) (\\S*)" };
149                 }
150
151                 @Override
152                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
153                 {
154                 }
155
156                 @Override
157                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
158                 {
159                         try
160                         {
161                                 final ConfigDb db = context.getDb();
162                                 final ConfigPath base = db.parse ( "/topics" );
163                                 final ConfigPath myTopic = base.getChild ( parts[0] );
164                                 db.store ( myTopic, parts[1] );
165                                 out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" );
166                         }
167                         catch ( ConfigDbException e )
168                         {
169                                 out.println ( "Command failed: " + e.getMessage() );
170                                 throw new RuntimeException(e);
171                         }
172                 }
173
174                 @Override
175                 public void displayHelp ( PrintStream out )
176                 {
177                         out.println ( "write <topic> <string>" );
178                         out.println ( "\tBe careful with this. You can write data that's not compatible with Cambria's config db." );
179                 }
180         }
181
182         private static class ReadTopicCommand implements Command<ConfigToolContext>
183         {
184                 @Override
185                 public String[] getMatches ()
186                 {
187                         return new String[] { "read (\\S*)" };
188                 }
189
190                 @Override
191                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
192                 {
193                 }
194
195                 @Override
196                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
197                 {
198                         try
199                         {
200                                 final ConfigDb db = context.getDb();
201                                 final ConfigPath base = db.parse ( "/topics" );
202                                 final ConfigPath myTopic = base.getChild ( parts[0] );
203                                 db.store ( myTopic, parts[1] );
204                                 out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" );
205                         }
206                         catch ( ConfigDbException e )
207                         {
208                                 out.println ( "Command failed: " + e);
209                         }
210                 }
211
212                 @Override
213                 public void displayHelp ( PrintStream out )
214                 {
215                         out.println ( "read <topic>" );
216                         out.println ( "\tRead config data for a topic." );
217                 }
218         }
219
220         private static class InitSecureTopicCommand implements Command<ConfigToolContext>
221         {
222                 @Override
223                 public String[] getMatches ()
224                 {
225                         return new String[] { "initTopic (\\S*) (\\S*) (\\S*)" };
226                 }
227
228                 @Override
229                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
230                 {
231                 }
232
233                 @Override
234                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
235                 {
236                         try
237                         {
238                                 DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (),
239                                         context.getDb ().parse("/topics"), parts[0], parts[2], parts[1],true );
240                                 out.println ( "Topic [" + parts[0] + "] updated." );
241                         }
242                         catch ( ConfigDbException e )
243                         {
244                                 out.println ( "Command failed: " + e);
245                         }
246                 }
247
248                 @Override
249                 public void displayHelp ( PrintStream out )
250                 {
251                         out.println ( "initTopic <topic> <ownerApiKey> <description>" );
252                 }
253         }
254
255         private static class SetTopicOwnerCommand implements Command<ConfigToolContext>
256         {
257                 @Override
258                 public String[] getMatches ()
259                 {
260                         return new String[] { "setOwner (\\S*) (\\S*)" };
261                 }
262
263                 @Override
264                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
265                 {
266                 }
267
268                 @Override
269                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
270                 {
271                         try
272                         {
273                                 final Topic kt = DMaaPKafkaMetaBroker.getKafkaTopicConfig ( context.getDb(),
274                                         context.getDb().parse ( "/topics" ), parts[0] );
275                                 if ( kt != null )
276                                 {
277                                         final String desc = kt.getDescription ();
278
279                                         DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (),
280                                                 context.getDb ().parse("/topics"), parts[0], desc, parts[1], true );
281                                         out.println ( "Topic [" + parts[0] + "] updated." );
282                                 }
283                                 else
284                                 {
285                                         out.println ( "Topic [" + parts[0] + "] doesn't exist." );
286                                 }
287                         }
288                         catch ( ConfigDbException e )
289                         {
290                                 out.println ( "Command failed: " + e);
291                         }
292                 }
293
294                 @Override
295                 public void displayHelp ( PrintStream out )
296                 {
297                         out.println ( "setOwner <topic> <ownerApiKey>" );
298                 }
299         }
300
301         private static class ListApiKeysCommand implements Command<ConfigToolContext>
302         {
303                 @Override
304                 public String[] getMatches ()
305                 {
306                         return new String[] { "listApiKeys", "listApiKey (\\S*) (\\S*) (\\S*)", "listApiKey (\\S*)" };
307                 }
308
309                 @Override
310                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
311                 {
312                 }
313
314                 @Override
315                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
316                 {
317                         try
318                         {
319                                 final ConfigDb db = context.getDb ();
320                                 if ( parts.length == 0 )
321                                 {
322                                         final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
323                                         int count = 0;
324                                         for ( String key : readFrom.loadAllKeys () )
325                                         {
326                                                 out.println ( key );
327                                                 count++;
328                                         }
329                                         out.println ( "" + count + " records." );
330                                 }
331                                 else
332                                 {
333                                         BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
334                                         if ( parts.length == 3 )
335                                         {
336                                                 readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
337                                                         EncryptingLayer.readSecretKey ( parts[1] ), rrConvertor.base64Decode ( parts[2] ) );
338                                         }
339                                         final NsaSimpleApiKey apikey = readFrom.loadApiKey ( parts[0] );
340                                         if ( apikey == null )
341                                         {
342                                                 out.println ( "Key '" + parts[0] + "' not found." );
343                                         }
344                                         else
345                                         {
346                                                 out.println ( apikey.asJsonObject ().toString () );
347                                         }
348                                 }
349                         }
350                         catch ( ConfigDbException e )
351                         {
352                                 out.println ( "Command failed: " + e.getMessage() );
353                         }
354                         catch ( JSONException e )
355                         {
356                                 out.println ( "Command failed: " + e.getMessage() );
357                         }
358                 }
359
360                 @Override
361                 public void displayHelp ( PrintStream out )
362                 {
363                         out.println ( "listApiKeys" );
364                         out.println ( "listApiKey <key>" );
365                         out.println ( "listApiKey <key> <dbKey> <dbIv>" );
366                 }
367         }
368
369         private static class PutApiCommand implements Command<ConfigToolContext>
370         {
371                 @Override
372                 public String[] getMatches ()
373                 {
374                         return new String[]
375                         {
376                                 // these are <key> <enckey> <encinit> <value>
377                                 "putApiKey (secret) (\\S*) (\\S*) (\\S*) (\\S*)",
378                                 "putApiKey (email) (\\S*) (\\S*) (\\S*) (\\S*)",
379                                 "putApiKey (description) (\\S*) (\\S*) (\\S*) (\\S*)"
380                         };
381                 }
382
383                 @Override
384                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
385                 {
386                 }
387
388                 @Override
389                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
390                 {
391                         try
392                         {
393                                 final ConfigDb db = context.getDb ();
394                                 if ( parts.length == 5 )
395                                 {
396                                         final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb =
397                                                 new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
398                                                         EncryptingLayer.readSecretKey ( parts[2] ), rrConvertor.base64Decode ( parts[3] ) );
399
400                                         final NsaSimpleApiKey apikey = apiKeyDb.loadApiKey ( parts[1] );
401                                         if ( apikey == null )
402                                         {
403                                                 out.println ( "Key '" + parts[1] + "' not found." );
404                                         }
405                                         else
406                                         {
407                                                 if ( parts[0].equalsIgnoreCase ( "secret" ) )
408                                                 {
409                                                         apikey.resetSecret ( parts[4] );
410                                                 }
411                                                 else if ( parts[0].equalsIgnoreCase ( "email" ) )
412                                                 {
413                                                         apikey.setContactEmail ( parts[4] );
414                                                 }
415                                                 else if ( parts[0].equalsIgnoreCase ( "description" ) )
416                                                 {
417                                                         apikey.setDescription ( parts[4] );
418                                                 }
419
420                                                 apiKeyDb.saveApiKey ( apikey );
421                                                 out.println ( apikey.asJsonObject ().toString () );
422                                         }
423                                 }
424                         }
425                         catch ( ConfigDbException e )
426                         {
427                                 out.println ( "Command failed: " + e.getMessage() );
428                         }
429                         catch ( JSONException e )
430                         {
431                                 out.println ( "Command failed: " + e.getMessage() );
432                         }
433                 }
434
435                 @Override
436                 public void displayHelp ( PrintStream out )
437                 {
438                         out.println ( "putApiKey secret <apiKey> <dbKey> <dbIv> <newSecret>" );
439                         out.println ( "putApiKey email <apiKey> <dbKey> <dbIv> <newEmail>" );
440                         out.println ( "putApiKey description <apiKey> <dbKey> <dbIv> <newDescription>" );
441                 }
442         }
443
444         private static class writeApiKeyCommand implements Command<ConfigToolContext>
445         {
446                 @Override
447                 public String[] getMatches ()
448                 {
449                         return new String[]
450                         {
451                                 // <enckey> <encinit> <key> <secret>
452                                 "writeApiKey (\\S*) (\\S*) (\\S*) (\\S*)",
453                         };
454                 }
455
456                 @Override
457                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
458                 {
459                 }
460
461                 @Override
462                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
463                 {
464                         try
465                         {
466                                 final ConfigDb db = context.getDb ();
467                                 if ( parts.length == 4 )
468                                 {
469                                         final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb =
470                                                 new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
471                                                         EncryptingLayer.readSecretKey ( parts[0] ), rrConvertor.base64Decode ( parts[1] ) );
472
473                                         apiKeyDb.deleteApiKey ( parts[2] );
474                                         final NsaSimpleApiKey apikey = apiKeyDb.createApiKey ( parts[2], parts[3] );
475                                         out.println ( apikey.asJsonObject ().toString () );
476                                 }
477                         }
478                         catch ( ConfigDbException e )
479                         {
480                                 out.println ( "Command failed: " + e.getMessage() );
481                         }
482                         catch ( JSONException e )
483                         {
484                                 out.println ( "Command failed: " + e.getMessage() );
485                         }
486                         catch ( KeyExistsException e )
487                         {
488                                 out.println ( "Command failed: " + e.getMessage() );
489                         }
490                 }
491
492                 @Override
493                 public void displayHelp ( PrintStream out )
494                 {
495                         out.println ( "writeApiKey <dbKey> <dbIv> <newApiKey> <newSecret>" );
496                 }
497         }
498
499         private static class EncryptApiKeysCommand implements Command<ConfigToolContext>
500         {
501                 @Override
502                 public String[] getMatches ()
503                 {
504                         return new String[] { "convertApiKeyDb", "convertApiKeyDb (\\S*) (\\S*)" };
505                 }
506
507                 @Override
508                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
509                 {
510                 }
511
512                 @Override
513                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
514                 {
515                         try
516                         {
517                                 final String key = parts.length == 2 ? parts[0] : EncryptingLayer.createSecretKey ();
518                                 final String iv = parts.length == 2 ? parts[1] : rrConvertor.base64Encode ( uniqueStringGenerator.createValue ( 16 ) );
519
520                                 // This doesn't do well when the number of API keys is giant...
521                                 if ( parts.length == 0 )
522                                 {
523                                         out.println ( "YOU MUST RECORD THESE VALUES AND USE THEM IN THE SERVER CONFIG" );
524                                         out.println ( "Key: " + key );
525                                         out.println ( " IV: " + iv );
526                                         out.println ( "\n" );
527                                         out.println ( "Call again with key and IV on command line." );
528                                         out.println ( "\n" );
529                                         return; // because otherwise the values get lost 
530                                 }
531
532                                 final ConfigDb db = context.getDb ();
533                                 final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
534                                 final EncryptingApiDbImpl<NsaSimpleApiKey> writeTo = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
535                                         EncryptingLayer.readSecretKey ( key ), rrConvertor.base64Decode ( iv ) );
536
537                                 int count = 0;
538                                 for ( Entry<String, NsaSimpleApiKey> e : readFrom.loadAllKeyRecords ().entrySet () )
539                                 {
540                                         out.println ( "-------------------------------" );
541                                         out.println ( "Converting " + e.getKey () );
542                                         final String was = e.getValue ().asJsonObject ().toString ();
543                                         out.println ( was );
544
545                                         writeTo.saveApiKey ( e.getValue () );
546                                         count++;
547                                 }
548
549                                 out.println ( "Conversion complete, converted " + count + " records." );
550                         }
551                         catch ( ConfigDbException e )
552                         {
553                                 out.println ( "Command failed: " + e.getMessage() );
554                         }
555                         catch ( NoSuchAlgorithmException e )
556                         {
557                                 out.println ( "Command failed: " + e.getMessage() );
558                         }
559                 }
560
561                 @Override
562                 public void displayHelp ( PrintStream out )
563                 {
564                         out.println ( "convertApiKeyDb" );
565                         out.println ( "\tconvert an API key DB to an encrypted DB and output the cipher details" );
566                 }
567         }
568
569         private static class DecryptApiKeysCommand implements Command<ConfigToolContext>
570         {
571                 @Override
572                 public String[] getMatches ()
573                 {
574                         return new String[] { "revertApiKeyDb (\\S*) (\\S*)" };
575                 }
576
577                 @Override
578                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
579                 {
580                 }
581
582                 @Override
583                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
584                 {
585                         try
586                         {
587                                 final String keyStr = parts[0];
588                                 final String iv = parts[1];
589                                 final byte[] ivBytes = rrConvertor.base64Decode ( iv );
590
591                                 final ConfigDb db = context.getDb ();
592                                 final EncryptingApiDbImpl<NsaSimpleApiKey> readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
593                                         EncryptingLayer.readSecretKey ( keyStr ), ivBytes );
594                                 final BaseNsaApiDbImpl<NsaSimpleApiKey> writeTo = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
595
596                                 int count = 0;
597                                 for ( String apiKey : readFrom.loadAllKeys () )
598                                 {
599                                         out.println ( "Converting " + apiKey );
600                                         final NsaSimpleApiKey record = readFrom.loadApiKey ( apiKey );
601                                         if ( record == null )
602                                         {
603                                                 out.println ( "Couldn't load " + apiKey );
604                                         }
605                                         else
606                                         {
607                                                 writeTo.saveApiKey ( record );
608                                                 count++;
609                                         }
610                                 }
611                                 out.println ( "Conversion complete, converted " + count + " records." );
612                         }
613                         catch ( ConfigDbException e )
614                         {
615                                 out.println ( "Command failed: " + e.getMessage() );
616                         }
617                 }
618
619                 @Override
620                 public void displayHelp ( PrintStream out )
621                 {
622                         out.println ( "revertApiKeyDb <keyBase64> <ivBase64>" );
623                         out.println ( "\trevert an API key DB to a deencrypted DB" );
624                 }
625         }
626
627         private static class NodeFetchCommand implements Command<ConfigToolContext>
628         {
629                 @Override
630                 public String[] getMatches ()
631                 {
632                         return new String[] { "node (\\S*)" };
633                 }
634
635                 @Override
636                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
637                 {
638                 }
639
640                 @Override
641                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
642                 {
643                         try
644                         {
645                                 final String node = parts[0];
646
647                                 final ConfigDb db = context.getDb ();
648                                 final ConfigPath cp = db.parse ( node );
649
650                                 boolean doneOne = false;
651                                 for ( ConfigPath child : db.loadChildrenNames ( cp ) )
652                                 {
653                                         out.println ( "\t- " + child.getName () );
654                                         doneOne = true;
655                                 }
656                                 if ( doneOne )
657                                 {
658                                         out.println ();
659                                 }
660                                 else
661                                 {
662                                         out.println ( "(No child nodes of '" + node + "')" );
663                                 }
664
665                                 final String val = db.load ( cp );
666                                 if ( val == null )
667                                 {
668                                         out.println ( "(No data at '" + node + "')" );
669                                 }
670                                 else
671                                 {
672                                         out.println ( val );
673                                 }
674                         }
675                         catch ( ConfigDbException e )
676                         {
677                                 out.println ( "Command failed: " + e.getMessage() );
678                         }
679                         catch ( IllegalArgumentException e )
680                         {
681                                 out.println ( "Command failed: " + e.getMessage() );
682                         }
683                 }
684
685                 @Override
686                 public void displayHelp ( PrintStream out )
687                 {
688                         out.println ( "node <nodeName>" );
689                         out.println ( "\tread a config db node" );
690                 }
691         }
692
693         private static class DropOldConsumerGroupsCommand implements Command<ConfigToolContext>
694         {
695                 private final long kMaxRemovals = 500;
696                 
697                 @Override
698                 public String[] getMatches ()
699                 {
700                         return new String[] { "(dropOldConsumers) (\\S*)", "(showOldConsumers) (\\S*)" };
701                 }
702
703                 @Override
704                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
705                 {
706                 }
707
708                 @Override
709                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
710                 {
711                         try
712                         {
713                                 final boolean runDrops = parts[0].equalsIgnoreCase ( "dropOldConsumers" );
714                                 final String maxAgeInDaysStr = parts[1];
715                                 final int maxAgeInDays = Integer.parseInt ( maxAgeInDaysStr );
716                                 final long oldestEpochSecs = ( NsaClock.now () / 1000 ) - ( 24 * 60 * 60 * maxAgeInDays );
717
718                                 out.println ( "Dropping consumer groups older than " + new Date ( oldestEpochSecs * 1000 ) );
719
720                                 final ConfigDb db = context.getDb ();
721
722                                 // kafka updates consumer partition records in ZK each time a message
723                                 // is served. we can determine which consumers are old based on a lack
724                                 // of update to the partition entries
725                                 // (see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper)
726
727                                 // kafka only works with ZK, and our configDb was constructed with a non-kafka
728                                 // root node. We have to switch it to get to the right content...
729                                 if ( ! ( db instanceof ZkConfigDb ) )
730                                 {
731                                         throw new ConfigDbException ( "You can only show/drop old consumers against a ZK config db." );
732                                 }
733
734                                 final ZkConfigDb newZkDb = new ZkConfigDb ( context.getConnectionString (), "" );
735                                 long cgCount = 0;
736
737                                 final LinkedList<ConfigPath> removals = new LinkedList<ConfigPath> ();
738                                 for ( ConfigPath consumerGroupName : newZkDb.loadChildrenNames ( newZkDb.parse ( "/consumers" ) ) )
739                                 {
740                                         cgCount++;
741                                         if ( cgCount % 500 == 0 )
742                                         {
743                                                 out.println ( "" + cgCount + " groups examined" );
744                                         }
745
746                                         boolean foundAnything = false;
747                                         boolean foundRecentUse = false;
748                                         long mostRecent = -1;
749
750                                         // each consumer group has an "offsets" entry, which contains 0 or more topic entries.
751                                         // each topic contains partition nodes.
752                                         for ( ConfigPath topic : newZkDb.loadChildrenNames ( consumerGroupName.getChild ( "offsets" ) ) )
753                                         {
754                                                 for ( ConfigPath offset : newZkDb.loadChildrenNames ( topic ) )
755                                                 {
756                                                         foundAnything = true;
757
758                                                         final long modTime = newZkDb.getLastModificationTime ( offset );
759                                                         mostRecent = Math.max ( mostRecent, modTime );
760
761                                                         foundRecentUse = ( modTime > oldestEpochSecs );
762                                                         if ( foundRecentUse ) break;
763                                                 }
764                                                 if ( foundRecentUse ) break;
765                                         }
766
767                                         // decide if this consumer group is old
768                                         out.println ( "Group " + consumerGroupName.getName () + " was most recently used " + new Date ( mostRecent*1000 ) );
769                                         if ( foundAnything && !foundRecentUse )
770                                         {
771                                                 removals.add ( consumerGroupName );
772                                         }
773
774                                         if ( removals.size () >= kMaxRemovals )
775                                         {
776                                                 break;
777                                         }
778                                 }
779
780                                 // removals
781                                 for ( ConfigPath consumerGroupName : removals )
782                                 {
783                                         out.println ( "Group " + consumerGroupName.getName () + " has no recent activity." );
784                                         if ( runDrops )
785                                         {
786                                                 out.println ( "Removing group " + consumerGroupName.getName () + "..." );
787                                                 newZkDb.clear ( consumerGroupName );
788                                         }
789                                 }
790                         }
791                         catch ( ConfigDbException e )
792                         {
793                                 out.println ( "Command failed: " + e.getMessage() );
794                         }
795                         catch ( NumberFormatException e )
796                         {
797                                 out.println ( "Command failed: " + e.getMessage() );
798                         }
799                         catch ( JSONException e )
800                         {
801                                 out.println ( "Command failed: " + e.getMessage() );
802                         }
803                 }
804
805                 @Override
806                 public void displayHelp ( PrintStream out )
807                 {
808                         out.println ( "showOldConsumers <minAgeInDays>" );
809                         out.println ( "dropOldConsumers <minAgeInDays>" );
810                         out.println ( "\tDrop (or just show) any consumer group that has been inactive longer than <minAgeInDays> days." );
811                         out.println ();
812                         out.println ( "\tTo be safe, <minAgeInDays> should be much higher than the maximum storage time on the Kafka topics." );
813                         out.println ( "\tA very old consumer will potentially miss messages, but will resume at the oldest message, while a" );
814                         out.println ( "\tdeleted consumer will start at the current message if it ever comes back." );
815                         out.println ();
816                         out.println ( "\tNote that show/drops are limited to " + kMaxRemovals + " records per invocation." );
817                 }
818         }
819 }