update the package name
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / tools / ConfigTool.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22  package org.onap.dmaap.tools;
23
24 import java.io.IOException;
25 import java.io.PrintStream;
26 import java.security.NoSuchAlgorithmException;
27 import java.util.Date;
28 import java.util.LinkedList;
29 import java.util.Map.Entry;
30
31 import org.json.JSONException;
32
33 import com.att.nsa.apiServer.CommonServlet;
34 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
35 import org.onap.dmaap.dmf.mr.metabroker.Topic;
36 import com.att.nsa.cmdtool.Command;
37 import com.att.nsa.cmdtool.CommandLineTool;
38 import com.att.nsa.cmdtool.CommandNotReadyException;
39 import com.att.nsa.configs.ConfigDb;
40 import com.att.nsa.configs.ConfigDbException;
41 import com.att.nsa.configs.ConfigPath;
42 import com.att.nsa.configs.confimpl.EncryptingLayer;
43 import com.att.nsa.configs.confimpl.ZkConfigDb;
44 import com.att.nsa.drumlin.till.data.rrConvertor;
45 import com.att.nsa.drumlin.till.data.uniqueStringGenerator;
46 import com.att.nsa.drumlin.till.nv.impl.nvWriteableTable;
47 import com.att.nsa.security.db.BaseNsaApiDbImpl;
48 import com.att.nsa.security.db.EncryptingApiDbImpl;
49 import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
50 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
51 import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
52 import com.att.nsa.util.NsaClock;
53
54 public class ConfigTool extends CommandLineTool<ConfigToolContext>
55 {
56         protected ConfigTool ()
57         {
58                 super ( "Cambria API Config Tool", "cambriaConfig> " );
59
60                 super.registerCommand ( new ListTopicCommand  () );
61                 super.registerCommand ( new WriteTopicCommand  () );
62                 super.registerCommand ( new ReadTopicCommand  () );
63                 super.registerCommand ( new SetTopicOwnerCommand () );
64                 super.registerCommand ( new InitSecureTopicCommand () );
65                 super.registerCommand ( new ListApiKeysCommand () );
66                 super.registerCommand ( new PutApiCommand () );
67                 super.registerCommand ( new writeApiKeyCommand () );
68                 super.registerCommand ( new EncryptApiKeysCommand () );
69                 super.registerCommand ( new DecryptApiKeysCommand () );
70                 super.registerCommand ( new NodeFetchCommand () );
71                 super.registerCommand ( new DropOldConsumerGroupsCommand () );
72         }
73
74         public static void main ( String[] args ) throws IOException
75         {
76                 final String connStr = args.length>0 ? args[0] : "localhost:2181"; 
77                 final ConfigDb db = new ZkConfigDb (
78                         connStr,
79                         args.length>1 ? args[1] : CommonServlet.getDefaultZkRoot ( "cambria" )
80                 );
81
82                 final ConfigToolContext context = new ConfigToolContext ( db, connStr, new nvWriteableTable() );
83                 final ConfigTool ct = new ConfigTool ();
84                 ct.runFromMain ( args, context );
85         }
86
87         private static class ListTopicCommand implements Command<ConfigToolContext>
88         {
89                 @Override
90                 public String[] getMatches ()
91                 {
92                         return new String[] { "topics", "list (\\S*)" };
93                 }
94
95                 @Override
96                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
97                 {
98                 }
99
100                 @Override
101                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
102                 {
103                         try
104                         {
105                                 final ConfigDb db = context.getDb();
106                                 final ConfigPath base = db.parse ( "/topics" );
107                                 
108                                 if ( parts.length > 0 )
109                                 {
110                                         final ConfigPath myTopic = base.getChild ( parts[0] );
111                                         final String data = db.load ( myTopic );
112                                         if ( data != null )
113                                         {
114                                                 out.println ( data );
115                                         }
116                                         else
117                                         {
118                                                 out.println ( "No topic [" + parts[0] + "]" );
119                                         }
120                                 }
121                                 else
122                                 {
123                                         for ( ConfigPath child : db.loadChildrenNames ( base ) )
124                                         {
125                                                 out.println ( child.getName () );
126                                         }
127                                 }
128                         }
129                         catch ( ConfigDbException e )
130                         {
131                                 out.println ( "Command failed: " + e.getMessage() );
132                         }
133                 }
134
135                 @Override
136                 public void displayHelp ( PrintStream out )
137                 {
138                         out.println ( "topics" );
139                         out.println ( "list <topic>" );
140                 }
141         }
142
143         private static class WriteTopicCommand implements Command<ConfigToolContext>
144         {
145                 @Override
146                 public String[] getMatches ()
147                 {
148                         return new String[] { "write (\\S*) (\\S*)" };
149                 }
150
151                 @Override
152                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
153                 {
154                 }
155
156                 @Override
157                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
158                 {
159                         try
160                         {
161                                 final ConfigDb db = context.getDb();
162                                 final ConfigPath base = db.parse ( "/topics" );
163                                 final ConfigPath myTopic = base.getChild ( parts[0] );
164                                 db.store ( myTopic, parts[1] );
165                                 out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" );
166                         }
167                         catch ( ConfigDbException e )
168                         {
169                                 out.println ( "Command failed: " + e.getMessage() );
170                         }
171                 }
172
173                 @Override
174                 public void displayHelp ( PrintStream out )
175                 {
176                         out.println ( "write <topic> <string>" );
177                         out.println ( "\tBe careful with this. You can write data that's not compatible with Cambria's config db." );
178                 }
179         }
180
181         private static class ReadTopicCommand implements Command<ConfigToolContext>
182         {
183                 @Override
184                 public String[] getMatches ()
185                 {
186                         return new String[] { "read (\\S*)" };
187                 }
188
189                 @Override
190                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
191                 {
192                 }
193
194                 @Override
195                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
196                 {
197                         try
198                         {
199                                 final ConfigDb db = context.getDb();
200                                 final ConfigPath base = db.parse ( "/topics" );
201                                 final ConfigPath myTopic = base.getChild ( parts[0] );
202                                 db.store ( myTopic, parts[1] );
203                                 out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" );
204                         }
205                         catch ( ConfigDbException e )
206                         {
207                                 out.println ( "Command failed: " + e.getMessage() );
208                         }
209                 }
210
211                 @Override
212                 public void displayHelp ( PrintStream out )
213                 {
214                         out.println ( "read <topic>" );
215                         out.println ( "\tRead config data for a topic." );
216                 }
217         }
218
219         private static class InitSecureTopicCommand implements Command<ConfigToolContext>
220         {
221                 @Override
222                 public String[] getMatches ()
223                 {
224                         return new String[] { "initTopic (\\S*) (\\S*) (\\S*)" };
225                 }
226
227                 @Override
228                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
229                 {
230                 }
231
232                 @Override
233                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
234                 {
235                         try
236                         {
237                                 DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (),
238                                         context.getDb ().parse("/topics"), parts[0], parts[2], parts[1],true );
239                                 out.println ( "Topic [" + parts[0] + "] updated." );
240                         }
241                         catch ( ConfigDbException e )
242                         {
243                                 out.println ( "Command failed: " + e.getMessage () );
244                         }
245                 }
246
247                 @Override
248                 public void displayHelp ( PrintStream out )
249                 {
250                         out.println ( "initTopic <topic> <ownerApiKey> <description>" );
251                 }
252         }
253
254         private static class SetTopicOwnerCommand implements Command<ConfigToolContext>
255         {
256                 @Override
257                 public String[] getMatches ()
258                 {
259                         return new String[] { "setOwner (\\S*) (\\S*)" };
260                 }
261
262                 @Override
263                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
264                 {
265                 }
266
267                 @Override
268                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
269                 {
270                         try
271                         {
272                                 final Topic kt = DMaaPKafkaMetaBroker.getKafkaTopicConfig ( context.getDb(),
273                                         context.getDb().parse ( "/topics" ), parts[0] );
274                                 if ( kt != null )
275                                 {
276                                         final String desc = kt.getDescription ();
277
278                                         DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (),
279                                                 context.getDb ().parse("/topics"), parts[0], desc, parts[1], true );
280                                         out.println ( "Topic [" + parts[0] + "] updated." );
281                                 }
282                                 else
283                                 {
284                                         out.println ( "Topic [" + parts[0] + "] doesn't exist." );
285                                 }
286                         }
287                         catch ( ConfigDbException e )
288                         {
289                                 out.println ( "Command failed: " + e.getMessage () );
290                         }
291                 }
292
293                 @Override
294                 public void displayHelp ( PrintStream out )
295                 {
296                         out.println ( "setOwner <topic> <ownerApiKey>" );
297                 }
298         }
299
300         private static class ListApiKeysCommand implements Command<ConfigToolContext>
301         {
302                 @Override
303                 public String[] getMatches ()
304                 {
305                         return new String[] { "listApiKeys", "listApiKey (\\S*) (\\S*) (\\S*)", "listApiKey (\\S*)" };
306                 }
307
308                 @Override
309                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
310                 {
311                 }
312
313                 @Override
314                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
315                 {
316                         try
317                         {
318                                 final ConfigDb db = context.getDb ();
319                                 if ( parts.length == 0 )
320                                 {
321                                         final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
322                                         int count = 0;
323                                         for ( String key : readFrom.loadAllKeys () )
324                                         {
325                                                 out.println ( key );
326                                                 count++;
327                                         }
328                                         out.println ( "" + count + " records." );
329                                 }
330                                 else
331                                 {
332                                         BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
333                                         if ( parts.length == 3 )
334                                         {
335                                                 readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
336                                                         EncryptingLayer.readSecretKey ( parts[1] ), rrConvertor.base64Decode ( parts[2] ) );
337                                         }
338                                         final NsaSimpleApiKey apikey = readFrom.loadApiKey ( parts[0] );
339                                         if ( apikey == null )
340                                         {
341                                                 out.println ( "Key '" + parts[0] + "' not found." );
342                                         }
343                                         else
344                                         {
345                                                 out.println ( apikey.asJsonObject ().toString () );
346                                         }
347                                 }
348                         }
349                         catch ( ConfigDbException e )
350                         {
351                                 out.println ( "Command failed: " + e.getMessage() );
352                         }
353                         catch ( JSONException e )
354                         {
355                                 out.println ( "Command failed: " + e.getMessage() );
356                         }
357                 }
358
359                 @Override
360                 public void displayHelp ( PrintStream out )
361                 {
362                         out.println ( "listApiKeys" );
363                         out.println ( "listApiKey <key>" );
364                         out.println ( "listApiKey <key> <dbKey> <dbIv>" );
365                 }
366         }
367
368         private static class PutApiCommand implements Command<ConfigToolContext>
369         {
370                 @Override
371                 public String[] getMatches ()
372                 {
373                         return new String[]
374                         {
375                                 // these are <key> <enckey> <encinit> <value>
376                                 "putApiKey (secret) (\\S*) (\\S*) (\\S*) (\\S*)",
377                                 "putApiKey (email) (\\S*) (\\S*) (\\S*) (\\S*)",
378                                 "putApiKey (description) (\\S*) (\\S*) (\\S*) (\\S*)"
379                         };
380                 }
381
382                 @Override
383                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
384                 {
385                 }
386
387                 @Override
388                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
389                 {
390                         try
391                         {
392                                 final ConfigDb db = context.getDb ();
393                                 if ( parts.length == 5 )
394                                 {
395                                         final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb =
396                                                 new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
397                                                         EncryptingLayer.readSecretKey ( parts[2] ), rrConvertor.base64Decode ( parts[3] ) );
398
399                                         final NsaSimpleApiKey apikey = apiKeyDb.loadApiKey ( parts[1] );
400                                         if ( apikey == null )
401                                         {
402                                                 out.println ( "Key '" + parts[1] + "' not found." );
403                                         }
404                                         else
405                                         {
406                                                 if ( parts[0].equalsIgnoreCase ( "secret" ) )
407                                                 {
408                                                         apikey.resetSecret ( parts[4] );
409                                                 }
410                                                 else if ( parts[0].equalsIgnoreCase ( "email" ) )
411                                                 {
412                                                         apikey.setContactEmail ( parts[4] );
413                                                 }
414                                                 else if ( parts[0].equalsIgnoreCase ( "description" ) )
415                                                 {
416                                                         apikey.setDescription ( parts[4] );
417                                                 }
418
419                                                 apiKeyDb.saveApiKey ( apikey );
420                                                 out.println ( apikey.asJsonObject ().toString () );
421                                         }
422                                 }
423                         }
424                         catch ( ConfigDbException e )
425                         {
426                                 out.println ( "Command failed: " + e.getMessage() );
427                         }
428                         catch ( JSONException e )
429                         {
430                                 out.println ( "Command failed: " + e.getMessage() );
431                         }
432                 }
433
434                 @Override
435                 public void displayHelp ( PrintStream out )
436                 {
437                         out.println ( "putApiKey secret <apiKey> <dbKey> <dbIv> <newSecret>" );
438                         out.println ( "putApiKey email <apiKey> <dbKey> <dbIv> <newEmail>" );
439                         out.println ( "putApiKey description <apiKey> <dbKey> <dbIv> <newDescription>" );
440                 }
441         }
442
443         private static class writeApiKeyCommand implements Command<ConfigToolContext>
444         {
445                 @Override
446                 public String[] getMatches ()
447                 {
448                         return new String[]
449                         {
450                                 // <enckey> <encinit> <key> <secret>
451                                 "writeApiKey (\\S*) (\\S*) (\\S*) (\\S*)",
452                         };
453                 }
454
455                 @Override
456                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
457                 {
458                 }
459
460                 @Override
461                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
462                 {
463                         try
464                         {
465                                 final ConfigDb db = context.getDb ();
466                                 if ( parts.length == 4 )
467                                 {
468                                         final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb =
469                                                 new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
470                                                         EncryptingLayer.readSecretKey ( parts[0] ), rrConvertor.base64Decode ( parts[1] ) );
471
472                                         apiKeyDb.deleteApiKey ( parts[2] );
473                                         final NsaSimpleApiKey apikey = apiKeyDb.createApiKey ( parts[2], parts[3] );
474                                         out.println ( apikey.asJsonObject ().toString () );
475                                 }
476                         }
477                         catch ( ConfigDbException e )
478                         {
479                                 out.println ( "Command failed: " + e.getMessage() );
480                         }
481                         catch ( JSONException e )
482                         {
483                                 out.println ( "Command failed: " + e.getMessage() );
484                         }
485                         catch ( KeyExistsException e )
486                         {
487                                 out.println ( "Command failed: " + e.getMessage() );
488                         }
489                 }
490
491                 @Override
492                 public void displayHelp ( PrintStream out )
493                 {
494                         out.println ( "writeApiKey <dbKey> <dbIv> <newApiKey> <newSecret>" );
495                 }
496         }
497
498         private static class EncryptApiKeysCommand implements Command<ConfigToolContext>
499         {
500                 @Override
501                 public String[] getMatches ()
502                 {
503                         return new String[] { "convertApiKeyDb", "convertApiKeyDb (\\S*) (\\S*)" };
504                 }
505
506                 @Override
507                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
508                 {
509                 }
510
511                 @Override
512                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
513                 {
514                         try
515                         {
516                                 final String key = parts.length == 2 ? parts[0] : EncryptingLayer.createSecretKey ();
517                                 final String iv = parts.length == 2 ? parts[1] : rrConvertor.base64Encode ( uniqueStringGenerator.createValue ( 16 ) );
518
519                                 // This doesn't do well when the number of API keys is giant...
520                                 if ( parts.length == 0 )
521                                 {
522                                         out.println ( "YOU MUST RECORD THESE VALUES AND USE THEM IN THE SERVER CONFIG" );
523                                         out.println ( "Key: " + key );
524                                         out.println ( " IV: " + iv );
525                                         out.println ( "\n" );
526                                         out.println ( "Call again with key and IV on command line." );
527                                         out.println ( "\n" );
528                                         return; // because otherwise the values get lost 
529                                 }
530
531                                 final ConfigDb db = context.getDb ();
532                                 final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
533                                 final EncryptingApiDbImpl<NsaSimpleApiKey> writeTo = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
534                                         EncryptingLayer.readSecretKey ( key ), rrConvertor.base64Decode ( iv ) );
535
536                                 int count = 0;
537                                 for ( Entry<String, NsaSimpleApiKey> e : readFrom.loadAllKeyRecords ().entrySet () )
538                                 {
539                                         out.println ( "-------------------------------" );
540                                         out.println ( "Converting " + e.getKey () );
541                                         final String was = e.getValue ().asJsonObject ().toString ();
542                                         out.println ( was );
543
544                                         writeTo.saveApiKey ( e.getValue () );
545                                         count++;
546                                 }
547
548                                 out.println ( "Conversion complete, converted " + count + " records." );
549                         }
550                         catch ( ConfigDbException e )
551                         {
552                                 out.println ( "Command failed: " + e.getMessage() );
553                         }
554                         catch ( NoSuchAlgorithmException e )
555                         {
556                                 out.println ( "Command failed: " + e.getMessage() );
557                         }
558                 }
559
560                 @Override
561                 public void displayHelp ( PrintStream out )
562                 {
563                         out.println ( "convertApiKeyDb" );
564                         out.println ( "\tconvert an API key DB to an encrypted DB and output the cipher details" );
565                 }
566         }
567
568         private static class DecryptApiKeysCommand implements Command<ConfigToolContext>
569         {
570                 @Override
571                 public String[] getMatches ()
572                 {
573                         return new String[] { "revertApiKeyDb (\\S*) (\\S*)" };
574                 }
575
576                 @Override
577                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
578                 {
579                 }
580
581                 @Override
582                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
583                 {
584                         try
585                         {
586                                 final String keyStr = parts[0];
587                                 final String iv = parts[1];
588                                 final byte[] ivBytes = rrConvertor.base64Decode ( iv );
589
590                                 final ConfigDb db = context.getDb ();
591                                 final EncryptingApiDbImpl<NsaSimpleApiKey> readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
592                                         EncryptingLayer.readSecretKey ( keyStr ), ivBytes );
593                                 final BaseNsaApiDbImpl<NsaSimpleApiKey> writeTo = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
594
595                                 int count = 0;
596                                 for ( String apiKey : readFrom.loadAllKeys () )
597                                 {
598                                         out.println ( "Converting " + apiKey );
599                                         final NsaSimpleApiKey record = readFrom.loadApiKey ( apiKey );
600                                         if ( record == null )
601                                         {
602                                                 out.println ( "Couldn't load " + apiKey );
603                                         }
604                                         else
605                                         {
606                                                 writeTo.saveApiKey ( record );
607                                                 count++;
608                                         }
609                                 }
610                                 out.println ( "Conversion complete, converted " + count + " records." );
611                         }
612                         catch ( ConfigDbException e )
613                         {
614                                 out.println ( "Command failed: " + e.getMessage() );
615                         }
616                 }
617
618                 @Override
619                 public void displayHelp ( PrintStream out )
620                 {
621                         out.println ( "revertApiKeyDb <keyBase64> <ivBase64>" );
622                         out.println ( "\trevert an API key DB to a deencrypted DB" );
623                 }
624         }
625
626         private static class NodeFetchCommand implements Command<ConfigToolContext>
627         {
628                 @Override
629                 public String[] getMatches ()
630                 {
631                         return new String[] { "node (\\S*)" };
632                 }
633
634                 @Override
635                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
636                 {
637                 }
638
639                 @Override
640                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
641                 {
642                         try
643                         {
644                                 final String node = parts[0];
645
646                                 final ConfigDb db = context.getDb ();
647                                 final ConfigPath cp = db.parse ( node );
648
649                                 boolean doneOne = false;
650                                 for ( ConfigPath child : db.loadChildrenNames ( cp ) )
651                                 {
652                                         out.println ( "\t- " + child.getName () );
653                                         doneOne = true;
654                                 }
655                                 if ( doneOne )
656                                 {
657                                         out.println ();
658                                 }
659                                 else
660                                 {
661                                         out.println ( "(No child nodes of '" + node + "')" );
662                                 }
663
664                                 final String val = db.load ( cp );
665                                 if ( val == null )
666                                 {
667                                         out.println ( "(No data at '" + node + "')" );
668                                 }
669                                 else
670                                 {
671                                         out.println ( val );
672                                 }
673                         }
674                         catch ( ConfigDbException e )
675                         {
676                                 out.println ( "Command failed: " + e.getMessage() );
677                         }
678                         catch ( IllegalArgumentException e )
679                         {
680                                 out.println ( "Command failed: " + e.getMessage() );
681                         }
682                 }
683
684                 @Override
685                 public void displayHelp ( PrintStream out )
686                 {
687                         out.println ( "node <nodeName>" );
688                         out.println ( "\tread a config db node" );
689                 }
690         }
691
692         private static class DropOldConsumerGroupsCommand implements Command<ConfigToolContext>
693         {
694                 private final long kMaxRemovals = 500;
695                 
696                 @Override
697                 public String[] getMatches ()
698                 {
699                         return new String[] { "(dropOldConsumers) (\\S*)", "(showOldConsumers) (\\S*)" };
700                 }
701
702                 @Override
703                 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
704                 {
705                 }
706
707                 @Override
708                 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
709                 {
710                         try
711                         {
712                                 final boolean runDrops = parts[0].equalsIgnoreCase ( "dropOldConsumers" );
713                                 final String maxAgeInDaysStr = parts[1];
714                                 final int maxAgeInDays = Integer.parseInt ( maxAgeInDaysStr );
715                                 final long oldestEpochSecs = ( NsaClock.now () / 1000 ) - ( 24 * 60 * 60 * maxAgeInDays );
716
717                                 out.println ( "Dropping consumer groups older than " + new Date ( oldestEpochSecs * 1000 ) );
718
719                                 final ConfigDb db = context.getDb ();
720
721                                 // kafka updates consumer partition records in ZK each time a message
722                                 // is served. we can determine which consumers are old based on a lack
723                                 // of update to the partition entries
724                                 // (see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper)
725
726                                 // kafka only works with ZK, and our configDb was constructed with a non-kafka
727                                 // root node. We have to switch it to get to the right content...
728                                 if ( ! ( db instanceof ZkConfigDb ) )
729                                 {
730                                         throw new ConfigDbException ( "You can only show/drop old consumers against a ZK config db." );
731                                 }
732
733                                 final ZkConfigDb newZkDb = new ZkConfigDb ( context.getConnectionString (), "" );
734                                 long cgCount = 0;
735
736                                 final LinkedList<ConfigPath> removals = new LinkedList<ConfigPath> ();
737                                 for ( ConfigPath consumerGroupName : newZkDb.loadChildrenNames ( newZkDb.parse ( "/consumers" ) ) )
738                                 {
739                                         cgCount++;
740                                         if ( cgCount % 500 == 0 )
741                                         {
742                                                 out.println ( "" + cgCount + " groups examined" );
743                                         }
744
745                                         boolean foundAnything = false;
746                                         boolean foundRecentUse = false;
747                                         long mostRecent = -1;
748
749                                         // each consumer group has an "offsets" entry, which contains 0 or more topic entries.
750                                         // each topic contains partition nodes.
751                                         for ( ConfigPath topic : newZkDb.loadChildrenNames ( consumerGroupName.getChild ( "offsets" ) ) )
752                                         {
753                                                 for ( ConfigPath offset : newZkDb.loadChildrenNames ( topic ) )
754                                                 {
755                                                         foundAnything = true;
756
757                                                         final long modTime = newZkDb.getLastModificationTime ( offset );
758                                                         mostRecent = Math.max ( mostRecent, modTime );
759
760                                                         foundRecentUse = ( modTime > oldestEpochSecs );
761                                                         if ( foundRecentUse ) break;
762                                                 }
763                                                 if ( foundRecentUse ) break;
764                                         }
765
766                                         // decide if this consumer group is old
767                                         out.println ( "Group " + consumerGroupName.getName () + " was most recently used " + new Date ( mostRecent*1000 ) );
768                                         if ( foundAnything && !foundRecentUse )
769                                         {
770                                                 removals.add ( consumerGroupName );
771                                         }
772
773                                         if ( removals.size () >= kMaxRemovals )
774                                         {
775                                                 break;
776                                         }
777                                 }
778
779                                 // removals
780                                 for ( ConfigPath consumerGroupName : removals )
781                                 {
782                                         out.println ( "Group " + consumerGroupName.getName () + " has no recent activity." );
783                                         if ( runDrops )
784                                         {
785                                                 out.println ( "Removing group " + consumerGroupName.getName () + "..." );
786                                                 newZkDb.clear ( consumerGroupName );
787                                         }
788                                 }
789                         }
790                         catch ( ConfigDbException e )
791                         {
792                                 out.println ( "Command failed: " + e.getMessage() );
793                         }
794                         catch ( NumberFormatException e )
795                         {
796                                 out.println ( "Command failed: " + e.getMessage() );
797                         }
798                         catch ( JSONException e )
799                         {
800                                 out.println ( "Command failed: " + e.getMessage() );
801                         }
802                 }
803
804                 @Override
805                 public void displayHelp ( PrintStream out )
806                 {
807                         out.println ( "showOldConsumers <minAgeInDays>" );
808                         out.println ( "dropOldConsumers <minAgeInDays>" );
809                         out.println ( "\tDrop (or just show) any consumer group that has been inactive longer than <minAgeInDays> days." );
810                         out.println ();
811                         out.println ( "\tTo be safe, <minAgeInDays> should be much higher than the maximum storage time on the Kafka topics." );
812                         out.println ( "\tA very old consumer will potentially miss messages, but will resume at the oldest message, while a" );
813                         out.println ( "\tdeleted consumer will start at the current message if it ever comes back." );
814                         out.println ();
815                         out.println ( "\tNote that show/drops are limited to " + kMaxRemovals + " records per invocation." );
816                 }
817         }
818 }