Update INFO.yaml with new PTL
[testsuite/python-testing-utils.git] / robotframework-onap / ONAPLibrary / SocketKeywords.py
1 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import socket
16 import ssl
17 from robot.api.deco import keyword
18
19
20 class SocketKeywords(object):
21     """SocketKeywords are common resource for simple socket keywords."""
22
23     def __init__(self):
24         super(SocketKeywords, self).__init__()
25
26     @keyword
27     def send_binary_data(self, host, port, data, ssl_enabled=None, cert_required=None, ca=None, cert=None, key=None):
28         """ send raw bytes over tcp socket with optional ssl """
29         if ssl_enabled:
30             if cert_required:
31                 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
32                 context.verify_mode = ssl.CERT_REQUIRED
33                 # Load CA cert
34                 context.load_verify_locations(str(ca))
35                 # Load Client cert and key
36                 context.load_cert_chain(str(cert), str(key))
37             else:
38                 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
39                 context.verify_mode = ssl.CERT_OPTIONAL
40             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
41             ssock = context.wrap_socket(sock, server_hostname=str(host))
42             # Connect to server over ssl and send data
43             ssock.connect((str(host), int(port)))
44             ssock.sendall(bytes(data))
45             ssock.close()
46         else:
47             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
48             # Connect to server and send data
49             sock.connect((str(host), int(port)))
50             sock.sendall(bytes(data))
51             sock.close()