f0ec225a811df14c2452ed3d6b5413a638f225c9
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / messagerouter / msgrtr / nsa / cambria / resources / streamReaders / CambriaRawStreamReader.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.resources.streamReaders;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26
27 import javax.servlet.http.HttpServletResponse;
28
29 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.CambriaApiException;
30 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Publisher.message;
31 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.beans.LogDetails;
32 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.resources.CambriaEventSet.reader;
33
34 import com.att.nsa.util.StreamTools;
35
36 /**
37  * 
38  * This stream reader reads raw bytes creating a single message.
39  * @author author
40  *
41  */
42 public class CambriaRawStreamReader implements reader
43 {
44         /**
45          * This is the constructor of CambriaRawStreamReader, it will basically the read from Input stream
46          * @param is
47          * @param defPart
48          * @throws CambriaApiException
49          */
50         public CambriaRawStreamReader ( InputStream is, String defPart ) throws CambriaApiException
51         {
52                 fStream = is;
53                 fDefPart = defPart;
54                 fClosed = false;
55         }
56
57         @Override
58         /**
59          * 
60          * next() method reads the bytes and
61          * iterates through the messages 
62          * @throws CambriaApiException
63          * 
64          */
65         public message next () throws CambriaApiException
66         {
67                 if ( fClosed ) return null;
68
69                 try
70                 {
71                         final byte[] rawBytes = StreamTools.readBytes ( fStream );
72                         fClosed = true;
73                         return new message ()
74                         {
75                                 private LogDetails logDetails;
76                                 private boolean transactionEnabled;
77
78                                 /**
79                                  * returns boolean value which 
80                                  * indicates whether transaction is enabled
81                                  */
82                                 public boolean isTransactionEnabled() {
83                                         return transactionEnabled;
84                                 }
85
86                                 /**
87                                  * sets boolean value which 
88                                  * indicates whether transaction is enabled
89                                  */
90                                 public void setTransactionEnabled(boolean transactionEnabled) {
91                                         this.transactionEnabled = transactionEnabled;
92                                 }
93                                 
94                                 @Override
95                                 /**
96                                  * @returns key
97                                  * It ch4ecks whether fDefPart value is Null.
98                                  * If yes, it will return ystem.currentTimeMillis () else
99                                  * it will return fDefPart variable value
100                                  */
101                                 public String getKey ()
102                                 {
103                                         return fDefPart == null ? "" + System.currentTimeMillis () : fDefPart;
104                                 }
105
106                                 @Override
107                                 /**
108                                  * returns the message in String type object
109                                  */
110                                 public String getMessage ()
111                                 {
112                                         return new String ( rawBytes );
113                                 }
114
115                                 /**
116                                  * set log details in logDetails variable
117                                  */
118                                 @Override
119                                 public void setLogDetails(LogDetails logDetails) {
120                                         this.logDetails = logDetails;
121                                 }
122
123                                 @Override
124                                 /**
125                                  * get the log details
126                                  */
127                                 public LogDetails getLogDetails() {
128                                         return this.logDetails;
129                                 }
130                         };
131                 }
132                 catch ( IOException e )
133                 {
134                         throw new CambriaApiException ( HttpServletResponse.SC_BAD_REQUEST, e.getMessage () );
135                 }
136         }
137         
138         private final InputStream fStream;
139         private final String fDefPart;
140         private boolean fClosed;
141         //private String transactionId;
142 }