1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.tools;
24 import java.io.IOException;
25 import java.io.PrintStream;
26 import java.security.NoSuchAlgorithmException;
27 import java.util.Date;
28 import java.util.LinkedList;
29 import java.util.Map.Entry;
31 import org.json.JSONException;
33 import com.att.nsa.apiServer.CommonServlet;
34 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
35 import org.onap.dmaap.dmf.mr.metabroker.Topic;
36 import com.att.nsa.cmdtool.Command;
37 import com.att.nsa.cmdtool.CommandLineTool;
38 import com.att.nsa.cmdtool.CommandNotReadyException;
39 import com.att.nsa.configs.ConfigDb;
40 import com.att.nsa.configs.ConfigDbException;
41 import com.att.nsa.configs.ConfigPath;
42 import com.att.nsa.configs.confimpl.EncryptingLayer;
43 import com.att.nsa.configs.confimpl.ZkConfigDb;
44 import com.att.nsa.drumlin.till.data.rrConvertor;
45 import com.att.nsa.drumlin.till.data.uniqueStringGenerator;
46 import com.att.nsa.drumlin.till.nv.impl.nvWriteableTable;
47 import com.att.nsa.security.db.BaseNsaApiDbImpl;
48 import com.att.nsa.security.db.EncryptingApiDbImpl;
49 import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
50 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
51 import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
52 import com.att.nsa.util.NsaClock;
54 public class ConfigTool extends CommandLineTool<ConfigToolContext>
/**
 * Creates the tool with its display name and prompt, then registers every
 * command this shell offers. Commands are registered in the order listed.
 */
