summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCole Robinson <crobinso@redhat.com>2013-06-19 00:18:16 (GMT)
committerCole Robinson <crobinso@redhat.com>2013-06-19 16:36:57 (GMT)
commita782282ee479ba4cc1b8b1d89700ac630ba83eef (patch)
treebde10af82b46f31c22a68eeeb1a93ef97594bad6
parentf7498b03dab9c534e399bcca60b3246746add4fe (diff)
downloadpython-bugzilla-a782282ee479ba4cc1b8b1d89700ac630ba83eef.zip
python-bugzilla-a782282ee479ba4cc1b8b1d89700ac630ba83eef.tar.gz
python-bugzilla-a782282ee479ba4cc1b8b1d89700ac630ba83eef.tar.xz
CVE-2013-2191: Switch to pycurl to get SSL host and cert validation
Right now python-bugzilla will happily allow connecting to a host with a self signed SSL certificate, or hostname that doesn't match the cert. This isn't a safe default. Standard python libs don't handle these cases, but pycurl does. So we switch to pycurl for the transport layer. Add a --nosslverify CLI switch to turn off this functionality if the user chooses. Thanks to Tomas Hoger for much of the sample code.
-rwxr-xr-xbin/bugzilla25
-rw-r--r--bugzilla/__init__.py2
-rw-r--r--bugzilla/base.py139
-rw-r--r--python-bugzilla.spec1
-rw-r--r--tests/ro_functional.py8
5 files changed, 120 insertions, 55 deletions
diff --git a/bin/bugzilla b/bin/bugzilla
index 7129d03..5763334 100755
--- a/bin/bugzilla
+++ b/bin/bugzilla
@@ -90,6 +90,9 @@ def setup_parser():
p.add_option('--bztype', default='auto',
help="Bugzilla type. Autodetected if not set. "
"Available types: %s" % " ".join(bugzilla.classlist))
+ p.add_option("--nosslverify", dest="sslverify",
+ action="store_false", default=True,
+ help="Don't error on invalid bugzilla SSL certificate")
p.add_option('--user',
help="username")
p.add_option('--password',
@@ -1024,7 +1027,8 @@ def main(bzinstance=None):
bz = bzinstance
else:
bz = bzclass(url=global_opt.bugzilla,
- cookiefile=global_opt.cookiefile or -1)
+ cookiefile=global_opt.cookiefile or -1,
+ sslverify=global_opt.sslverify)
# Handle 'login' action
@@ -1126,7 +1130,7 @@ if __name__ == '__main__':
main()
except KeyboardInterrupt:
log.debug("", exc_info=True)
- print "\ninterrupted."
+ print "\nExited at user request."
sys.exit(1)
except socket.error, e:
log.debug("", exc_info=True)
@@ -1139,8 +1143,19 @@ if __name__ == '__main__':
except xmlrpclib.ProtocolError, e:
log.debug("", exc_info=True)
print "\nInvalid server response: %d %s" % (e.errcode, e.errmsg)
- redir = e.headers.getheader("location", 0)
+
+ # Give SSL recommendations
+ import pycurl
+ sslerrcodes = [getattr(pycurl, ename) for ename in dir(pycurl) if
+ ename.startswith("E_SSL")]
+ if e.errcode in sslerrcodes:
+ print ("\nIf you trust the remote server, you can work "
+ "around this error with:\n"
+ " bugzilla --nosslverify ...")
+
+ # Detect redirect
+ redir = (e.headers and e.headers.getheader("location", 0) or None)
if redir:
- print "Server was attempting a redirect."
- print 'Try "bugzilla --bugzilla %s ..."' % redir
+ print ("\nServer was attempting a redirect. Try: "
+ " bugzilla --bugzilla %s ..." % redir)
sys.exit(4)
diff --git a/bugzilla/__init__.py b/bugzilla/__init__.py
index 44dbb4d..49f8cbe 100644
--- a/bugzilla/__init__.py
+++ b/bugzilla/__init__.py
@@ -97,7 +97,7 @@ class Bugzilla(object):
def __init__(self, **kwargs):
log.info("Bugzilla v%s initializing" % __version__)
if 'url' not in kwargs:
- raise TypeError("You must pass a valid bugzilla xmlrpc.cgi URL")
+ raise TypeError("You must pass a valid bugzilla URL")
# pylint: disable=W0233
# Use of __init__ of non parent class
diff --git a/bugzilla/base.py b/bugzilla/base.py
index 9035c00..b0f90dd 100644
--- a/bugzilla/base.py
+++ b/bugzilla/base.py
@@ -11,9 +11,13 @@
import cookielib
import os
+import StringIO
import urllib2
+import urlparse
import xmlrpclib
+import pycurl
+
from bugzilla import __version__, log
from bugzilla.bug import _Bug, _User
@@ -95,43 +99,73 @@ def _build_cookiejar(cookiefile):
return retcj
-# CookieTransport code mostly borrowed from pybugz
-class _CookieTransport(xmlrpclib.Transport):
- def __init__(self, uri, cookiejar, use_datetime=0):
- self.verbose = 0
-
- # python 2.4 compat
+class _CURLTransport(xmlrpclib.Transport):
+ def __init__(self, url, cookiejar,
+ sslverify=True, sslcafile=None, debug=0):
if hasattr(xmlrpclib.Transport, "__init__"):
- xmlrpclib.Transport.__init__(self, use_datetime=use_datetime)
+ xmlrpclib.Transport.__init__(self, use_datetime=False)
+
+ self.verbose = debug
+
+ # transport constructor needs full url too, as xmlrpc does not pass
+ # scheme to request
+ self.scheme = urlparse.urlparse(url)[0]
+ if self.scheme not in ["http", "https"]:
+ raise Exception("Invalid URL scheme: %s (%s)" % (self.scheme, url))
+
+ self.c = pycurl.Curl()
+ self.c.setopt(pycurl.POST, 1)
+ self.c.setopt(pycurl.CONNECTTIMEOUT, 30)
+ self.c.setopt(pycurl.HTTPHEADER, [
+ "Content-Type: text/xml",
+ ])
+ self.c.setopt(pycurl.VERBOSE, debug)
+
+ self.set_cookiejar(cookiejar)
+
+ # ssl settings
+ if self.scheme == "https":
+ # override curl built-in ca file setting
+ if sslcafile is not None:
+ self.c.setopt(pycurl.CAINFO, sslcafile)
+
+ # disable ssl verification
+ if not sslverify:
+ self.c.setopt(pycurl.SSL_VERIFYPEER, 0)
+ self.c.setopt(pycurl.SSL_VERIFYHOST, 0)
+
+ def set_cookiejar(self, cj):
+ self.c.setopt(pycurl.COOKIEFILE, cj.filename or "")
+ self.c.setopt(pycurl.COOKIEJAR, cj.filename or "")
+
+ def get_cookies(self):
+ return self.c.getinfo(pycurl.INFO_COOKIELIST)
+
+ def open_helper(self, url, request_body):
+ self.c.setopt(pycurl.URL, url)
+ self.c.setopt(pycurl.POSTFIELDS, request_body)
+
+ b = StringIO.StringIO()
+ self.c.setopt(pycurl.WRITEFUNCTION, b.write)
+ try:
+ self.c.perform()
+ except pycurl.error, e:
+ raise xmlrpclib.ProtocolError(url, e[0], e[1], None)
- self.uri = uri
- self.opener = urllib2.build_opener()
- self.opener.add_handler(urllib2.HTTPCookieProcessor(cookiejar))
+ b.seek(0)
+ return b
def request(self, host, handler, request_body, verbose=0):
- req = urllib2.Request(self.uri)
- req.add_header('User-Agent', self.user_agent)
- req.add_header('Content-Type', 'text/xml')
+ self.verbose = verbose
+ url = "%s://%s%s" % (self.scheme, host, handler)
- if hasattr(self, 'accept_gzip_encoding') and self.accept_gzip_encoding:
- req.add_header('Accept-Encoding', 'gzip')
+ # xmlrpclib fails to escape \r
+ request_body = request_body.replace('\r', '&#xd;')
- req.add_data(request_body)
+ stringio = self.open_helper(url, request_body)
+ return self.parse_response(stringio)
- resp = self.opener.open(req)
- # In Python 2, resp is a urllib.addinfourl instance, which does not
- # have the getheader method that parse_response expects.
- if not hasattr(resp, 'getheader'):
- resp.getheader = resp.headers.getheader
-
- if resp.code == 200:
- self.verbose = verbose
- return self.parse_response(resp)
-
- resp.close()
- raise xmlrpclib.ProtocolError(self.uri, resp.status,
- resp.reason, resp.msg)
class BugzillaError(Exception):
@@ -186,8 +220,6 @@ class BugzillaBase(object):
Given a big huge bugzilla query URL, returns a query dict that can
be passed along to the Bugzilla.query() method.
'''
- import urlparse
-
q = {}
(ignore, ignore, path,
ignore, query, ignore) = urlparse.urlparse(url)
@@ -219,13 +251,16 @@ class BugzillaBase(object):
url = url + '/xmlrpc.cgi'
return url
- def __init__(self, url=None, user=None, password=None, cookiefile=-1):
+ def __init__(self, url=None, user=None, password=None, cookiefile=-1,
+ sslverify=True):
# Settings the user might want to tweak
self.user = user or ''
self.password = password or ''
self.url = ''
+ self._transport = None
self._cookiejar = None
+ self._sslverify = bool(sslverify)
self.logged_in = False
@@ -371,9 +406,11 @@ class BugzillaBase(object):
url = self.url
url = self.fix_url(url)
- transport = _CookieTransport(url, self._cookiejar)
- transport.user_agent = self.user_agent
- self._proxy = xmlrpclib.ServerProxy(url, transport)
+ self._transport = _CURLTransport(url, self._cookiejar,
+ sslverify=self._sslverify)
+ self._transport.user_agent = self.user_agent
+ self._proxy = xmlrpclib.ServerProxy(url, self._transport)
+
self.url = url
# we've changed URLs - reload config
@@ -431,8 +468,6 @@ class BugzillaBase(object):
except xmlrpclib.Fault:
r = False
- if r and self._cookiejar.filename is not None:
- self._cookiejar.save()
return r
def logout(self):
@@ -1178,18 +1213,32 @@ class BugzillaBase(object):
'''Get the contents of the attachment with the given attachment ID.
Returns a file-like object.'''
att_uri = self._attachment_uri(attachid)
- opener = urllib2.build_opener(
- urllib2.HTTPCookieProcessor(self._cookiejar))
- att = opener.open(att_uri)
- # RFC 2183 defines the content-disposition header, if you're curious
- disp = att.headers['content-disposition'].split(';')
+ headers = {}
+ ret = StringIO.StringIO()
+
+ def headers_cb(buf):
+ if not ":" in buf:
+ return
+ name, val = buf.split(":", 1)
+ headers[name.lower()] = val
+
+ c = pycurl.Curl()
+ c.setopt(pycurl.URL, att_uri)
+ c.setopt(pycurl.WRITEFUNCTION, ret.write)
+ c.setopt(pycurl.HEADERFUNCTION, headers_cb)
+ c.setopt(pycurl.COOKIEFILE, self._cookiejar.filename or "")
+ c.perform()
+ c.close()
+
+ disp = headers['content-disposition'].split(';')
disp.pop(0)
parms = dict([p.strip().split("=", 1) for p in disp])
- # Parameter values can be quoted/encoded as per RFC 2231
- att.name = _decode_rfc2231_value(parms['filename'])
+ ret.name = _decode_rfc2231_value(parms['filename'])
+
# Hooray, now we have a file-like object with .read() and .name
- return att
+ ret.seek(0)
+ return ret
def updateattachmentflags(self, bugid, attachid, flagname, **kwargs):
'''
diff --git a/python-bugzilla.spec b/python-bugzilla.spec
index 595ba94..4f27907 100644
--- a/python-bugzilla.spec
+++ b/python-bugzilla.spec
@@ -19,6 +19,7 @@ BuildRequires: python-setuptools
BuildRequires: python-setuptools-devel
%endif
+Requires: python-pycurl
Requires: python-magic
%description
diff --git a/tests/ro_functional.py b/tests/ro_functional.py
index aa4c754..26de9e7 100644
--- a/tests/ro_functional.py
+++ b/tests/ro_functional.py
@@ -74,13 +74,13 @@ class BaseTest(unittest.TestCase):
return
self.assertTrue(len(out.splitlines()) >= mincount)
- self.assertTrue(any([l.startswith("#" + expectbug)
- for l in out.splitlines()]))
+ self.assertTrue(bool([l for l in out.splitlines() if
+ l.startswith("#" + expectbug)]))
# Check --ids output option
out2 = self.clicomm(cli + " --ids")
self.assertTrue(len(out.splitlines()) == len(out2.splitlines()))
- self.assertTrue(any([l == expectbug for l in out2.splitlines()]))
+ self.assertTrue(bool([l for l in out2.splitlines() if l == expectbug]))
def _testQueryFull(self, bugid, mincount, expectstr):
@@ -145,7 +145,7 @@ class BZ34(BaseTest):
class BZ42(BaseTest):
- url = "https://bugzilla.freedesktop.org/xmlrpc.cgi"
+ url = "https://bugs.freedesktop.org/xmlrpc.cgi"
bzclass = bugzilla.Bugzilla4
closestatus = "CLOSED,RESOLVED"