Artifact Content

Artifact 52b298d0e3516fe6c4f2a19d4f2d469ab9bb4772:

# -*- coding: latin-1 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation; either version 3, or (at your option) any later
# version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.

"Pythonic simple SOAP Client implementation"

__author__ = "Mariano Reingart ("
__copyright__ = "Copyright (C) 2008 Mariano Reingart"
__license__ = "LGPL 3.0"
__version__ = "1.02c"

import urllib
    import httplib2
    Http = httplib2.Http
except ImportError:
    import urllib2
    class Http(): # wrapper to use when httplib2 not available
        def request(self, url, method, body, headers):
            f = urllib2.urlopen(urllib2.Request(url, body, headers))

from simplexml import SimpleXMLElement, TYPE_MAP, OrderedDict

class SoapFault(RuntimeError):
    def __init__(self,faultcode,faultstring):
        self.faultcode = faultcode
        self.faultstring = faultstring

# soap protocol specification & namespace
soap_namespaces = dict(

class SoapClient(object):
    "Simple SOAP Client (s�mil PHP)"
    def __init__(self, location = None, action = None, namespace = None,
                 cert = None, trace = False, exceptions = True, proxy = None, ns=False,
                 soap_ns=None, wsdl = None, cache = False):
        self.certssl = cert
        self.keyssl = None
        self.location = location        # server location (url)
        self.action = action            # SOAP base action
        self.namespace = namespace      # message
        self.trace = trace              # show debug messages
        self.exceptions = exceptions    # lanzar execpiones? (Soap Faults)
        self.xml_request = self.xml_response = ''
        if not soap_ns and not ns:
            self.__soap_ns = 'soap' # 1.1
        elif not soap_ns and ns:
            self.__soap_ns = 'soapenv' # 1.2
            self.__soap_ns = soap_ns

        # parse wsdl url = wsdl and self.wsdl(wsdl, debug=trace, cache=cache)
        self.service_port = None                 # service port for late binding

        if not proxy:
            self.http = Http()
            import socks
            self.http = httplib2.Http(proxy_info = httplib2.ProxyInfo(
                proxy_type=socks.PROXY_TYPE_HTTP, **proxy))
        #if self.certssl: # esto funciona para validar al server?
        #    self.http.add_certificate(self.keyssl, self.keyssl, self.certssl)
        self.__ns = ns # namespace prefix or False to not use it
        if not ns:
            self.__xml = """<?xml version="1.0" encoding="UTF-8"?>
<%(soap_ns)s:Envelope xmlns:xsi=""
    <%(method)s xmlns="%(namespace)s">
            self.__xml = """<?xml version="1.0" encoding="UTF-8"?>
<%(soap_ns)s:Envelope xmlns:%(soap_ns)s="%(soap_uri)s" xmlns:%(ns)s="%(namespace)s">

    def __getattr__(self, attr):
        "Return a pseudo-method that can be called"
        if not # not using WSDL?
            return lambda self=self, *args, **kwargs:,*args,**kwargs)
        else: # using WSDL:
            return lambda self=self, *args, **kwargs: self.wsdl_call(attr,*args,**kwargs)

    def call(self, method, *args, **kwargs):
        "Prepare xml request and make SOAP call, returning a SimpleXMLElement"
        #TODO: method != input_message
        # Basic SOAP request:
        xml = self.__xml % dict(method=method, namespace=self.namespace, ns=self.__ns,
                                soap_ns=self.__soap_ns, soap_uri=soap_namespaces[self.__soap_ns])
        request = SimpleXMLElement(xml,namespace=self.__ns and self.namespace, prefix=self.__ns)
        # serialize parameters
        if kwargs:
            parameters = kwargs.items()
            parameters = args
        if parameters and isinstance(parameters[0], SimpleXMLElement):
            # merge xmlelement parameter ("raw" - already marshalled)
            for param in parameters[0].children():
            # marshall parameters:
            for k,v in parameters: # dict: tag=valor
        self.xml_request = request.as_xml()
        self.xml_response = self.send(method, self.xml_request)
        response = SimpleXMLElement(self.xml_response, namespace=self.namespace)
        if self.exceptions and response("Fault", ns=soap_namespaces.values(), error=False):
            raise SoapFault(unicode(response.faultcode), unicode(response.faultstring))
        return response

    def send(self, method, xml):
        "Send SOAP request using HTTP"
        if self.location == 'test': return
        location = "%s" % self.location #?op=%s" % (self.location, method)
            soap_action = self.action
            soap_action = self.action+method
                'Content-type': 'text/xml; charset="UTF-8"',
                'Content-length': str(len(xml)),
                "SOAPAction": "\"%s\"" % (soap_action)
        if self.trace:
            print "-"*80
            print "POST %s" % location
            print '\n'.join(["%s: %s" % (k,v) for k,v in headers.items()])
            print u"\n%s" % xml.decode("utf8","ignore")
        response, content = self.http.request(
            location,"POST", body=xml, headers=headers )
        self.response = response
        self.content = content
        if self.trace:
            print '\n'.join(["%s: %s" % (k,v) for k,v in response.items()])
            print content#.decode("utf8","ignore")
            print "="*80
        return content

    def get_operation(self, method):
        # try to find operation in wsdl file
        soap_ver = self.__soap_ns == 'soap12' and 'soap12' or 'soap11'
        if not self.service_port:
            for service_name, service in
                for port_name, port in [port for port in service['ports'].items()]:
                    if port['soap_ver'] == soap_ver:
                        self.service_port = service_name, port_name
                    raise RuntimeError("Cannot determine service in WSDL: "
                                       "SOAP version: %s" % soap_ver)
            port =[self.service_port[0]]['ports'][self.service_port[1]]
        self.location = port['location']
        operation = port['operations'].get(unicode(method))
        if not operation:
            raise RuntimeError("Operation %s not found in WSDL: "
                               "Service/Port Type: %s" %
                               (method, self.service_port))
        return operation

    def wsdl_call(self, method, *args, **kwargs):
        "Pre and post process SOAP call, input and output parameters using WSDL"
        soap_uri = soap_namespaces[self.__soap_ns]
        operation = self.get_operation(method)
        # get i/o type declarations:
        input = operation['input']
        output = operation['output']
        if 'action' in operation:
            self.action = operation['action']
        # sort parameters (same order as xsd:sequence)
        def sort_dict(od, d):
            if isinstance(od, dict):
                ret = OrderedDict()
                for k in od.keys():
                    v = d.get(k)
                    if v:
                        if isinstance(v, dict):
                            v = sort_dict(od[k], v)
                        elif isinstance(v, list):
                            v = [sort_dict(od[k][0], v1)
                                    for v1 in v]
                        ret[str(k)] = v
                return ret
                return d
        if input and kwargs:
            params = sort_dict(input.values()[0], kwargs).items()
            method = input.keys()[0]
        #elif not input:
            #TODO: no message! (see wsmtxca.dummy)
            params = kwargs and kwargs.items()
        # call remote procedure
        response =, *params)
        # parse results:
        resp = response('Body',ns=soap_uri).children().unmarshall(output)
        return resp and resp.values()[0] # pass Response tag children

    def help(self, method):
        "Return operation documentation and invocation/returned value example"
        operation = self.get_operation(method)
        input = operation['input'].values()
        input = input and input[0]
        output = operation['output'].values()[0]
        return u"%s(%s)\n -> %s:\n\n%s" % (
            input and ", ".join("%s=%s" % (k,repr(v)) for k,v
                                 in input.items()) or "",
            output and output or "",

    def wsdl(self, url, debug=False, cache=False):
        "Parse Web Service Description v1.1"
        soap_ns = {
            "": 'soap11',
            "": 'soap12',

        get_local_name = lambda s: str((':' in s) and s.split(':')[1] or s)

        REVERSE_TYPE_MAP = dict([(v,k) for k,v in TYPE_MAP.items()])

        def fetch(url):
            "Fetch a document from a URL, save it locally if cache enabled"
            import os, hashlib
            # make md5 hash of the url for caching...
            filename = "%s.xml" % hashlib.md5(url).hexdigest()
            if isinstance(cache, basestring):
                filename = os.path.join(cache, filename)
            if cache and os.path.exists(filename):
                if debug: print "Reading file %s" % (filename, )
                f = open(filename, "r")
                xml =
                if debug: print "Fetching url %s" % (url, )
                f = urllib.urlopen(url)
                xml =
                if cache:
                    if debug: print "Writing file %s" % (filename, )
                    f = open(filename, "w")
            return xml

        # Open uri and read xml:
        xml = fetch(url)
        # Parse WSDL XML:
        wsdl = SimpleXMLElement(xml, namespace=wsdl_uri)

        # detect soap prefix and uri (xmlns attributes of <definitions>)
        xsd_ns = None
        soap_uris = {}
        for k, v in wsdl[:]:
            if v in soap_ns and k.startswith("xmlns:"):
                soap_uris[get_local_name(k)] = v
            if v== xsd_uri and k.startswith("xmlns:"):
                xsd_ns = get_local_name(k)

        # Extract useful data:
        self.namespace = wsdl['targetNamespace']
        self.documentation = unicode(wsdl('documentation', error=False) or '')

        services = {}
        bindings = {}           # binding_name: binding
        operations = {}         # operation_name: operation
        port_type_bindings = {} # port_type_name: binding
        messages = {}           # message: element
        elements = {}           # element: type def

        for service in wsdl.service:
            if not service_name:
                continue # empty service?
            if debug: print "Processing service", service_name
            serv = services.setdefault(service_name, {'ports': {}})
            serv['documentation']=service['documentation'] or ''
            for port in service.port:
                binding_name = get_local_name(port['binding'])
                address = port('address', ns=soap_uris.values(), error=False)
                location = address and address['location'] or None
                soap_uri = address and soap_uris.get(address.get_prefix())
                soap_ver = soap_uri and soap_ns.get(soap_uri)
                bindings[binding_name] = {'service_name': service_name,
                    'location': location,
                    'soap_uri': soap_uri, 'soap_ver': soap_ver,
                serv['ports'][port['name']] = bindings[binding_name]

        for binding in wsdl.binding:
            binding_name = binding['name']
            if debug: print "Processing binding", service_name
            soap_binding = binding('binding', ns=soap_uris.values(), error=False)
            transport = soap_binding and soap_binding['transport'] or None
            port_type_name = get_local_name(binding['type'])
                'port_type_name': port_type_name,
                'transport': transport, 'operations': {},
            port_type_bindings[port_type_name] = bindings[binding_name]
            for operation in binding.operation:
                op_name = operation['name']
                op = operation('operation',ns=soap_uris.values(), error=False)
                action = op and op['soapAction']
                d = operations.setdefault(op_name, {})
                bindings[binding_name]['operations'][op_name] = d
                d.update({'name': op_name})
                #if action: #TODO: separe operation_binding from operation
                if action:
                    d["action"] = action

        #TODO: cleanup element/schema/types parsing:
        def process_element(element_name, node):
            "Parse and define simple element types"
            if debug: print "Processing element", element_name
            for tag in node:
                if tag.get_local_name() in ("annotation", "documentation"):
                elif tag.get_local_name() in ('element', 'restriction'):
                    if debug: print element_name,"has not children!",tag
                    children = tag # element "alias"?
                    alias = True
                elif tag.children():
                    children = tag.children()
                    alias = False
                    if debug: print element_name,"has not children!",tag
                    continue #TODO: abstract?
                d = OrderedDict()
                for e in children:
                    t = e['type']
                    if not t:
                        t = e['base'] # complexContent (extension)!
                    if not t:
                        t = 'anyType' # no type given!
                    t = t.split(":")
                    if len(t)>1:
                        ns, type_name = t
                        ns, type_name = None, t[0]
                    if element_name == type_name:
                        continue # prevent infinite recursion
                    uri = ns and e.get_namespace_uri(ns) or xsd_uri
                    if uri==xsd_uri:
                        # look for the type, None == any
                        fn = REVERSE_TYPE_MAP.get(unicode(type_name), None)
                        # complex type, postprocess later
                        fn = elements.setdefault(unicode(type_name), OrderedDict())
                    if e['name'] is not None and not alias:
                        e_name = unicode(e['name'])
                        d[e_name] = fn
                        if debug: print "complexConent/simpleType/element", element_name, "=", type_name
                        d[None] = fn
                    if e['maxOccurs']=="unbounded":
                        # it's an array... TODO: compound arrays?
                        d.array = True
                    if e is not None and e.get_local_name() == 'extension' and e.children():
                        # extend base element:
                        process_element(element_name, e.children())
                elements.setdefault(element_name, OrderedDict()).update(d)

        # check axis2 namespace at schema types attributes
        self.namespace = dict(wsdl.types("schema", ns=xsd_uri)[:]).get('targetNamespace', self.namespace)

        imported_schemas = {}

        def preprocess_schema(schema):
            "Find schema elements and complex types"
            for element in schema.children():
                if element.get_local_name() in ('import', ):
                    schema_namespace = element['namespace']
                    schema_location = element['schemaLocation']
                    if schema_location is None:
                        if debug: print "Schema location not provided for %s!" % (schema_namespace, )
                    if schema_location in imported_schemas:
                        if debug: print "Schema %s already imported!" % (schema_location, )
                    imported_schemas[schema_location] = schema_namespace
                    if debug: print "Importing schema %s from %s" % (schema_namespace, schema_location)
                    # Open uri and read xml:
                    xml = fetch(schema_location)
                    # Parse imported XML schema (recursively):
                    imported_schema = SimpleXMLElement(xml, namespace=xsd_uri)

                if element.get_local_name() in ('element', 'complexType', "simpleType"):
                    element_name = unicode(element['name'])
                    if debug: print "Parsing Element %s: %s" % (element.get_local_name(),element_name)
                    if element.get_local_name() == 'complexType':
                        children = element.children()
                    elif element.get_local_name() == 'simpleType':
                        children = element("restriction", ns=xsd_uri)
                    elif element.get_local_name() == 'element' and element['type']:
                        children = element
                        children = element.children()
                        if children:
                            children = children.children()
                        elif element.get_local_name() == 'element':
                            children = element
                    if children:
                        process_element(element_name, children)

        def postprocess_element(elements):
            "Fix unresolved references (elements referenced before its definition, thanks .net)"
            for k,v in elements.items():
                if isinstance(v, OrderedDict):
                    if v.array:
                        elements[k] = [v] # convert arrays to python lists
                    if v!=elements: #TODO: fix recursive elements
                    if None in v and v[None]: # extension base?
                        if isinstance(v[None], dict):
                            for i, kk in enumerate(v[None]):
                                # extend base -keep orginal order-
                                elements[k].insert(kk, v[None][kk], i)
                            del v[None]
                        else:  # "alias", just replace
                            if debug: print "Replacing ", k , " = ", v[None]
                            elements[k] = v[None]
                if isinstance(v, list):
                    for n in v: # recurse list

        # process current wsdl schema:
        for schema in wsdl.types("schema", ns=xsd_uri):


        for message in wsdl.message:
            if debug: print "Processing message", message['name']
            part = message('part', error=False)
            element = {}
            if part:
                element_name = part['element']
                if not element_name:
                    element_name = part['type'] # some uses type instead
                element_name = get_local_name(element_name)
                element = {element_name: elements.get(element_name)}
            messages[message['name']] = element

        for port_type in wsdl.portType:
            port_type_name = port_type['name']
            if debug: print "Processing port type", port_type_name
            binding = port_type_bindings[port_type_name]

            for operation in port_type.operation:
                op_name = operation['name']
                op = operations[op_name]
                op['documentation'] = unicode(operation('documentation', error=False) or '')
                if binding['soap_ver']:
                    #TODO: separe operation_binding from operation (non SOAP?)
                    input = get_local_name(operation.input['message'])
                    output = get_local_name(operation.output['message'])
                    op['input'] = messages[input]
                    op['output'] = messages[output]

        if debug:
            import pprint

        return services

def parse_proxy(proxy_str):
    "Parses proxy address user:pass@host:port into a dict suitable for httplib2"
    proxy_dict = {}
    if proxy_str is None:
    if "@" in proxy_str:
        user_pass, host_port = proxy_str.split("@")
        user_pass, host_port = "", proxy_str
    if ":" in host_port:
        host, port = host_port.split(":")
        proxy_dict['proxy_host'], proxy_dict['proxy_port'] = host, int(port)
    if ":" in user_pass:
        proxy_dict['proxy_user'], proxy_dict['proxy_pass'] = user_pass.split(":")
    return proxy_dict

if __name__=="__main__":
    import sys

    if '--web2py' in sys.argv:
        # test local sample webservice exposed by web2py
        from client import SoapClient
        if not '--wsdl' in sys.argv:
            client = SoapClient(
                location = "",
                action = '', # SOAPAction
                namespace = "",
                soap_ns='soap', trace = True, ns = False, exceptions=True)
            client = SoapClient(wsdl="",trace=True)
        response = client.Dummy()
        print 'dummy', response
        response = client.Echo(value='hola')
        print 'echo', repr(response)
        response = client.AddIntegers(a=1,b=2)
        if not '--wsdl' in sys.argv:
            result = response.AddResult # manully convert returned type
            print int(result)
            result = response['AddResult']
            print result, type(result), "auto-unmarshalled"

    if '--raw' in sys.argv:
        # raw (unmarshalled parameter) local sample webservice exposed by web2py
        from client import SoapClient
        client = SoapClient(
            location = "",
            action = '', # SOAPAction
            namespace = "",
            soap_ns='soap', trace = True, ns = False)
        params = SimpleXMLElement("""<?xml version="1.0" encoding="UTF-8"?><AddIntegers><a>3</a><b>2</b></AddIntegers>""") # manully convert returned type
        response ='AddIntegers',params)
        result = response.AddResult
        print int(result) # manully convert returned type

    if '--ctg' in sys.argv:
        # test AFIP Agriculture webservice
        client = SoapClient(
            location = "",
            action = '', # SOAPAction
            namespace = "",
            trace = True,
            ns = True)
        response = client.dummy()
        result = response.dummyResponse
        print str(result.appserver)
        print str(result.dbserver)
        print str(result.authserver)

    if '--wsfe' in sys.argv:
        # Demo & Test (AFIP Electronic Invoice):
        ta_file = open("TA.xml")
            ta_string =   # read access ticket (
        ta = SimpleXMLElement(ta_string)
        token = str(ta.credentials.token)
        sign = str(ta.credentials.sign)
        cuit = long(20267565393)
        id = 1234
        cbte =199
        client = SoapClient(
            location = "",
            action = '', # SOAPAction
            namespace = "",
            trace = True)
        results = client.FERecuperaQTYRequest(
            argAuth= {"Token": token, "Sign": sign, "cuit":long(cuit)}
        if int(results.FERecuperaQTYRequestResult.RError.percode) != 0:
            print "Percode: %s" % results.FERecuperaQTYRequestResult.RError.percode
            print "MSGerror: %s" % results.FERecuperaQTYRequestResult.RError.perrmsg
            print int(results.FERecuperaQTYRequestResult.qty.value)

    if '--feriados' in sys.argv:
        # Demo & Test: Argentina Holidays (Ministerio del Interior):
        # this webservice seems disabled
        from datetime import datetime, timedelta
        client = SoapClient(
            location = "",
            action = '', # SOAPAction
            namespace = "",
            trace = True)
        dt1 = - timedelta(days=60)
        dt2 = + timedelta(days=60)
        feriadosXML = client.FeriadosEntreFechasas_xml(dt1=dt1.isoformat(), dt2=dt2.isoformat());
        print feriadosXML

    if '--wsdl-parse' in sys.argv:
        client = SoapClient()
        # Test PySimpleSOAP WSDL
        client.wsdl("file:C:/test.wsdl", debug=True)
        # Test Java Axis WSDL:
        # Test .NET 2.0 WSDL:
        # Test JBoss WSDL:

    if '--wsdl-client' in sys.argv:
        client = SoapClient(wsdl='',trace=True)
        results = client.FEXDummy()
        print results['FEXDummyResult']['AppServer']
        print results['FEXDummyResult']['DbServer']
        print results['FEXDummyResult']['AuthServer']
        ta_file = open("TA.xml")
            ta_string =   # read access ticket (
        ta = SimpleXMLElement(ta_string)
        token = str(ta.credentials.token)
        sign = str(ta.credentials.sign)
        response = client.FEXGetCMP(
            Auth={"Token": token, "Sign": sign, "Cuit": 20267565393},
            Cmp={"Tipo_cbte": 19, "Punto_vta": 1, "Cbte_nro": 1})
        result = response['FEXGetCMPResult']
        if False: print result
        if 'FEXErr' in result:
            print "FEXError:", result['FEXErr']['ErrCode'], result['FEXErr']['ErrCode']
        cbt = result['FEXResultGet']
        print cbt['Cae']
        FEX_event = result['FEXEvents']
        print FEX_event['EventCode'], FEX_event['EventMsg']

    if '--wsdl-ctg' in sys.argv:
        client = SoapClient(wsdl='',
                            trace=True, ns = "ctg")
        results = client.dummy()
        print results
        print results['DummyResponse']['appserver']
        print results['DummyResponse']['dbserver']
        print results['DummyResponse']['authserver']
        ta_file = open("TA.xml")
            ta_string =   # read access ticket (
        ta = SimpleXMLElement(ta_string)
        token = str(ta.credentials.token)
        sign = str(ta.credentials.sign)
        response = client.obtenerProvincias(auth={"token":token, "sign":sign, "cuitRepresentado":20267565393})
        print "response=",response
        for ret in response:
            print ret['return']['codigoProvincia'], ret['return']['descripcionProvincia'].encode("latin1")
        prueba = dict(numeroCartaDePorte=512345678, codigoEspecie=23,
                cuitRemitenteComercial=20267565393, cuitDestino=20267565393, cuitDestinatario=20267565393,
                codigoLocalidadOrigen=3058, codigoLocalidadDestino=3059,
                codigoCosecha='0910', pesoNetoCarga=1000, cantHoras=1,
                patenteVehiculo='CZO985', cuitTransportista=20267565393,
                numeroCTG="43816783", transaccion='10000001681', observaciones='',

        response = client.solicitarCTG(
            auth={"token": token, "sign": sign, "cuitRepresentado": 20267565393},
            solicitarCTGRequest= prueba)

        print response['return']['numeroCTG']

    ##print parse_proxy(None)
    ##print parse_proxy("host:1234")
    ##print parse_proxy("user:pass@host:1234")