1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.dmaap.tools;
24 import java.io.IOException;
25 import java.io.PrintStream;
26 import java.security.NoSuchAlgorithmException;
27 import java.util.Date;
28 import java.util.LinkedList;
29 import java.util.Map.Entry;
31 import org.json.JSONException;
33 import com.att.nsa.apiServer.CommonServlet;
34 import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
35 import com.att.nsa.cambria.metabroker.Topic;
36 import com.att.nsa.cmdtool.Command;
37 import com.att.nsa.cmdtool.CommandLineTool;
38 import com.att.nsa.cmdtool.CommandNotReadyException;
39 import com.att.nsa.configs.ConfigDb;
40 import com.att.nsa.configs.ConfigDbException;
41 import com.att.nsa.configs.ConfigPath;
42 import com.att.nsa.configs.confimpl.EncryptingLayer;
43 import com.att.nsa.configs.confimpl.ZkConfigDb;
44 import com.att.nsa.drumlin.till.data.rrConvertor;
45 import com.att.nsa.drumlin.till.data.uniqueStringGenerator;
46 import com.att.nsa.drumlin.till.nv.impl.nvWriteableTable;
47 import com.att.nsa.security.db.BaseNsaApiDbImpl;
48 import com.att.nsa.security.db.EncryptingApiDbImpl;
49 import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
50 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
51 import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
52 import com.att.nsa.util.NsaClock;
54 public class ConfigTool extends CommandLineTool<ConfigToolContext>
/**
 * Creates the tool with its banner and prompt text and registers every
 * command the tool offers, in the order listed below.
 */
