5aefe2d61359a7f5e797efd144327587f929af2e
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / messagerouter / msgrtr / nsa / cambria / resources / streamReaders / CambriaJsonStreamReader.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.resources.streamReaders;
23
24 import java.io.InputStream;
25
26 import javax.servlet.http.HttpServletResponse;
27
28 import org.json.JSONException;
29 import org.json.JSONObject;
30 import org.json.JSONTokener;
31 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.CambriaApiException;
32 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Publisher.message;
33 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.beans.LogDetails;
34 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.resources.CambriaEventSet.reader;
35
36 /**
37  * 
38  * @author author
39  *
40  */
41 public class CambriaJsonStreamReader implements reader {
42         private final JSONTokener fTokens;
43         private final boolean fIsList;
44         private long fCount;
45         private final String fDefPart;
46         public static final String kKeyField = "cambria.partition";
47
48         /**
49          * 
50          * @param is
51          * @param defPart
52          * @throws CambriaApiException
53          */
54         public CambriaJsonStreamReader(InputStream is, String defPart) throws CambriaApiException {
55                 try {
56                         fTokens = new JSONTokener(is);
57                         fCount = 0;
58                         fDefPart = defPart;
59
60                         final int c = fTokens.next();
61                         if (c == '[') {
62                                 fIsList = true;
63                         } else if (c == '{') {
64                                 fTokens.back();
65                                 fIsList = false;
66                         } else {
67                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expecting an array or an object.");
68                         }
69                 } catch (JSONException e) {
70                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
71                 }
72         }
73
74         @Override
75         public message next() throws CambriaApiException {
76                 try {
77                         if (!fTokens.more()) {
78                                 return null;
79                         }
80
81                         final int c = fTokens.next();
82                         
83                         /*if (c ==','){
84                                 fCloseCount++;
85                                 System.out.println("fCloseCount=" + fCloseCount +" fCount "+fCount);
86                         }*/
87                         if (fIsList) {
88                                 if (c == ']' || (fCount > 0 && c == 10))
89                                         return null;
90
91
92                                 if (fCount > 0 && c != ',' && c!= 10) {
93                                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
94                                                         "Expected ',' or closing ']' after last object.");
95                                 }
96
97                                 if (fCount == 0 && c != '{' && c!= 10  && c!=32) {
98                                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected { to start an object.");
99                                 }
100                         } else if (fCount != 0 || c != '{') {
101                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected '{' to start an object.");
102                         }
103
104                         if (c == '{') {
105                                 fTokens.back();
106                         }
107                         final JSONObject o = new JSONObject(fTokens);
108                         fCount++;
109                         return new msg(o);
110                 } catch (JSONException e) {
111                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
112
113                 }
114         }
115
116         private class msg implements message {
117                 private final String fKey;
118                 private  String fMsg;
119                 private LogDetails logDetails;
120                 private boolean transactionEnabled;
121
122                 /**
123                  * constructor
124                  * 
125                  * @param o
126                  */
127                 //public msg(JSONObject o){}
128                 
129                 
130                 public msg(JSONObject o) {
131                         String key = o.optString(kKeyField, fDefPart);
132                         if (key == null) {
133                                 key = "" + System.currentTimeMillis();
134                         }
135                         fKey = key;
136                                         
137                                 fMsg = o.toString().trim();
138                         
139                 }
140
141                 @Override
142                 public String getKey() {
143                         return fKey;
144                 }
145
146                 @Override
147                 public String getMessage() {
148                         return fMsg;
149                 }
150
151                 @Override
152                 public boolean isTransactionEnabled() {
153                         return transactionEnabled;
154                 }
155
156                 @Override
157                 public void setTransactionEnabled(boolean transactionEnabled) {
158                         this.transactionEnabled = transactionEnabled;
159                 }
160
161                 @Override
162                 public void setLogDetails(LogDetails logDetails) {
163                         this.logDetails = logDetails;
164                 }
165
166                 @Override
167                 public LogDetails getLogDetails() {
168                         return logDetails;
169                 }
170         }
171 }