[DMAAP-CLIENT] First sonar issues review part2
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / tools / MessageCommand.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Modifications Copyright © 2021 Orange.
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *
23  *******************************************************************************/
24
25 package org.onap.dmaap.mr.tools;
26
27 import com.att.nsa.cmdtool.Command;
28 import com.att.nsa.cmdtool.CommandNotReadyException;
29 import java.io.IOException;
30 import java.io.PrintStream;
31 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.onap.dmaap.mr.client.MRBatchingPublisher;
34 import org.onap.dmaap.mr.client.MRClientFactory;
35 import org.onap.dmaap.mr.client.MRConsumer;
36 import org.onap.dmaap.mr.client.MRPublisher.Message;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 public class MessageCommand implements Command<MRCommandContext> {
41     final Logger logger = LoggerFactory.getLogger(MessageCommand.class);
42
43     private static final String SENDING_PROBLEM_MESSAGE = "Problem sending message: ";
44
45     @Override
46     public String[] getMatches() {
47         return new String[] {
48             "(post) (\\S*) (\\S*) (.*)",
49             "(read) (\\S*) (\\S*) (\\S*)",
50         };
51     }
52
53     @Override
54     public void checkReady(MRCommandContext context) throws CommandNotReadyException {
55         if (!context.checkClusterReady()) {
56             throw new CommandNotReadyException("Use 'cluster' to specify a cluster to use.");
57         }
58     }
59
60     @Override
61     public void execute(String[] parts, MRCommandContext context, PrintStream out) throws CommandNotReadyException {
62         if (parts[0].equalsIgnoreCase("read")) {
63             final MRConsumer cc = MRClientFactory.createConsumer(context.getCluster(), parts[1], parts[2], parts[3],
64                     -1, -1, null, context.getApiKey(), context.getApiPwd());
65             context.applyTracer(cc);
66             try {
67                 for (String msg : cc.fetch()) {
68                     out.println(msg);
69                 }
70             } catch (Exception e) {
71                 out.println("Problem fetching messages: " + e.getMessage());
72                 logger.error("Problem fetching messages: ", e);
73             } finally {
74                 cc.close();
75             }
76         } else {
77             final MRBatchingPublisher pub = ToolsUtil.createBatchPublisher(context, parts[1]);
78             try {
79                 pub.send(parts[2], parts[3]);
80             } catch (IOException e) {
81                 out.println(SENDING_PROBLEM_MESSAGE + e.getMessage());
82                 logger.error(SENDING_PROBLEM_MESSAGE, e);
83             } finally {
84                 List<Message> left = null;
85                 try {
86                     left = pub.close(500, TimeUnit.MILLISECONDS);
87                 } catch (IOException e) {
88                     out.println(SENDING_PROBLEM_MESSAGE + e.getMessage());
89                     logger.error(SENDING_PROBLEM_MESSAGE, e);
90                 } catch (InterruptedException e) {
91                     out.println(SENDING_PROBLEM_MESSAGE + e.getMessage());
92                     logger.error(SENDING_PROBLEM_MESSAGE, e);
93                     Thread.currentThread().interrupt();
94                 }
95                 if (left != null && !left.isEmpty()) {
96                     out.println(left.size() + " messages not sent.");
97                 }
98             }
99         }
100     }
101
102     @Override
103     public void displayHelp(PrintStream out) {
104         out.println("post <topicName> <partition> <message>");
105         out.println("read <topicName> <consumerGroup> <consumerId>");
106     }
107
108 }