Python - Example of sending a JSON object over HTTPS and decoding the response

1 minute read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import http.client
import ssl
import json
import base64
from collections import namedtuple
from optparse import OptionParser
def load_input_data(input_path):
    """Return the full text content of the file at ``input_path``.

    The file is opened in text mode with the platform default encoding
    and is closed before the function returns.
    """
    with open(input_path) as source:
        contents = source.read()
    return contents
def string_to_base64(s):
    """Encode the text ``s`` as Base64 and return the result as text.

    ``s`` is first encoded to UTF-8 bytes; the Base64 output bytes are
    then decoded back into a ``str``.
    """
    utf8_bytes = s.encode('utf-8')
    encoded_bytes = base64.b64encode(utf8_bytes)
    return encoded_bytes.decode('utf-8')
def create_json_data(s, protocol):
    """Build the JSON request body for a payload and protocol.

    Args:
        s: Text payload; it is Base64-encoded into ``requestPayload``.
        protocol: Value stored unchanged under the ``protocol`` key.

    Returns:
        A JSON string with the keys ``version``, ``description``,
        ``enableDebug``, ``protocol`` and ``requestPayload``, in that
        order.
    """
    request_fields = {
        "version": "0.1",
        "description": "test",
        "enableDebug": True,
        "protocol": protocol,
        "requestPayload": string_to_base64(s),
    }
    return json.dumps(request_fields)
def create_http_header(username="user", password="password"):
    """Return HTTP headers for a JSON request with Basic authentication.

    Args:
        username: Account name for the ``authorization`` header.
            Defaults to the value that was previously hard-coded.
        password: Account password for the ``authorization`` header.
            Defaults to the value that was previously hard-coded.

    Returns:
        A dict with a ``content-type`` of ``application/json`` and an
        ``authorization`` value of ``"Basic "`` followed by the
        Base64-encoded ``username:password`` pair.
    """
    # Basic auth sends the Base64 of "username:password"; this is an
    # encoding, not encryption.
    credentials = string_to_base64(username + ":" + password)
    return {
        'content-type': "application/json",
        'authorization': "Basic " + credentials,
    }
def base64_to_string(b):
    """Decode the Base64 value ``b`` and return it as UTF-8 text."""
    decoded_bytes = base64.b64decode(b)
    return decoded_bytes.decode('utf-8')
def _json_object_hook(d):
    """Convert a decoded JSON object into a namedtuple instance.

    Args:
        d: Dict produced by the JSON decoder for one JSON object.

    Returns:
        An instance of a namedtuple type called ``X`` whose fields are
        the keys of ``d`` and whose values are the values of ``d``, in
        the same order.
    """
    # rename=True keeps decoding from failing with ValueError when a JSON
    # key is not a valid Python identifier (e.g. "content-type", "class",
    # "_id"); such fields are renamed positionally to _0, _1, ... and
    # remain reachable by index.
    record_type = namedtuple('X', d.keys(), rename=True)
    return record_type(*d.values())
def json_to_obj(data):
    """Parse the JSON document ``data`` into attribute-style objects.

    Every JSON object in the document is passed through
    ``_json_object_hook`` instead of being returned as a plain dict.
    """
    parsed = json.loads(data, object_hook=_json_object_hook)
    return parsed
def connect_https_server(host_name, port_number=9992, timeout=5,
                         verify_certificate=False):
    """Create an HTTPS connection object for ``host_name``.

    Args:
        host_name: Server host name or address.
        port_number: TCP port of the service. Defaults to 9992.
        timeout: Socket timeout in seconds. Defaults to 5.
        verify_certificate: When True, the server certificate and host
            name are verified against the system trust store. Defaults
            to False to preserve the previous behavior.

    Returns:
        An ``http.client.HTTPSConnection`` configured with the chosen
        TLS context. No request is issued by this function.
    """
    context = ssl.create_default_context()
    if not verify_certificate:
        # SECURITY: certificate and host-name checks are disabled, so the
        # server identity is NOT authenticated. Only acceptable for test
        # servers with self-signed certificates; pass
        # verify_certificate=True otherwise.
        # check_hostname must be cleared before verify_mode can be set
        # to CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    return http.client.HTTPSConnection(
        host_name, port_number, timeout=timeout, context=context)
def send_request_save_response(host_name, protocol, input_edifact_path, output_file_path):
    """POST the input file to the server and save the decoded reply.

    The input file is read, wrapped in the JSON request body and sent
    with ``POST /endpoint_data``. The response body is parsed as JSON,
    its ``respond_pay_load`` field is Base64-decoded, and the resulting
    text is written to ``output_file_path``.

    Args:
        host_name: Server host name passed to ``connect_https_server``.
        protocol: Protocol value placed in the request body.
        input_edifact_path: Path of the file whose content is sent.
        output_file_path: Path of the file that receives the decoded
            response payload (overwritten if it exists).
    """
    json_data = create_json_data(load_input_data(input_edifact_path), protocol)
    conn = connect_https_server(host_name)
    try:
        conn.request("POST", "/endpoint_data", json_data, create_http_header())
        response_body = conn.getresponse().read()
    finally:
        # Release the socket even when the request or the read fails.
        conn.close()
    result = json_to_obj(response_body.decode('utf-8'))
    # Decode before opening the output file so a malformed response does
    # not leave a truncated, empty file behind.
    payload_text = base64_to_string(result.respond_pay_load)
    with open(output_file_path, 'w') as output_file:
        output_file.write(payload_text)
def main():
    """Parse the command-line options and run one request/response cycle.

    Options:
        -s / --server: server name of the service (default ``localhost``).
        -p / --protocol: protocol type (default ``tcp``).
        -i / --input_path: input file path (default ``~/input.txt``).
        -o / --output_path: output file path (default ``~/output.txt``).
    """
    # Imported locally so this function does not depend on the
    # module-level import list.
    import os

    parser = OptionParser()
    parser.add_option("-s", "--server",
                      dest="server_name",
                      default="localhost",
                      help="server name of service")
    parser.add_option("-p", "--protocol",
                      dest="protocol_type",
                      default="tcp",
                      help="protocol type")
    parser.add_option("-i", "--input_path",
                      dest="input_path",
                      default="~/input.txt",
                      help="input file path")
    parser.add_option("-o", "--output_path",
                      dest="output_path",
                      default="~/output.txt",
                      help="output file path")
    options, _ = parser.parse_args()
    # open() does not expand "~", so the default paths (and any quoted
    # "~" given on the command line) must be expanded here.
    send_request_save_response(options.server_name,
                               options.protocol_type,
                               os.path.expanduser(options.input_path),
                               os.path.expanduser(options.output_path))
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()