1 # Test the support for SSL and sockets
5 from test import test_support
14 import urllib, urlparse
18 from BaseHTTPServer import HTTPServer
19 from SimpleHTTPServer import SimpleHTTPRequestHandler
21 # Optionally test SSL support, if we have it in the tested platform
28 HOST = test_support.HOST
30 SVN_PYTHON_ORG_ROOT_CERT = None
32 def handle_error(prefix):
33 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
34 if test_support.verbose:
35 sys.stdout.write(prefix + exc_format)
37 def testSimpleSSLwrap(self):
39 ssl.sslwrap_simple(socket.socket(socket.AF_INET))
41 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
46 ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
48 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
53 class BasicTests(unittest.TestCase):
55 def testSSLconnect(self):
56 if not test_support.is_resource_enabled('network'):
58 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
59 cert_reqs=ssl.CERT_NONE)
60 s.connect(("svn.python.org", 443))
63 raise test_support.TestFailed("Peer cert %s shouldn't be here!")
66 # this should fail because we have no verification certs
67 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
68 cert_reqs=ssl.CERT_REQUIRED)
70 s.connect(("svn.python.org", 443))
76 def testCrucialConstants(self):
87 if test_support.verbose:
88 sys.stdout.write("\n RAND_status is %d (%s)\n"
89 % (v, (v and "sufficient randomness") or
90 "insufficient randomness"))
96 print "didn't raise TypeError"
97 ssl.RAND_add("this is a random string", 75.0)
99 def testParseCert(self):
100 # note that this uses an 'unofficial' function in _ssl.c,
101 # provided solely for this test, to exercise the certificate
103 p = ssl._ssl._test_decode_cert(CERTFILE, False)
104 if test_support.verbose:
105 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
107 def testDERtoPEM(self):
109 pem = open(SVN_PYTHON_ORG_ROOT_CERT, 'r').read()
110 d1 = ssl.PEM_cert_to_DER_cert(pem)
111 p2 = ssl.DER_cert_to_PEM_cert(d1)
112 d2 = ssl.PEM_cert_to_DER_cert(p2)
114 raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
116 class NetworkedTests(unittest.TestCase):
118 def testConnect(self):
119 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
120 cert_reqs=ssl.CERT_NONE)
121 s.connect(("svn.python.org", 443))
124 raise test_support.TestFailed("Peer cert %s shouldn't be here!")
127 # this should fail because we have no verification certs
128 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
129 cert_reqs=ssl.CERT_REQUIRED)
131 s.connect(("svn.python.org", 443))
137 # this should succeed because we specify the root cert
138 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
139 cert_reqs=ssl.CERT_REQUIRED,
140 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
142 s.connect(("svn.python.org", 443))
143 except ssl.SSLError, x:
144 raise test_support.TestFailed("Unexpected exception %s" % x)
149 def testNonBlockingHandshake(self):
150 s = socket.socket(socket.AF_INET)
151 s.connect(("svn.python.org", 443))
153 s = ssl.wrap_socket(s,
154 cert_reqs=ssl.CERT_NONE,
155 do_handshake_on_connect=False)
162 except ssl.SSLError, err:
163 if err.args[0] == ssl.SSL_ERROR_WANT_READ:
164 select.select([s], [], [])
165 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
166 select.select([], [s], [])
170 if test_support.verbose:
171 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
173 def testFetchServerCert(self):
175 pem = ssl.get_server_certificate(("svn.python.org", 443))
177 raise test_support.TestFailed("No server certificate on svn.python.org:443!")
180 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
185 raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
187 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
189 raise test_support.TestFailed("No server certificate on svn.python.org:443!")
190 if test_support.verbose:
191 sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
197 _have_threads = False
202 class ThreadedEchoServer(threading.Thread):
204 class ConnectionHandler(threading.Thread):
206 """A mildly complicated class, because we want it to work both
207 with and without the SSL wrapper around the socket connection, so
208 that we can test the STARTTLS functionality."""
210 def __init__(self, server, connsock):
214 self.sock.setblocking(1)
216 threading.Thread.__init__(self)
219 def show_conn_details(self):
220 if self.server.certreqs == ssl.CERT_REQUIRED:
221 cert = self.sslconn.getpeercert()
222 if test_support.verbose and self.server.chatty:
223 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
224 cert_binary = self.sslconn.getpeercert(True)
225 if test_support.verbose and self.server.chatty:
226 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
227 cipher = self.sslconn.cipher()
228 if test_support.verbose and self.server.chatty:
229 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
231 def wrap_conn (self):
233 self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
234 certfile=self.server.certificate,
235 ssl_version=self.server.protocol,
236 ca_certs=self.server.cacerts,
237 cert_reqs=self.server.certreqs)
239 if self.server.chatty:
240 handle_error("\n server: bad connection attempt from " +
241 str(self.sock.getpeername()) + ":\n")
243 if not self.server.expect_bad_connects:
244 # here, we want to stop the server, because this shouldn't
245 # happen in the context of our test case
247 # normally, we'd just stop here, but for the test
248 # harness, we want to stop the server
257 return self.sslconn.read()
259 return self.sock.recv(1024)
261 def write(self, bytes):
263 return self.sslconn.write(bytes)
265 return self.sock.send(bytes)
271 self.sock._sock.close()
275 if not self.server.starttls_server:
276 if isinstance(self.sock, ssl.SSLSocket):
277 self.sslconn = self.sock
278 elif not self.wrap_conn():
280 self.show_conn_details()
285 # eof, so quit this handler
288 elif msg.strip() == 'over':
289 if test_support.verbose and self.server.connectionchatty:
290 sys.stdout.write(" server: client closed connection\n")
293 elif self.server.starttls_server and msg.strip() == 'STARTTLS':
294 if test_support.verbose and self.server.connectionchatty:
295 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
297 if not self.wrap_conn():
299 elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS':
300 if test_support.verbose and self.server.connectionchatty:
301 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
303 self.sslconn.unwrap()
305 if test_support.verbose and self.server.connectionchatty:
306 sys.stdout.write(" server: connection is now unencrypted...\n")
308 if (test_support.verbose and
309 self.server.connectionchatty):
310 ctype = (self.sslconn and "encrypted") or "unencrypted"
311 sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
312 % (repr(msg), ctype, repr(msg.lower()), ctype))
313 self.write(msg.lower())
315 if self.server.chatty:
316 handle_error("Test server failure:\n")
319 # normally, we'd just stop here, but for the test
320 # harness, we want to stop the server
325 def __init__(self, certificate, ssl_version=None,
326 certreqs=None, cacerts=None, expect_bad_connects=False,
327 chatty=True, connectionchatty=False, starttls_server=False,
328 wrap_accepting_socket=False):
330 if ssl_version is None:
331 ssl_version = ssl.PROTOCOL_TLSv1
333 certreqs = ssl.CERT_NONE
334 self.certificate = certificate
335 self.protocol = ssl_version
336 self.certreqs = certreqs
337 self.cacerts = cacerts
338 self.expect_bad_connects = expect_bad_connects
340 self.connectionchatty = connectionchatty
341 self.starttls_server = starttls_server
342 self.sock = socket.socket()
344 if wrap_accepting_socket:
345 self.sock = ssl.wrap_socket(self.sock, server_side=True,
346 certfile=self.certificate,
347 cert_reqs = self.certreqs,
348 ca_certs = self.cacerts,
349 ssl_version = self.protocol)
350 if test_support.verbose and self.chatty:
351 sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock))
352 self.port = test_support.bind_port(self.sock)
354 threading.Thread.__init__(self)
357 def start (self, flag=None):
359 threading.Thread.start(self)
362 self.sock.settimeout(0.5)
370 newconn, connaddr = self.sock.accept()
371 if test_support.verbose and self.chatty:
372 sys.stdout.write(' server: new connection from '
373 + str(connaddr) + '\n')
374 handler = self.ConnectionHandler(self, newconn)
376 except socket.timeout:
378 except KeyboardInterrupt:
382 handle_error("Test server failure:\n")
388 class AsyncoreEchoServer(threading.Thread):
390 class EchoServer (asyncore.dispatcher):
392 class ConnectionHandler (asyncore.dispatcher_with_send):
394 def __init__(self, conn, certfile):
395 asyncore.dispatcher_with_send.__init__(self, conn)
396 self.socket = ssl.wrap_socket(conn, server_side=True,
398 do_handshake_on_connect=True)
401 if isinstance(self.socket, ssl.SSLSocket):
402 while self.socket.pending() > 0:
403 self.handle_read_event()
406 def handle_read(self):
407 data = self.recv(1024)
408 self.send(data.lower())
410 def handle_close(self):
412 if test_support.verbose:
413 sys.stdout.write(" server: closed connection %s\n" % self.socket)
415 def handle_error(self):
418 def __init__(self, certfile):
419 self.certfile = certfile
420 asyncore.dispatcher.__init__(self)
421 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
422 self.port = test_support.bind_port(self.socket)
425 def handle_accept(self):
426 sock_obj, addr = self.accept()
427 if test_support.verbose:
428 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
429 self.ConnectionHandler(sock_obj, self.certfile)
431 def handle_error(self):
434 def __init__(self, certfile):
437 self.server = self.EchoServer(certfile)
438 self.port = self.server.port
439 threading.Thread.__init__(self)
443 return "<%s %s>" % (self.__class__.__name__, self.server)
445 def start (self, flag=None):
447 threading.Thread.start(self)
463 class SocketServerHTTPSServer(threading.Thread):
465 class HTTPSServer(HTTPServer):
467 def __init__(self, server_address, RequestHandlerClass, certfile):
469 HTTPServer.__init__(self, server_address, RequestHandlerClass)
470 # we assume the certfile contains both private key and certificate
471 self.certfile = certfile
473 self.active_lock = threading.Lock()
474 self.allow_reuse_address = True
477 return ('<%s %s:%s>' %
478 (self.__class__.__name__,
482 def get_request (self):
483 # override this to wrap socket with SSL
484 sock, addr = self.socket.accept()
485 sslconn = ssl.wrap_socket(sock, server_side=True,
486 certfile=self.certfile)
489 # The methods overridden below this are mainly so that we
490 # can run it in a thread and be able to stop it from another
491 # You probably wouldn't need them in other uses.
493 def server_activate(self):
494 # We want to run this in a thread for testing purposes,
495 # so we override this to set timeout, so that we get
496 # a chance to stop the server
497 self.socket.settimeout(0.5)
498 HTTPServer.server_activate(self)
500 def serve_forever(self):
501 # We want this to run in a thread, so we use a slightly
502 # modified version of "forever".
506 # We need to lock while handling the request.
507 # Another thread can close the socket after self.active
508 # has been checked and before the request is handled.
509 # This causes an exception when using the closed socket.
510 with self.active_lock:
513 self.handle_request()
514 except socket.timeout:
516 except KeyboardInterrupt:
520 sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info())))
524 def server_close(self):
525 # Again, we want this to run in a thread, so we need to override
526 # close to clear the "active" flag, so that serve_forever() will
528 with self.active_lock:
529 HTTPServer.server_close(self)
532 class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
534 # need to override translate_path to get a known root,
535 # instead of using os.curdir, since the test could be
538 server_version = "TestHTTPS/1.0"
542 def translate_path(self, path):
543 """Translate a /-separated PATH to the local filename syntax.
545 Components that mean special things to the local file system
546 (e.g. drive or directory names) are ignored. (XXX They should
547 probably be diagnosed.)
550 # abandon query parameters
551 path = urlparse.urlparse(path)[2]
552 path = os.path.normpath(urllib.unquote(path))
553 words = path.split('/')
554 words = filter(None, words)
557 drive, word = os.path.splitdrive(word)
558 head, word = os.path.split(word)
559 if word in self.root: continue
560 path = os.path.join(path, word)
563 def log_message(self, format, *args):
565 # we override this to suppress logging unless "verbose"
567 if test_support.verbose:
568 sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
569 (self.server.server_address,
570 self.server.server_port,
571 self.request.cipher(),
572 self.log_date_time_string(),
576 def __init__(self, certfile):
579 self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
580 self.port = test_support.find_unused_port()
581 self.server = self.HTTPSServer(
582 (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
583 threading.Thread.__init__(self)
587 return "<%s %s>" % (self.__class__.__name__, self.server)
589 def start (self, flag=None):
591 threading.Thread.start(self)
597 self.server.serve_forever()
602 self.server.server_close()
605 def badCertTest (certfile):
606 server = ThreadedEchoServer(CERTFILE,
607 certreqs=ssl.CERT_REQUIRED,
608 cacerts=CERTFILE, chatty=False)
609 flag = threading.Event()
611 # wait for it to start
616 s = ssl.wrap_socket(socket.socket(),
618 ssl_version=ssl.PROTOCOL_TLSv1)
619 s.connect((HOST, server.port))
620 except ssl.SSLError, x:
621 if test_support.verbose:
622 sys.stdout.write("\nSSLError is %s\n" % x[1])
623 except socket.error, x:
624 if test_support.verbose:
625 sys.stdout.write("\nsocket.error is %s\n" % x[1])
627 raise test_support.TestFailed(
628 "Use of invalid cert should have failed!")
633 def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
634 client_certfile, client_protocol=None, indata="FOO\n",
635 chatty=True, connectionchatty=False,
636 wrap_accepting_socket=False):
638 server = ThreadedEchoServer(certfile,
640 ssl_version=protocol,
643 connectionchatty=connectionchatty,
644 wrap_accepting_socket=wrap_accepting_socket)
645 flag = threading.Event()
647 # wait for it to start
650 if client_protocol is None:
651 client_protocol = protocol
654 s = ssl.wrap_socket(socket.socket(),
655 certfile=client_certfile,
656 ca_certs=cacertsfile,
658 ssl_version=client_protocol)
659 s.connect((HOST, server.port))
660 except ssl.SSLError, x:
661 raise test_support.TestFailed("Unexpected SSL error: " + str(x))
663 raise test_support.TestFailed("Unexpected exception: " + str(x))
666 if test_support.verbose:
668 " client: sending %s...\n" % (repr(indata)))
672 if test_support.verbose:
673 sys.stdout.write(" client: read %s\n" % repr(outdata))
674 if outdata != indata.lower():
675 raise test_support.TestFailed(
676 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
677 % (outdata[:min(len(outdata),20)], len(outdata),
678 indata[:min(len(indata),20)].lower(), len(indata)))
681 if test_support.verbose:
682 sys.stdout.write(" client: closing connection.\n")
688 def tryProtocolCombo (server_protocol,
693 if certsreqs is None:
694 certsreqs = ssl.CERT_NONE
696 if certsreqs == ssl.CERT_NONE:
697 certtype = "CERT_NONE"
698 elif certsreqs == ssl.CERT_OPTIONAL:
699 certtype = "CERT_OPTIONAL"
700 elif certsreqs == ssl.CERT_REQUIRED:
701 certtype = "CERT_REQUIRED"
702 if test_support.verbose:
703 formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
704 sys.stdout.write(formatstr %
705 (ssl.get_protocol_name(client_protocol),
706 ssl.get_protocol_name(server_protocol),
709 serverParamsTest(CERTFILE, server_protocol, certsreqs,
710 CERTFILE, CERTFILE, client_protocol, chatty=False)
711 except test_support.TestFailed:
715 if not expectedToWork:
716 raise test_support.TestFailed(
717 "Client protocol %s succeeded with server protocol %s!"
718 % (ssl.get_protocol_name(client_protocol),
719 ssl.get_protocol_name(server_protocol)))
722 class ThreadedTests(unittest.TestCase):
724 def testRudeShutdown(self):
726 listener_ready = threading.Event()
727 listener_gone = threading.Event()
728 port = test_support.find_unused_port()
730 # `listener` runs in a thread. It opens a socket listening on
731 # PORT, and sits in an accept() until the main thread connects.
732 # Then it rudely closes the socket, and sets Event `listener_gone`
733 # to let the main thread know the socket is gone.
740 s = None # reclaim the socket object, which also closes it
744 listener_ready.wait()
746 s.connect((HOST, port))
749 ssl_sock = ssl.wrap_socket(s)
753 raise test_support.TestFailed(
754 'connecting to closed SSL socket should have failed')
756 t = threading.Thread(target=listener)
763 if test_support.verbose:
764 sys.stdout.write("\n")
765 serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
766 CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
767 chatty=True, connectionchatty=True)
769 def testReadCert(self):
771 if test_support.verbose:
772 sys.stdout.write("\n")
774 server = ThreadedEchoServer(CERTFILE,
775 certreqs=ssl.CERT_NONE,
776 ssl_version=ssl.PROTOCOL_SSLv23,
779 flag = threading.Event()
781 # wait for it to start
786 s = ssl.wrap_socket(socket.socket(),
789 cert_reqs=ssl.CERT_REQUIRED,
790 ssl_version=ssl.PROTOCOL_SSLv23)
791 s.connect((HOST, server.port))
792 except ssl.SSLError, x:
793 raise test_support.TestFailed(
794 "Unexpected SSL error: " + str(x))
796 raise test_support.TestFailed(
797 "Unexpected exception: " + str(x))
800 raise test_support.TestFailed(
801 "Can't SSL-handshake with test server")
802 cert = s.getpeercert()
804 raise test_support.TestFailed(
805 "Can't get peer certificate.")
807 if test_support.verbose:
808 sys.stdout.write(pprint.pformat(cert) + '\n')
809 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
810 if not cert.has_key('subject'):
811 raise test_support.TestFailed(
812 "No subject field in certificate: %s." %
813 pprint.pformat(cert))
814 if ((('organizationName', 'Python Software Foundation'),)
815 not in cert['subject']):
816 raise test_support.TestFailed(
817 "Missing or invalid 'organizationName' field in certificate subject; "
818 "should be 'Python Software Foundation'.")
824 def testNULLcert(self):
825 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
827 def testMalformedCert(self):
828 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
830 def testWrongCert(self):
831 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
833 def testMalformedKey(self):
834 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
837 def testProtocolSSL2(self):
838 if test_support.verbose:
839 sys.stdout.write("\n")
840 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
841 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
842 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
843 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
844 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
845 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
847 def testProtocolSSL23(self):
848 if test_support.verbose:
849 sys.stdout.write("\n")
851 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
852 except test_support.TestFailed, x:
853 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
854 if test_support.verbose:
856 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
858 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
859 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
860 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
862 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
863 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
864 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
866 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
867 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
868 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
870 def testProtocolSSL3(self):
871 if test_support.verbose:
872 sys.stdout.write("\n")
873 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
874 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
875 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
876 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
877 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
878 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
880 def testProtocolTLS1(self):
881 if test_support.verbose:
882 sys.stdout.write("\n")
883 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
884 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
885 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
886 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
887 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
888 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
890 def testSTARTTLS (self):
892 msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
894 server = ThreadedEchoServer(CERTFILE,
895 ssl_version=ssl.PROTOCOL_TLSv1,
896 starttls_server=True,
898 connectionchatty=True)
899 flag = threading.Event()
901 # wait for it to start
909 s.connect((HOST, server.port))
911 raise test_support.TestFailed("Unexpected exception: " + str(x))
913 if test_support.verbose:
914 sys.stdout.write("\n")
916 if test_support.verbose:
918 " client: sending %s...\n" % repr(indata))
921 outdata = conn.read()
924 outdata = s.recv(1024)
925 if (indata == "STARTTLS" and
926 outdata.strip().lower().startswith("ok")):
927 if test_support.verbose:
929 " client: read %s from server, starting TLS...\n"
931 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
933 elif (indata == "ENDTLS" and
934 outdata.strip().lower().startswith("ok")):
935 if test_support.verbose:
937 " client: read %s from server, ending TLS...\n"
942 if test_support.verbose:
944 " client: read %s from server\n" % repr(outdata))
945 if test_support.verbose:
946 sys.stdout.write(" client: closing connection.\n")
956 def testSocketServer(self):
958 server = SocketServerHTTPSServer(CERTFILE)
959 flag = threading.Event()
961 # wait for it to start
965 if test_support.verbose:
966 sys.stdout.write('\n')
967 d1 = open(CERTFILE, 'rb').read()
969 # now fetch the same data from the HTTPS server
970 url = 'https://127.0.0.1:%d/%s' % (
971 server.port, os.path.split(CERTFILE)[1])
972 f = urllib.urlopen(url)
973 dlen = f.info().getheader("content-length")
974 if dlen and (int(dlen) > 0):
975 d2 = f.read(int(dlen))
976 if test_support.verbose:
978 " client: read %d bytes from remote server '%s'\n"
982 msg = ''.join(traceback.format_exception(*sys.exc_info()))
983 if test_support.verbose:
984 sys.stdout.write('\n' + msg)
985 raise test_support.TestFailed(msg)
988 raise test_support.TestFailed(
989 "Couldn't fetch data from HTTPS server")
994 def testWrappedAccept (self):
996 if test_support.verbose:
997 sys.stdout.write("\n")
998 serverParamsTest(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED,
999 CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23,
1000 chatty=True, connectionchatty=True,
1001 wrap_accepting_socket=True)
1004 def testAsyncoreServer (self):
1006 indata = "TEST MESSAGE of mixed case\n"
1008 if test_support.verbose:
1009 sys.stdout.write("\n")
1010 server = AsyncoreEchoServer(CERTFILE)
1011 flag = threading.Event()
1013 # wait for it to start
1018 s = ssl.wrap_socket(socket.socket())
1019 s.connect(('127.0.0.1', server.port))
1020 except ssl.SSLError, x:
1021 raise test_support.TestFailed("Unexpected SSL error: " + str(x))
1022 except Exception, x:
1023 raise test_support.TestFailed("Unexpected exception: " + str(x))
1025 if test_support.verbose:
1027 " client: sending %s...\n" % (repr(indata)))
1030 if test_support.verbose:
1031 sys.stdout.write(" client: read %s\n" % repr(outdata))
1032 if outdata != indata.lower():
1033 raise test_support.TestFailed(
1034 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
1035 % (outdata[:min(len(outdata),20)], len(outdata),
1036 indata[:min(len(indata),20)].lower(), len(indata)))
1038 if test_support.verbose:
1039 sys.stdout.write(" client: closing connection.\n")
1043 # wait for server thread to end
1047 def testAllRecvAndSendMethods(self):
1049 if test_support.verbose:
1050 sys.stdout.write("\n")
1052 server = ThreadedEchoServer(CERTFILE,
1053 certreqs=ssl.CERT_NONE,
1054 ssl_version=ssl.PROTOCOL_TLSv1,
1057 connectionchatty=False)
1058 flag = threading.Event()
1060 # wait for it to start
1064 s = ssl.wrap_socket(socket.socket(),
1068 cert_reqs=ssl.CERT_NONE,
1069 ssl_version=ssl.PROTOCOL_TLSv1)
1070 s.connect((HOST, server.port))
1071 except ssl.SSLError as x:
1072 raise support.TestFailed("Unexpected SSL error: " + str(x))
1073 except Exception as x:
1074 raise support.TestFailed("Unexpected exception: " + str(x))
1076 # helper methods for standardising recv* method signatures
1078 b = bytearray("\0"*100)
1079 count = s.recv_into(b)
1082 def _recvfrom_into():
1083 b = bytearray("\0"*100)
1084 count, addr = s.recvfrom_into(b)
1087 # (name, method, whether to expect success, *args)
1089 ('send', s.send, True, []),
1090 ('sendto', s.sendto, False, ["some.address"]),
1091 ('sendall', s.sendall, True, []),
1094 ('recv', s.recv, True, []),
1095 ('recvfrom', s.recvfrom, False, ["some.address"]),
1096 ('recv_into', _recv_into, True, []),
1097 ('recvfrom_into', _recvfrom_into, False, []),
1099 data_prefix = u"PREFIX_"
1101 for meth_name, send_meth, expect_success, args in send_methods:
1102 indata = data_prefix + meth_name
1104 send_meth(indata.encode('ASCII', 'strict'), *args)
1106 outdata = outdata.decode('ASCII', 'strict')
1107 if outdata != indata.lower():
1108 raise support.TestFailed(
1109 "While sending with <<%s>> bad data "
1110 "<<%r>> (%d) received; "
1111 "expected <<%r>> (%d)\n" % (
1112 meth_name, outdata[:20], len(outdata),
1113 indata[:20], len(indata)
1116 except ValueError as e:
1118 raise support.TestFailed(
1119 "Failed to send with method <<%s>>; "
1120 "expected to succeed.\n" % (meth_name,)
1122 if not str(e).startswith(meth_name):
1123 raise support.TestFailed(
1124 "Method <<%s>> failed with unexpected "
1125 "exception message: %s\n" % (
1130 for meth_name, recv_meth, expect_success, args in recv_methods:
1131 indata = data_prefix + meth_name
1133 s.send(indata.encode('ASCII', 'strict'))
1134 outdata = recv_meth(*args)
1135 outdata = outdata.decode('ASCII', 'strict')
1136 if outdata != indata.lower():
1137 raise support.TestFailed(
1138 "While receiving with <<%s>> bad data "
1139 "<<%r>> (%d) received; "
1140 "expected <<%r>> (%d)\n" % (
1141 meth_name, outdata[:20], len(outdata),
1142 indata[:20], len(indata)
1145 except ValueError as e:
1147 raise support.TestFailed(
1148 "Failed to receive with method <<%s>>; "
1149 "expected to succeed.\n" % (meth_name,)
1151 if not str(e).startswith(meth_name):
1152 raise support.TestFailed(
1153 "Method <<%s>> failed with unexpected "
1154 "exception message: %s\n" % (
1161 s.write("over\n".encode("ASCII", "strict"))
1168 def test_main(verbose=False):
1170 raise test_support.TestSkipped("No SSL support")
1172 global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
1173 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
1175 SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
1176 os.path.dirname(__file__) or os.curdir,
1177 "https_svn_python_org_root.pem")
1179 if (not os.path.exists(CERTFILE) or
1180 not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
1181 raise test_support.TestFailed("Can't read certificate files!")
1183 TESTPORT = test_support.find_unused_port()
1185 raise test_support.TestFailed("Can't find open port to test servers on!")
1187 tests = [BasicTests]
1189 if test_support.is_resource_enabled('network'):
1190 tests.append(NetworkedTests)
1193 thread_info = test_support.threading_setup()
1194 if thread_info and test_support.is_resource_enabled('network'):
1195 tests.append(ThreadedTests)
1197 test_support.run_unittest(*tests)
1200 test_support.threading_cleanup(*thread_info)
1202 if __name__ == "__main__":