aai should not check status code
[testsuite/python-testing-utils.git] / robotframework-onap / ONAPLibrary / VESProtobuf.py
1 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from google.protobuf import descriptor
16 from google.protobuf import descriptor_pb2
17 from google.protobuf import message_factory
18 from google.protobuf.json_format import MessageToJson
19
20
21 class VESProtobuf(object):
22     """ non keywords methods related to VES """
23
24     def __init__(self):
25         super(VESProtobuf, self).__init__()
26         self.message_descriptors = VESProtobuf.get_message_definitions()
27
28     @staticmethod
29     def create_ves_event():
30         file_descriptor_proto = descriptor_pb2.FileDescriptorProto()
31         file_descriptor_proto.name = 'VesEvent'
32         VESProtobuf.create_commoneventheader(file_descriptor_proto)
33         VESProtobuf.create_vesevent(file_descriptor_proto)
34         return file_descriptor_proto
35
36     @staticmethod
37     def create_vesevent(file_descriptor_proto):
38         message_type = file_descriptor_proto.message_type.add()
39         message_type.name = "VesEvent"
40         VESProtobuf.create_message_field(message_type, 1, "commonEventHeader", "CommonEventHeader")
41         VESProtobuf.create_field(message_type, 2, "eventFields", descriptor.FieldDescriptor.TYPE_BYTES)
42         return message_type
43
44     @staticmethod
45     def create_commoneventheader(file_descriptor_proto):
46         message_type = file_descriptor_proto.message_type.add()
47         message_type.name = "CommonEventHeader"
48         enum_type = VESProtobuf.create_enum_type(message_type, 'Priority')
49         VESProtobuf.create_enum_type_value(enum_type, ["PRIORITY_NOT_PROVIDED", "HIGH", "MEDIUM", "NORMAL", "LOW"])
50         VESProtobuf.create_field(message_type, 1, "version", descriptor.FieldDescriptor.TYPE_STRING)
51         VESProtobuf.create_field(message_type, 2, "domain", descriptor.FieldDescriptor.TYPE_STRING)
52         VESProtobuf.create_field(message_type, 3, "sequence", descriptor.FieldDescriptor.TYPE_UINT32)
53         VESProtobuf.create_enum_field(message_type, 4, "priority", "Priority")
54         VESProtobuf.create_field(message_type, 5, "eventId", descriptor.FieldDescriptor.TYPE_STRING)
55         VESProtobuf.create_field(message_type, 6, "eventName", descriptor.FieldDescriptor.TYPE_STRING)
56         VESProtobuf.create_field(message_type, 7, "eventType", descriptor.FieldDescriptor.TYPE_STRING)
57         VESProtobuf.create_field(message_type, 8, "lastEpochMicrosec", descriptor.FieldDescriptor.TYPE_UINT64)
58         VESProtobuf.create_field(message_type, 9, "startEpochMicrosec", descriptor.FieldDescriptor.TYPE_UINT64)
59         VESProtobuf.create_field(message_type, 10, "nfNamingCode", descriptor.FieldDescriptor.TYPE_STRING)
60         VESProtobuf.create_field(message_type, 11, "nfcNamingCode", descriptor.FieldDescriptor.TYPE_STRING)
61         VESProtobuf.create_field(message_type, 12, "nfVendorName", descriptor.FieldDescriptor.TYPE_STRING)
62         VESProtobuf.create_field(message_type, 13, "reportingEntityId", descriptor.FieldDescriptor.TYPE_BYTES)
63         VESProtobuf.create_field(message_type, 14, "reportingEntityName", descriptor.FieldDescriptor.TYPE_STRING)
64         VESProtobuf.create_field(message_type, 15, "sourceId", descriptor.FieldDescriptor.TYPE_BYTES)
65         VESProtobuf.create_field(message_type, 16, "sourceName", descriptor.FieldDescriptor.TYPE_STRING)
66         VESProtobuf.create_field(message_type, 17, "timeZoneOffset", descriptor.FieldDescriptor.TYPE_STRING)
67         VESProtobuf.create_field(message_type, 18, "vesEventListenerVersion",
68                                  descriptor.FieldDescriptor.TYPE_STRING)
69         return message_type
70
71     @staticmethod
72     def create_enum_type(desc_proto, name):
73         enum_type = desc_proto.enum_type.add()
74         enum_type.name = name
75         return enum_type
76
77     @staticmethod
78     def create_enum_type_value(enum_type, value_list):
79         for i in range(len(value_list)):
80             enum_type_val = enum_type.value.add()
81             enum_type_val.name = value_list[i]
82             enum_type_val.number = i
83
84     @staticmethod
85     def create_field(desc_proto, number, name, field_type):
86         field = desc_proto.field.add()
87         field.number = number
88         field.name = name
89         field.type = field_type
90
91     @staticmethod
92     def create_enum_field(desc_proto, number, name, type_name):
93         field = desc_proto.field.add()
94         field.number = number
95         field.name = name
96         field.type = descriptor.FieldDescriptor.TYPE_ENUM
97         field.type_name = type_name
98
99     @staticmethod
100     def create_message_field(desc_proto, number, name, type_name):
101         field = desc_proto.field.add()
102         field.number = number
103         field.name = name
104         field.type = descriptor.FieldDescriptor.TYPE_MESSAGE
105         field.type_name = type_name
106
107     @staticmethod
108     def get_message_definitions():
109         messages = message_factory.GetMessages((VESProtobuf.create_ves_event(),))
110         message_factory._FACTORY = message_factory.MessageFactory()
111         return messages
112
113     def binary_to_json(self, binary_message):
114         ves = self.message_descriptors['VesEvent']()
115         ves.MergeFromString(binary_message)
116         json = MessageToJson(ves)
117         return json