protected ConfigTool() {
    super("Cambria API Config Tool", "cambriaConfig> ");

    // topic commands
    super.registerCommand(new ListTopicCommand());
    super.registerCommand(new WriteTopicCommand());
    super.registerCommand(new ReadTopicCommand());
    super.registerCommand(new SetTopicOwnerCommand());
    super.registerCommand(new InitSecureTopicCommand());

    // API key commands
    super.registerCommand(new ListApiKeysCommand());
    super.registerCommand(new PutApiCommand());
    super.registerCommand(new writeApiKeyCommand());
    super.registerCommand(new EncryptApiKeysCommand());
    super.registerCommand(new DecryptApiKeysCommand());

    // raw config-db access and consumer-group housekeeping
    super.registerCommand(new NodeFetchCommand());
    super.registerCommand(new DropOldConsumerGroupsCommand());
}
74 public static void main ( String[] args ) throws IOException
76 final String connStr = args.length>0 ? args[0] : "localhost:2181";
77 final ConfigDb db = new ZkConfigDb (
79 args.length>1 ? args[1] : CommonServlet.getDefaultZkRoot ( "cambria" )
82 final ConfigToolContext context = new ConfigToolContext ( db, connStr, new nvWriteableTable() );
83 final ConfigTool ct = new ConfigTool ();
84 ct.runFromMain ( args, context );
87 private static class ListTopicCommand implements Command<ConfigToolContext>
90 public String[] getMatches ()
92 return new String[] { "topics", "list (\\S*)" };
96 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
101 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
105 final ConfigDb db = context.getDb();
106 final ConfigPath base = db.parse ( "/topics" );
108 if ( parts.length > 0 )
110 final ConfigPath myTopic = base.getChild ( parts[0] );
111 final String data = db.load ( myTopic );
114 out.println ( data );
118 out.println ( "No topic [" + parts[0] + "]" );
123 for ( ConfigPath child : db.loadChildrenNames ( base ) )
125 out.println ( child.getName () );
129 catch ( ConfigDbException e )
131 out.println ( "Command failed: " + e.getMessage() );
136 public void displayHelp ( PrintStream out )
138 out.println ( "topics" );
139 out.println ( "list <topic>" );
143 private static class WriteTopicCommand implements Command<ConfigToolContext>
146 public String[] getMatches ()
148 return new String[] { "write (\\S*) (\\S*)" };
152 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
157 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
161 final ConfigDb db = context.getDb();
162 final ConfigPath base = db.parse ( "/topics" );
163 final ConfigPath myTopic = base.getChild ( parts[0] );
164 db.store ( myTopic, parts[1] );
165 out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" );
167 catch ( ConfigDbException e )
169 out.println ( "Command failed: " + e.getMessage() );
174 public void displayHelp ( PrintStream out )
176 out.println ( "write <topic> <string>" );
177 out.println ( "\tBe careful with this. You can write data that's not compatible with Cambria's config db." );
181 private static class ReadTopicCommand implements Command<ConfigToolContext>
184 public String[] getMatches ()
186 return new String[] { "read (\\S*)" };
190 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
195 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
199 final ConfigDb db = context.getDb();
200 final ConfigPath base = db.parse ( "/topics" );
201 final ConfigPath myTopic = base.getChild ( parts[0] );
202 db.store ( myTopic, parts[1] );
203 out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" );
205 catch ( ConfigDbException e )
207 out.println ( "Command failed: " + e.getMessage() );
212 public void displayHelp ( PrintStream out )
214 out.println ( "read <topic>" );
215 out.println ( "\tRead config data for a topic." );
219 private static class InitSecureTopicCommand implements Command<ConfigToolContext>
222 public String[] getMatches ()
224 return new String[] { "initTopic (\\S*) (\\S*) (\\S*)" };
228 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
233 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
237 DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (),
238 context.getDb ().parse("/topics"), parts[0], parts[2], parts[1],true );
239 out.println ( "Topic [" + parts[0] + "] updated." );
241 catch ( ConfigDbException e )
243 out.println ( "Command failed: " + e.getMessage () );
248 public void displayHelp ( PrintStream out )
250 out.println ( "initTopic <topic> <ownerApiKey> <description>" );
254 private static class SetTopicOwnerCommand implements Command<ConfigToolContext>
257 public String[] getMatches ()
259 return new String[] { "setOwner (\\S*) (\\S*)" };
263 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
268 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
272 final Topic kt = DMaaPKafkaMetaBroker.getKafkaTopicConfig ( context.getDb(),
273 context.getDb().parse ( "/topics" ), parts[0] );
276 final String desc = kt.getDescription ();
278 DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (),
279 context.getDb ().parse("/topics"), parts[0], desc, parts[1], true );
280 out.println ( "Topic [" + parts[0] + "] updated." );
284 out.println ( "Topic [" + parts[0] + "] doesn't exist." );
287 catch ( ConfigDbException e )
289 out.println ( "Command failed: " + e.getMessage () );
294 public void displayHelp ( PrintStream out )
296 out.println ( "setOwner <topic> <ownerApiKey>" );
300 private static class ListApiKeysCommand implements Command<ConfigToolContext>
303 public String[] getMatches ()
305 return new String[] { "listApiKeys", "listApiKey (\\S*) (\\S*) (\\S*)", "listApiKey (\\S*)" };
309 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
314 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
318 final ConfigDb db = context.getDb ();
319 if ( parts.length == 0 )
321 final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
323 for ( String key : readFrom.loadAllKeys () )
328 out.println ( "" + count + " records." );
332 BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
333 if ( parts.length == 3 )
335 readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
336 EncryptingLayer.readSecretKey ( parts[1] ), rrConvertor.base64Decode ( parts[2] ) );
338 final NsaSimpleApiKey apikey = readFrom.loadApiKey ( parts[0] );
339 if ( apikey == null )
341 out.println ( "Key '" + parts[0] + "' not found." );
345 out.println ( apikey.asJsonObject ().toString () );
349 catch ( ConfigDbException e )
351 out.println ( "Command failed: " + e.getMessage() );
353 catch ( JSONException e )
355 out.println ( "Command failed: " + e.getMessage() );
360 public void displayHelp ( PrintStream out )
362 out.println ( "listApiKeys" );
363 out.println ( "listApiKey <key>" );
364 out.println ( "listApiKey <key> <dbKey> <dbIv>" );
368 private static class PutApiCommand implements Command<ConfigToolContext>
371 public String[] getMatches ()
375 // these are <key> <enckey> <encinit> <value>
376 "putApiKey (secret) (\\S*) (\\S*) (\\S*) (\\S*)",
377 "putApiKey (email) (\\S*) (\\S*) (\\S*) (\\S*)",
378 "putApiKey (description) (\\S*) (\\S*) (\\S*) (\\S*)"
383 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
388 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
392 final ConfigDb db = context.getDb ();
393 if ( parts.length == 5 )
395 final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb =
396 new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
397 EncryptingLayer.readSecretKey ( parts[2] ), rrConvertor.base64Decode ( parts[3] ) );
399 final NsaSimpleApiKey apikey = apiKeyDb.loadApiKey ( parts[1] );
400 if ( apikey == null )
402 out.println ( "Key '" + parts[1] + "' not found." );
406 if ( parts[0].equalsIgnoreCase ( "secret" ) )
408 apikey.resetSecret ( parts[4] );
410 else if ( parts[0].equalsIgnoreCase ( "email" ) )
412 apikey.setContactEmail ( parts[4] );
414 else if ( parts[0].equalsIgnoreCase ( "description" ) )
416 apikey.setDescription ( parts[4] );
419 apiKeyDb.saveApiKey ( apikey );
420 out.println ( apikey.asJsonObject ().toString () );
424 catch ( ConfigDbException e )
426 out.println ( "Command failed: " + e.getMessage() );
428 catch ( JSONException e )
430 out.println ( "Command failed: " + e.getMessage() );
435 public void displayHelp ( PrintStream out )
437 out.println ( "putApiKey secret <apiKey> <dbKey> <dbIv> <newSecret>" );
438 out.println ( "putApiKey email <apiKey> <dbKey> <dbIv> <newEmail>" );
439 out.println ( "putApiKey description <apiKey> <dbKey> <dbIv> <newDescription>" );
443 private static class writeApiKeyCommand implements Command<ConfigToolContext>
446 public String[] getMatches ()
450 // <enckey> <encinit> <key> <secret>
451 "writeApiKey (\\S*) (\\S*) (\\S*) (\\S*)",
456 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
461 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
465 final ConfigDb db = context.getDb ();
466 if ( parts.length == 4 )
468 final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb =
469 new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
470 EncryptingLayer.readSecretKey ( parts[0] ), rrConvertor.base64Decode ( parts[1] ) );
472 apiKeyDb.deleteApiKey ( parts[2] );
473 final NsaSimpleApiKey apikey = apiKeyDb.createApiKey ( parts[2], parts[3] );
474 out.println ( apikey.asJsonObject ().toString () );
477 catch ( ConfigDbException e )
479 out.println ( "Command failed: " + e.getMessage() );
481 catch ( JSONException e )
483 out.println ( "Command failed: " + e.getMessage() );
485 catch ( KeyExistsException e )
487 out.println ( "Command failed: " + e.getMessage() );
492 public void displayHelp ( PrintStream out )
494 out.println ( "writeApiKey <dbKey> <dbIv> <newApiKey> <newSecret>" );
498 private static class EncryptApiKeysCommand implements Command<ConfigToolContext>
501 public String[] getMatches ()
503 return new String[] { "convertApiKeyDb", "convertApiKeyDb (\\S*) (\\S*)" };
507 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
512 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
516 final String key = parts.length == 2 ? parts[0] : EncryptingLayer.createSecretKey ();
517 final String iv = parts.length == 2 ? parts[1] : rrConvertor.base64Encode ( uniqueStringGenerator.createValue ( 16 ) );
519 // This doesn't do well when the number of API keys is giant...
520 if ( parts.length == 0 )
522 out.println ( "YOU MUST RECORD THESE VALUES AND USE THEM IN THE SERVER CONFIG" );
523 out.println ( "Key: " + key );
524 out.println ( " IV: " + iv );
525 out.println ( "\n" );
526 out.println ( "Call again with key and IV on command line." );
527 out.println ( "\n" );
528 return; // because otherwise the values get lost
531 final ConfigDb db = context.getDb ();
532 final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
533 final EncryptingApiDbImpl<NsaSimpleApiKey> writeTo = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
534 EncryptingLayer.readSecretKey ( key ), rrConvertor.base64Decode ( iv ) );
537 for ( Entry<String, NsaSimpleApiKey> e : readFrom.loadAllKeyRecords ().entrySet () )
539 out.println ( "-------------------------------" );
540 out.println ( "Converting " + e.getKey () );
541 final String was = e.getValue ().asJsonObject ().toString ();
544 writeTo.saveApiKey ( e.getValue () );
548 out.println ( "Conversion complete, converted " + count + " records." );
550 catch ( ConfigDbException e )
552 out.println ( "Command failed: " + e.getMessage() );
554 catch ( NoSuchAlgorithmException e )
556 out.println ( "Command failed: " + e.getMessage() );
561 public void displayHelp ( PrintStream out )
563 out.println ( "convertApiKeyDb" );
564 out.println ( "\tconvert an API key DB to an encrypted DB and output the cipher details" );
568 private static class DecryptApiKeysCommand implements Command<ConfigToolContext>
571 public String[] getMatches ()
573 return new String[] { "revertApiKeyDb (\\S*) (\\S*)" };
577 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
582 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
586 final String keyStr = parts[0];
587 final String iv = parts[1];
588 final byte[] ivBytes = rrConvertor.base64Decode ( iv );
590 final ConfigDb db = context.getDb ();
591 final EncryptingApiDbImpl<NsaSimpleApiKey> readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
592 EncryptingLayer.readSecretKey ( keyStr ), ivBytes );
593 final BaseNsaApiDbImpl<NsaSimpleApiKey> writeTo = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
596 for ( String apiKey : readFrom.loadAllKeys () )
598 out.println ( "Converting " + apiKey );
599 final NsaSimpleApiKey record = readFrom.loadApiKey ( apiKey );
600 if ( record == null )
602 out.println ( "Couldn't load " + apiKey );
606 writeTo.saveApiKey ( record );
610 out.println ( "Conversion complete, converted " + count + " records." );
612 catch ( ConfigDbException e )
614 out.println ( "Command failed: " + e.getMessage() );
619 public void displayHelp ( PrintStream out )
621 out.println ( "revertApiKeyDb <keyBase64> <ivBase64>" );
622 out.println ( "\trevert an API key DB to a deencrypted DB" );
// Command "node <nodeName>": inspects one node of the config db. It prints the names of
// the node's children and then reports on the data stored at the node itself.
// NOTE(review): this listing omits several short lines of the class (braces and some
// statements inside execute), so the comments below describe only what is visible.
626 private static class NodeFetchCommand implements Command<ConfigToolContext>
// One pattern; its single capture group is the node path, read as parts[0] in execute.
629 public String[] getMatches ()
631 return new String[] { "node (\\S*)" };
// No statements are visible for this method — presumably a no-op; TODO confirm.
635 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
// Prints the children and data of the node named by parts[0] to 'out'.
// ConfigDbException and IllegalArgumentException are both reported as "Command failed: <message>".
640 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
644 final String node = parts[0];
646 final ConfigDb db = context.getDb ();
// The user-supplied text is parsed as a config path as-is (no "/topics" prefix is applied here).
647 final ConfigPath cp = db.parse ( node );
// Tracks whether any child was listed — presumably set inside the loop below, and
// presumably the condition for the "(No child nodes ...)" message; verify against the full file.
649 boolean doneOne = false;
650 for ( ConfigPath child : db.loadChildrenNames ( cp ) )
// Each child name is printed on its own line, prefixed with a tab and "- ".
652 out.println ( "\t- " + child.getName () );
661 out.println ( "(No child nodes of '" + node + "')" );
// Data stored at the node itself, independent of the child listing above.
664 final String val = db.load ( cp );
// NOTE(review): presumably printed when val is null, with val printed otherwise — the
// branch lines are not visible here; confirm.
667 out.println ( "(No data at '" + node + "')" );
674 catch ( ConfigDbException e )
676 out.println ( "Command failed: " + e.getMessage() );
// NOTE(review): presumably raised by db.parse for a malformed path — confirm.
678 catch ( IllegalArgumentException e )
680 out.println ( "Command failed: " + e.getMessage() );
// Prints the usage line and a one-line description.
685 public void displayHelp ( PrintStream out )
687 out.println ( "node <nodeName>" );
688 out.println ( "\tread a config db node" );
692 private static class DropOldConsumerGroupsCommand implements Command<ConfigToolContext>
694 private final long kMaxRemovals = 500;
697 public String[] getMatches ()
699 return new String[] { "(dropOldConsumers) (\\S*)", "(showOldConsumers) (\\S*)" };
703 public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
708 public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
712 final boolean runDrops = parts[0].equalsIgnoreCase ( "dropOldConsumers" );
713 final String maxAgeInDaysStr = parts[1];
714 final int maxAgeInDays = Integer.parseInt ( maxAgeInDaysStr );
715 final long oldestEpochSecs = ( NsaClock.now () / 1000 ) - ( 24 * 60 * 60 * maxAgeInDays );
717 out.println ( "Dropping consumer groups older than " + new Date ( oldestEpochSecs * 1000 ) );
719 final ConfigDb db = context.getDb ();
721 // kafka updates consumer partition records in ZK each time a message
722 // is served. we can determine which consumers are old based on a lack
723 // of update to the partition entries
724 // (see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper)
726 // kafka only works with ZK, and our configDb was constructed with a non-kafka
727 // root node. We have to switch it to get to the right content...
728 if ( ! ( db instanceof ZkConfigDb ) )
730 throw new ConfigDbException ( "You can only show/drop old consumers against a ZK config db." );
733 final ZkConfigDb newZkDb = new ZkConfigDb ( context.getConnectionString (), "" );
736 final LinkedList<ConfigPath> removals = new LinkedList<ConfigPath> ();
737 for ( ConfigPath consumerGroupName : newZkDb.loadChildrenNames ( newZkDb.parse ( "/consumers" ) ) )
740 if ( cgCount % 500 == 0 )
742 out.println ( "" + cgCount + " groups examined" );
745 boolean foundAnything = false;
746 boolean foundRecentUse = false;
747 long mostRecent = -1;
749 // each consumer group has an "offsets" entry, which contains 0 or more topic entries.
750 // each topic contains partition nodes.
751 for ( ConfigPath topic : newZkDb.loadChildrenNames ( consumerGroupName.getChild ( "offsets" ) ) )
753 for ( ConfigPath offset : newZkDb.loadChildrenNames ( topic ) )
755 foundAnything = true;
757 final long modTime = newZkDb.getLastModificationTime ( offset );
758 mostRecent = Math.max ( mostRecent, modTime );
760 foundRecentUse = ( modTime > oldestEpochSecs );
761 if ( foundRecentUse ) break;
763 if ( foundRecentUse ) break;
766 // decide if this consumer group is old
767 out.println ( "Group " + consumerGroupName.getName () + " was most recently used " + new Date ( mostRecent*1000 ) );
768 if ( foundAnything && !foundRecentUse )
770 removals.add ( consumerGroupName );
773 if ( removals.size () >= kMaxRemovals )
780 for ( ConfigPath consumerGroupName : removals )
782 out.println ( "Group " + consumerGroupName.getName () + " has no recent activity." );
785 out.println ( "Removing group " + consumerGroupName.getName () + "..." );
786 newZkDb.clear ( consumerGroupName );
790 catch ( ConfigDbException e )
792 out.println ( "Command failed: " + e.getMessage() );
794 catch ( NumberFormatException e )
796 out.println ( "Command failed: " + e.getMessage() );
798 catch ( JSONException e )
800 out.println ( "Command failed: " + e.getMessage() );
805 public void displayHelp ( PrintStream out )
807 out.println ( "showOldConsumers <minAgeInDays>" );
808 out.println ( "dropOldConsumers <minAgeInDays>" );
809 out.println ( "\tDrop (or just show) any consumer group that has been inactive longer than <minAgeInDays> days." );
811 out.println ( "\tTo be safe, <minAgeInDays> should be much higher than the maximum storage time on the Kafka topics." );
812 out.println ( "\tA very old consumer will potentially miss messages, but will resume at the oldest message, while a" );
813 out.println ( "\tdeleted consumer will start at the current message if it ever comes back." );
815 out.println ( "\tNote that show/drops are limited to " + kMaxRemovals + " records per invocation." );