protected ConfigTool ()
{
	super ( "Cambria API Config Tool", "cambriaConfig> " );

	// topic maintenance commands
	super.registerCommand ( new ListTopicCommand () );
	super.registerCommand ( new WriteTopicCommand () );
	super.registerCommand ( new ReadTopicCommand () );
	super.registerCommand ( new SetTopicOwnerCommand () );
	super.registerCommand ( new InitSecureTopicCommand () );

	// API key maintenance commands
	super.registerCommand ( new ListApiKeysCommand () );
	super.registerCommand ( new PutApiCommand () );
	super.registerCommand ( new writeApiKeyCommand () );
	super.registerCommand ( new EncryptApiKeysCommand () );
	super.registerCommand ( new DecryptApiKeysCommand () );

	// raw config db access and consumer group cleanup
	super.registerCommand ( new NodeFetchCommand () );
	super.registerCommand ( new DropOldConsumerGroupsCommand () );
}
74 public static void main ( String[] args ) throws IOException
76 final String connStr = args.length>0 ? args[0] : "localhost:2181";
77 final ConfigDb db = new ZkConfigDb (
79 args.length>1 ? args[1] : CommonServlet.getDefaultZkRoot ( "cambria" )
82 final ConfigToolContext context = new ConfigToolContext ( db, connStr, new nvWriteableTable() );
83 final ConfigTool ct = new ConfigTool ();
84 ct.runFromMain ( args, context );
87 private static class ListTopicCommand implements Command<ConfigToolContext>
90 public String[] getMatches ()
92 return new String[] { "topics", "list (\\S*)" };
96 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
101 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
105 final ConfigDb db = context.getDb();
106 final ConfigPath base = db.parse ( "/topics" );
108 if ( parts.length > 0 )
110 final ConfigPath myTopic = base.getChild ( parts[0] );
111 final String data = db.load ( myTopic );
114 out.println ( data );
118 out.println ( "No topic [" + parts[0] + "]" );
123 for ( ConfigPath child : db.loadChildrenNames ( base ) )
125 out.println ( child.getName () );
129 catch ( ConfigDbException e )
131 out.println ( "Command failed: " + e);
136 public void displayHelp ( PrintStream out )
138 out.println ( "topics" );
139 out.println ( "list <topic>" );
143 private static class WriteTopicCommand implements Command<ConfigToolContext>
146 public String[] getMatches ()
148 return new String[] { "write (\\S*) (\\S*)" };
152 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
157 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
161 final ConfigDb db = context.getDb();
162 final ConfigPath base = db.parse ( "/topics" );
163 final ConfigPath myTopic = base.getChild ( parts[0] );
164 db.store ( myTopic, parts[1] );
165 out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" );
167 catch ( ConfigDbException e )
169 out.println ( "Command failed: " + e.getMessage() );
170 throw new RuntimeException(e);
175 public void displayHelp ( PrintStream out )
177 out.println ( "write <topic> <string>" );
178 out.println ( "\tBe careful with this. You can write data that's not compatible with Cambria's config db." );
182 private static class ReadTopicCommand implements Command<ConfigToolContext>
185 public String[] getMatches ()
187 return new String[] { "read (\\S*)" };
191 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
196 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
200 final ConfigDb db = context.getDb();
201 final ConfigPath base = db.parse ( "/topics" );
202 final ConfigPath myTopic = base.getChild ( parts[0] );
203 db.store ( myTopic, parts[1] );
204 out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" );
206 catch ( ConfigDbException e )
208 out.println ( "Command failed: " + e);
213 public void displayHelp ( PrintStream out )
215 out.println ( "read <topic>" );
216 out.println ( "\tRead config data for a topic." );
220 private static class InitSecureTopicCommand implements Command<ConfigToolContext>
223 public String[] getMatches ()
225 return new String[] { "initTopic (\\S*) (\\S*) (\\S*)" };
229 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
234 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
238 DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (),
239 context.getDb ().parse("/topics"), parts[0], parts[2], parts[1],true );
240 out.println ( "Topic [" + parts[0] + "] updated." );
242 catch ( ConfigDbException e )
244 out.println ( "Command failed: " + e);
249 public void displayHelp ( PrintStream out )
251 out.println ( "initTopic <topic> <ownerApiKey> <description>" );
255 private static class SetTopicOwnerCommand implements Command<ConfigToolContext>
258 public String[] getMatches ()
260 return new String[] { "setOwner (\\S*) (\\S*)" };
264 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
269 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
273 final Topic kt = DMaaPKafkaMetaBroker.getKafkaTopicConfig ( context.getDb(),
274 context.getDb().parse ( "/topics" ), parts[0] );
277 final String desc = kt.getDescription ();
279 DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (),
280 context.getDb ().parse("/topics"), parts[0], desc, parts[1], true );
281 out.println ( "Topic [" + parts[0] + "] updated." );
285 out.println ( "Topic [" + parts[0] + "] doesn't exist." );
288 catch ( ConfigDbException e )
290 out.println ( "Command failed: " + e);
295 public void displayHelp ( PrintStream out )
297 out.println ( "setOwner <topic> <ownerApiKey>" );
301 private static class ListApiKeysCommand implements Command<ConfigToolContext>
304 public String[] getMatches ()
306 return new String[] { "listApiKeys", "listApiKey (\\S*) (\\S*) (\\S*)", "listApiKey (\\S*)" };
310 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
315 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
319 final ConfigDb db = context.getDb ();
320 if ( parts.length == 0 )
322 final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
324 for ( String key : readFrom.loadAllKeys () )
329 out.println ( "" + count + " records." );
333 BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
334 if ( parts.length == 3 )
336 readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
337 EncryptingLayer.readSecretKey ( parts[1] ), rrConvertor.base64Decode ( parts[2] ) );
339 final NsaSimpleApiKey apikey = readFrom.loadApiKey ( parts[0] );
340 if ( apikey == null )
342 out.println ( "Key '" + parts[0] + "' not found." );
346 out.println ( apikey.asJsonObject ().toString () );
350 catch ( ConfigDbException e )
352 out.println ( "Command failed: " + e.getMessage() );
354 catch ( JSONException e )
356 out.println ( "Command failed: " + e.getMessage() );
361 public void displayHelp ( PrintStream out )
363 out.println ( "listApiKeys" );
364 out.println ( "listApiKey <key>" );
365 out.println ( "listApiKey <key> <dbKey> <dbIv>" );
369 private static class PutApiCommand implements Command<ConfigToolContext>
372 public String[] getMatches ()
376 // these are <key> <enckey> <encinit> <value>
377 "putApiKey (secret) (\\S*) (\\S*) (\\S*) (\\S*)",
378 "putApiKey (email) (\\S*) (\\S*) (\\S*) (\\S*)",
379 "putApiKey (description) (\\S*) (\\S*) (\\S*) (\\S*)"
384 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
389 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
393 final ConfigDb db = context.getDb ();
394 if ( parts.length == 5 )
396 final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb =
397 new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
398 EncryptingLayer.readSecretKey ( parts[2] ), rrConvertor.base64Decode ( parts[3] ) );
400 final NsaSimpleApiKey apikey = apiKeyDb.loadApiKey ( parts[1] );
401 if ( apikey == null )
403 out.println ( "Key '" + parts[1] + "' not found." );
407 if ( parts[0].equalsIgnoreCase ( "secret" ) )
409 apikey.resetSecret ( parts[4] );
411 else if ( parts[0].equalsIgnoreCase ( "email" ) )
413 apikey.setContactEmail ( parts[4] );
415 else if ( parts[0].equalsIgnoreCase ( "description" ) )
417 apikey.setDescription ( parts[4] );
420 apiKeyDb.saveApiKey ( apikey );
421 out.println ( apikey.asJsonObject ().toString () );
425 catch ( ConfigDbException e )
427 out.println ( "Command failed: " + e.getMessage() );
429 catch ( JSONException e )
431 out.println ( "Command failed: " + e.getMessage() );
436 public void displayHelp ( PrintStream out )
438 out.println ( "putApiKey secret <apiKey> <dbKey> <dbIv> <newSecret>" );
439 out.println ( "putApiKey email <apiKey> <dbKey> <dbIv> <newEmail>" );
440 out.println ( "putApiKey description <apiKey> <dbKey> <dbIv> <newDescription>" );
444 private static class writeApiKeyCommand implements Command<ConfigToolContext>
447 public String[] getMatches ()
451 // <enckey> <encinit> <key> <secret>
452 "writeApiKey (\\S*) (\\S*) (\\S*) (\\S*)",
457 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
462 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
466 final ConfigDb db = context.getDb ();
467 if ( parts.length == 4 )
469 final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb =
470 new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
471 EncryptingLayer.readSecretKey ( parts[0] ), rrConvertor.base64Decode ( parts[1] ) );
473 apiKeyDb.deleteApiKey ( parts[2] );
474 final NsaSimpleApiKey apikey = apiKeyDb.createApiKey ( parts[2], parts[3] );
475 out.println ( apikey.asJsonObject ().toString () );
478 catch ( ConfigDbException e )
480 out.println ( "Command failed: " + e.getMessage() );
482 catch ( JSONException e )
484 out.println ( "Command failed: " + e.getMessage() );
486 catch ( KeyExistsException e )
488 out.println ( "Command failed: " + e.getMessage() );
493 public void displayHelp ( PrintStream out )
495 out.println ( "writeApiKey <dbKey> <dbIv> <newApiKey> <newSecret>" );
499 private static class EncryptApiKeysCommand implements Command<ConfigToolContext>
502 public String[] getMatches ()
504 return new String[] { "convertApiKeyDb", "convertApiKeyDb (\\S*) (\\S*)" };
508 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
513 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
517 final String key = parts.length == 2 ? parts[0] : EncryptingLayer.createSecretKey ();
518 final String iv = parts.length == 2 ? parts[1] : rrConvertor.base64Encode ( uniqueStringGenerator.createValue ( 16 ) );
520 // This doesn't do well when the number of API keys is giant...
521 if ( parts.length == 0 )
523 out.println ( "YOU MUST RECORD THESE VALUES AND USE THEM IN THE SERVER CONFIG" );
524 out.println ( "Key: " + key );
525 out.println ( " IV: " + iv );
526 out.println ( "\n" );
527 out.println ( "Call again with key and IV on command line." );
528 out.println ( "\n" );
529 return; // because otherwise the values get lost
532 final ConfigDb db = context.getDb ();
533 final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
534 final EncryptingApiDbImpl<NsaSimpleApiKey> writeTo = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
535 EncryptingLayer.readSecretKey ( key ), rrConvertor.base64Decode ( iv ) );
538 for ( Entry<String, NsaSimpleApiKey> e : readFrom.loadAllKeyRecords ().entrySet () )
540 out.println ( "-------------------------------" );
541 out.println ( "Converting " + e.getKey () );
542 final String was = e.getValue ().asJsonObject ().toString ();
545 writeTo.saveApiKey ( e.getValue () );
549 out.println ( "Conversion complete, converted " + count + " records." );
551 catch ( ConfigDbException e )
553 out.println ( "Command failed: " + e.getMessage() );
555 catch ( NoSuchAlgorithmException e )
557 out.println ( "Command failed: " + e.getMessage() );
562 public void displayHelp ( PrintStream out )
564 out.println ( "convertApiKeyDb" );
565 out.println ( "\tconvert an API key DB to an encrypted DB and output the cipher details" );
569 private static class DecryptApiKeysCommand implements Command<ConfigToolContext>
572 public String[] getMatches ()
574 return new String[] { "revertApiKeyDb (\\S*) (\\S*)" };
578 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
583 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
587 final String keyStr = parts[0];
588 final String iv = parts[1];
589 final byte[] ivBytes = rrConvertor.base64Decode ( iv );
591 final ConfigDb db = context.getDb ();
592 final EncryptingApiDbImpl<NsaSimpleApiKey> readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
593 EncryptingLayer.readSecretKey ( keyStr ), ivBytes );
594 final BaseNsaApiDbImpl<NsaSimpleApiKey> writeTo = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
597 for ( String apiKey : readFrom.loadAllKeys () )
599 out.println ( "Converting " + apiKey );
600 final NsaSimpleApiKey record = readFrom.loadApiKey ( apiKey );
601 if ( record == null )
603 out.println ( "Couldn't load " + apiKey );
607 writeTo.saveApiKey ( record );
611 out.println ( "Conversion complete, converted " + count + " records." );
613 catch ( ConfigDbException e )
615 out.println ( "Command failed: " + e.getMessage() );
620 public void displayHelp ( PrintStream out )
622 out.println ( "revertApiKeyDb <keyBase64> <ivBase64>" );
623 out.println ( "\trevert an API key DB to a deencrypted DB" );
627 private static class NodeFetchCommand implements Command<ConfigToolContext>
630 public String[] getMatches ()
632 return new String[] { "node (\\S*)" };
636 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
641 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
645 final String node = parts[0];
647 final ConfigDb db = context.getDb ();
648 final ConfigPath cp = db.parse ( node );
650 boolean doneOne = false;
651 for ( ConfigPath child : db.loadChildrenNames ( cp ) )
653 out.println ( "\t- " + child.getName () );
662 out.println ( "(No child nodes of '" + node + "')" );
665 final String val = db.load ( cp );
668 out.println ( "(No data at '" + node + "')" );
675 catch ( ConfigDbException e )
677 out.println ( "Command failed: " + e.getMessage() );
679 catch ( IllegalArgumentException e )
681 out.println ( "Command failed: " + e.getMessage() );
686 public void displayHelp ( PrintStream out )
688 out.println ( "node <nodeName>" );
689 out.println ( "\tread a config db node" );
693 private static class DropOldConsumerGroupsCommand implements Command<ConfigToolContext>
695 private final long kMaxRemovals = 500;
698 public String[] getMatches ()
700 return new String[] { "(dropOldConsumers) (\\S*)", "(showOldConsumers) (\\S*)" };
704 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
709 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
713 final boolean runDrops = parts[0].equalsIgnoreCase ( "dropOldConsumers" );
714 final String maxAgeInDaysStr = parts[1];
715 final int maxAgeInDays = Integer.parseInt ( maxAgeInDaysStr );
716 final long oldestEpochSecs = ( NsaClock.now () / 1000 ) - ( 24 * 60 * 60 * maxAgeInDays );
718 out.println ( "Dropping consumer groups older than " + new Date ( oldestEpochSecs * 1000 ) );
720 final ConfigDb db = context.getDb ();
722 // kafka updates consumer partition records in ZK each time a message
723 // is served. we can determine which consumers are old based on a lack
724 // of update to the partition entries
725 // (see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper)
727 // kafka only works with ZK, and our configDb was constructed with a non-kafka
728 // root node. We have to switch it to get to the right content...
729 if ( ! ( db instanceof ZkConfigDb ) )
731 throw new ConfigDbException ( "You can only show/drop old consumers against a ZK config db." );
734 final ZkConfigDb newZkDb = new ZkConfigDb ( context.getConnectionString (), "" );
737 final LinkedList<ConfigPath> removals = new LinkedList<ConfigPath> ();
738 for ( ConfigPath consumerGroupName : newZkDb.loadChildrenNames ( newZkDb.parse ( "/consumers" ) ) )
741 if ( cgCount % 500 == 0 )
743 out.println ( "" + cgCount + " groups examined" );
746 boolean foundAnything = false;
747 boolean foundRecentUse = false;
748 long mostRecent = -1;
750 // each consumer group has an "offsets" entry, which contains 0 or more topic entries.
751 // each topic contains partition nodes.
752 for ( ConfigPath topic : newZkDb.loadChildrenNames ( consumerGroupName.getChild ( "offsets" ) ) )
754 for ( ConfigPath offset : newZkDb.loadChildrenNames ( topic ) )
756 foundAnything = true;
758 final long modTime = newZkDb.getLastModificationTime ( offset );
759 mostRecent = Math.max ( mostRecent, modTime );
761 foundRecentUse = ( modTime > oldestEpochSecs );
762 if ( foundRecentUse ) break;
764 if ( foundRecentUse ) break;
767 // decide if this consumer group is old
768 out.println ( "Group " + consumerGroupName.getName () + " was most recently used " + new Date ( mostRecent*1000 ) );
769 if ( foundAnything && !foundRecentUse )
771 removals.add ( consumerGroupName );
774 if ( removals.size () >= kMaxRemovals )
781 for ( ConfigPath consumerGroupName : removals )
783 out.println ( "Group " + consumerGroupName.getName () + " has no recent activity." );
786 out.println ( "Removing group " + consumerGroupName.getName () + "..." );
787 newZkDb.clear ( consumerGroupName );
791 catch ( ConfigDbException e )
793 out.println ( "Command failed: " + e.getMessage() );
795 catch ( NumberFormatException e )
797 out.println ( "Command failed: " + e.getMessage() );
799 catch ( JSONException e )
801 out.println ( "Command failed: " + e.getMessage() );
806 public void displayHelp ( PrintStream out )
808 out.println ( "showOldConsumers <minAgeInDays>" );
809 out.println ( "dropOldConsumers <minAgeInDays>" );
810 out.println ( "\tDrop (or just show) any consumer group that has been inactive longer than <minAgeInDays> days." );
812 out.println ( "\tTo be safe, <minAgeInDays> should be much higher than the maximum storage time on the Kafka topics." );
813 out.println ( "\tA very old consumer will potentially miss messages, but will resume at the oldest message, while a" );
814 out.println ( "\tdeleted consumer will start at the current message if it ever comes back." );
816 out.println ( "\tNote that show/drops are limited to " + kMaxRemovals + " records per invocation." );