]> rtime.felk.cvut.cz Git - l4.git/blob - l4/pkg/python/contrib/Lib/test/test_ssl.py
Initial import
[l4.git] / l4 / pkg / python / contrib / Lib / test / test_ssl.py
1 # Test the support for SSL and sockets
2
3 import sys
4 import unittest
5 from test import test_support
6 import asyncore
7 import socket
8 import select
9 import errno
10 import subprocess
11 import time
12 import os
13 import pprint
14 import urllib, urlparse
15 import shutil
16 import traceback
17
18 from BaseHTTPServer import HTTPServer
19 from SimpleHTTPServer import SimpleHTTPRequestHandler
20
# The ssl module is optional on the tested platform: remember whether the
# import failed instead of letting the ImportError propagate.
try:
    import ssl
except ImportError:
    skip_expected = True
else:
    skip_expected = False

# Host name supplied by the shared test_support helpers.
HOST = test_support.HOST
# Both certificate paths start out unset in this part of the file.
CERTFILE = SVN_PYTHON_ORG_ROOT_CERT = None
31
def handle_error(prefix):
    """Report the exception currently being handled, in verbose mode only.

    prefix -- text written immediately before the formatted traceback.

    The traceback is taken from sys.exc_info() and written to sys.stdout;
    nothing is written unless test_support.verbose is true.
    """
    # The formatted lines are joined with a single space, as before, so the
    # verbose output stays unchanged.
    exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
    if test_support.verbose:
        sys.stdout.write(prefix + exc_format)


class BasicTests(unittest.TestCase):
    """SSL checks that do not need one of the test server threads."""

    def testSimpleSSLwrap(self):
        """Exercise ssl.sslwrap_simple() on a socket object and on its _sock.

        This method used to be defined *inside* handle_error(), where it was
        re-created on every call and never collected or run as a test.
        """
        for use_raw_sock in (False, True):
            sock = socket.socket(socket.AF_INET)
            target = sock._sock if use_raw_sock else sock
            try:
                ssl.sslwrap_simple(target)
            except IOError as e:
                # broken pipe when ssl_sock.do_handshake(); this test
                # doesn't care about that
                if e.errno != errno.EPIPE:
                    raise

    def testSSLconnect(self):
        """Connect to svn.python.org with and without cert verification.

        Does nothing unless the 'network' test resource is enabled.
        """
        if not test_support.is_resource_enabled('network'):
            return
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE)
        try:
            s.connect(("svn.python.org", 443))
            c = s.getpeercert()
            if c:
                # The message previously contained a bare "%s" that was
                # never filled in.
                raise test_support.TestFailed(
                    "Peer cert %s shouldn't be here!" % (c,))
        finally:
            # close the socket even when the check above fails
            s.close()

        # this should fail because we have no verification certs
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError:
            pass
        finally:
            s.close()

    def testCrucialConstants(self):
        """Check that the protocol and cert-requirement constants exist."""
        # Attribute access alone is the test: a missing name raises
        # AttributeError.
        ssl.PROTOCOL_SSLv2
        ssl.PROTOCOL_SSLv23
        ssl.PROTOCOL_SSLv3
        ssl.PROTOCOL_TLSv1
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED

    def testRAND(self):
        """Call the RAND_status / RAND_egd / RAND_add entry points."""
        v = ssl.RAND_status()
        if test_support.verbose:
            state = "sufficient randomness" if v else "insufficient randomness"
            sys.stdout.write("\n RAND_status is %d (%s)\n" % (v, state))
        try:
            ssl.RAND_egd(1)
        except TypeError:
            pass
        else:
            # Only reported, not treated as a failure (unchanged behaviour).
            sys.stdout.write("didn't raise TypeError\n")
        ssl.RAND_add("this is a random string", 75.0)

    def testParseCert(self):
        """Decode CERTFILE through the private _test_decode_cert hook."""
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE, False)
        if test_support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")

    def testDERtoPEM(self):
        """Round-trip the root cert PEM -> DER -> PEM -> DER and compare."""
        # 'with' guarantees the file handle is released promptly.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
            pem = f.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        if d1 != d2:
            raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
115
class NetworkedTests(unittest.TestCase):
    """Tests that talk to svn.python.org:443 over the network."""

    def testConnect(self):
        """Connect with CERT_NONE, CERT_REQUIRED, and CERT_REQUIRED + root cert."""
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE)
        try:
            s.connect(("svn.python.org", 443))
            c = s.getpeercert()
            if c:
                # The message previously contained a bare "%s" that was
                # never filled in.
                raise test_support.TestFailed(
                    "Peer cert %s shouldn't be here!" % (c,))
        finally:
            # close the socket even when the check above fails
            s.close()

        # this should fail because we have no verification certs
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError:
            pass
        finally:
            s.close()

        # this should succeed because we specify the root cert
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError as x:
            raise test_support.TestFailed("Unexpected exception %s" % x)
        finally:
            s.close()

    def testNonBlockingHandshake(self):
        """Drive do_handshake() by hand on a non-blocking socket."""
        s = socket.socket(socket.AF_INET)
        count = 0
        try:
            s.connect(("svn.python.org", 443))
            s.setblocking(False)
            s = ssl.wrap_socket(s,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
            while True:
                try:
                    count += 1
                    s.do_handshake()
                    break
                except ssl.SSLError as err:
                    # Wait until the socket is ready in the direction the
                    # SSL layer asked for, then retry the handshake.
                    if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                        select.select([s], [], [])
                    elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                        select.select([], [s], [])
                    else:
                        raise
        finally:
            # previously the socket was left open if anything above raised
            s.close()
        if test_support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)

    def testFetchServerCert(self):
        """Fetch the server certificate unverified, with a wrong CA, and with the right CA."""
        pem = ssl.get_server_certificate(("svn.python.org", 443))
        if not pem:
            raise test_support.TestFailed("No server certificate on svn.python.org:443!")

        try:
            pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
        except ssl.SSLError:
            # should fail
            pass
        else:
            raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)

        pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        if not pem:
            raise test_support.TestFailed("No server certificate on svn.python.org:443!")
        if test_support.verbose:
            sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
192
193
194 try:
195     import threading
196 except ImportError:
197     _have_threads = False
198 else:
199
200     _have_threads = True
201
202     class ThreadedEchoServer(threading.Thread):
203
204         class ConnectionHandler(threading.Thread):
205
206             """A mildly complicated class, because we want it to work both
207             with and without the SSL wrapper around the socket connection, so
208             that we can test the STARTTLS functionality."""
209
210             def __init__(self, server, connsock):
211                 self.server = server
212                 self.running = False
213                 self.sock = connsock
214                 self.sock.setblocking(1)
215                 self.sslconn = None
216                 threading.Thread.__init__(self)
217                 self.daemon = True
218
219             def show_conn_details(self):
220                 if self.server.certreqs == ssl.CERT_REQUIRED:
221                     cert = self.sslconn.getpeercert()
222                     if test_support.verbose and self.server.chatty:
223                         sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
224                     cert_binary = self.sslconn.getpeercert(True)
225                     if test_support.verbose and self.server.chatty:
226                         sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
227                 cipher = self.sslconn.cipher()
228                 if test_support.verbose and self.server.chatty:
229                     sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
230
231             def wrap_conn (self):
232                 try:
233                     self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
234                                                    certfile=self.server.certificate,
235                                                    ssl_version=self.server.protocol,
236                                                    ca_certs=self.server.cacerts,
237                                                    cert_reqs=self.server.certreqs)
238                 except:
239                     if self.server.chatty:
240                         handle_error("\n server:  bad connection attempt from " +
241                                      str(self.sock.getpeername()) + ":\n")
242                     self.close()
243                     if not self.server.expect_bad_connects:
244                         # here, we want to stop the server, because this shouldn't
245                         # happen in the context of our test case
246                         self.running = False
247                         # normally, we'd just stop here, but for the test
248                         # harness, we want to stop the server
249                         self.server.stop()
250                     return False
251
252                 else:
253                     return True
254
255             def read(self):
256                 if self.sslconn:
257                     return self.sslconn.read()
258                 else:
259                     return self.sock.recv(1024)
260
261             def write(self, bytes):
262                 if self.sslconn:
263                     return self.sslconn.write(bytes)
264                 else:
265                     return self.sock.send(bytes)
266
267             def close(self):
268                 if self.sslconn:
269                     self.sslconn.close()
270                 else:
271                     self.sock._sock.close()
272
273             def run (self):
274                 self.running = True
275                 if not self.server.starttls_server:
276                     if isinstance(self.sock, ssl.SSLSocket):
277                         self.sslconn = self.sock
278                     elif not self.wrap_conn():
279                         return
280                     self.show_conn_details()
281                 while self.running:
282                     try:
283                         msg = self.read()
284                         if not msg:
285                             # eof, so quit this handler
286                             self.running = False
287                             self.close()
288                         elif msg.strip() == 'over':
289                             if test_support.verbose and self.server.connectionchatty:
290                                 sys.stdout.write(" server: client closed connection\n")
291                             self.close()
292                             return
293                         elif self.server.starttls_server and msg.strip() == 'STARTTLS':
294                             if test_support.verbose and self.server.connectionchatty:
295                                 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
296                             self.write("OK\n")
297                             if not self.wrap_conn():
298                                 return
299                         elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS':
300                             if test_support.verbose and self.server.connectionchatty:
301                                 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
302                             self.write("OK\n")
303                             self.sslconn.unwrap()
304                             self.sslconn = None
305                             if test_support.verbose and self.server.connectionchatty:
306                                 sys.stdout.write(" server: connection is now unencrypted...\n")
307                         else:
308                             if (test_support.verbose and
309                                 self.server.connectionchatty):
310                                 ctype = (self.sslconn and "encrypted") or "unencrypted"
311                                 sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
312                                                  % (repr(msg), ctype, repr(msg.lower()), ctype))
313                             self.write(msg.lower())
314                     except ssl.SSLError:
315                         if self.server.chatty:
316                             handle_error("Test server failure:\n")
317                         self.close()
318                         self.running = False
319                         # normally, we'd just stop here, but for the test
320                         # harness, we want to stop the server
321                         self.server.stop()
322                     except:
323                         handle_error('')
324
325         def __init__(self, certificate, ssl_version=None,
326                      certreqs=None, cacerts=None, expect_bad_connects=False,
327                      chatty=True, connectionchatty=False, starttls_server=False,
328                      wrap_accepting_socket=False):
329
330             if ssl_version is None:
331                 ssl_version = ssl.PROTOCOL_TLSv1
332             if certreqs is None:
333                 certreqs = ssl.CERT_NONE
334             self.certificate = certificate
335             self.protocol = ssl_version
336             self.certreqs = certreqs
337             self.cacerts = cacerts
338             self.expect_bad_connects = expect_bad_connects
339             self.chatty = chatty
340             self.connectionchatty = connectionchatty
341             self.starttls_server = starttls_server
342             self.sock = socket.socket()
343             self.flag = None
344             if wrap_accepting_socket:
345                 self.sock = ssl.wrap_socket(self.sock, server_side=True,
346                                             certfile=self.certificate,
347                                             cert_reqs = self.certreqs,
348                                             ca_certs = self.cacerts,
349                                             ssl_version = self.protocol)
350                 if test_support.verbose and self.chatty:
351                     sys.stdout.write(' server:  wrapped server socket as %s\n' % str(self.sock))
352             self.port = test_support.bind_port(self.sock)
353             self.active = False
354             threading.Thread.__init__(self)
355             self.daemon = True
356
357         def start (self, flag=None):
358             self.flag = flag
359             threading.Thread.start(self)
360
361         def run (self):
362             self.sock.settimeout(0.5)
363             self.sock.listen(5)
364             self.active = True
365             if self.flag:
366                 # signal an event
367                 self.flag.set()
368             while self.active:
369                 try:
370                     newconn, connaddr = self.sock.accept()
371                     if test_support.verbose and self.chatty:
372                         sys.stdout.write(' server:  new connection from '
373                                          + str(connaddr) + '\n')
374                     handler = self.ConnectionHandler(self, newconn)
375                     handler.start()
376                 except socket.timeout:
377                     pass
378                 except KeyboardInterrupt:
379                     self.stop()
380                 except:
381                     if self.chatty:
382                         handle_error("Test server failure:\n")
383             self.sock.close()
384
385         def stop (self):
386             self.active = False
387
388     class AsyncoreEchoServer(threading.Thread):
389
390         class EchoServer (asyncore.dispatcher):
391
392             class ConnectionHandler (asyncore.dispatcher_with_send):
393
394                 def __init__(self, conn, certfile):
395                     asyncore.dispatcher_with_send.__init__(self, conn)
396                     self.socket = ssl.wrap_socket(conn, server_side=True,
397                                                   certfile=certfile,
398                                                   do_handshake_on_connect=True)
399
400                 def readable(self):
401                     if isinstance(self.socket, ssl.SSLSocket):
402                         while self.socket.pending() > 0:
403                             self.handle_read_event()
404                     return True
405
406                 def handle_read(self):
407                     data = self.recv(1024)
408                     self.send(data.lower())
409
410                 def handle_close(self):
411                     self.close()
412                     if test_support.verbose:
413                         sys.stdout.write(" server:  closed connection %s\n" % self.socket)
414
415                 def handle_error(self):
416                     raise
417
418             def __init__(self, certfile):
419                 self.certfile = certfile
420                 asyncore.dispatcher.__init__(self)
421                 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
422                 self.port = test_support.bind_port(self.socket)
423                 self.listen(5)
424
425             def handle_accept(self):
426                 sock_obj, addr = self.accept()
427                 if test_support.verbose:
428                     sys.stdout.write(" server:  new connection from %s:%s\n" %addr)
429                 self.ConnectionHandler(sock_obj, self.certfile)
430
431             def handle_error(self):
432                 raise
433
434         def __init__(self, certfile):
435             self.flag = None
436             self.active = False
437             self.server = self.EchoServer(certfile)
438             self.port = self.server.port
439             threading.Thread.__init__(self)
440             self.daemon = True
441
442         def __str__(self):
443             return "<%s %s>" % (self.__class__.__name__, self.server)
444
445         def start (self, flag=None):
446             self.flag = flag
447             threading.Thread.start(self)
448
449         def run (self):
450             self.active = True
451             if self.flag:
452                 self.flag.set()
453             while self.active:
454                 try:
455                     asyncore.loop(1)
456                 except:
457                     pass
458
459         def stop (self):
460             self.active = False
461             self.server.close()
462
463     class SocketServerHTTPSServer(threading.Thread):
464
465         class HTTPSServer(HTTPServer):
466
467             def __init__(self, server_address, RequestHandlerClass, certfile):
468
469                 HTTPServer.__init__(self, server_address, RequestHandlerClass)
470                 # we assume the certfile contains both private key and certificate
471                 self.certfile = certfile
472                 self.active = False
473                 self.active_lock = threading.Lock()
474                 self.allow_reuse_address = True
475
476             def __str__(self):
477                 return ('<%s %s:%s>' %
478                         (self.__class__.__name__,
479                          self.server_name,
480                          self.server_port))
481
482             def get_request (self):
483                 # override this to wrap socket with SSL
484                 sock, addr = self.socket.accept()
485                 sslconn = ssl.wrap_socket(sock, server_side=True,
486                                           certfile=self.certfile)
487                 return sslconn, addr
488
489             # The methods overridden below this are mainly so that we
490             # can run it in a thread and be able to stop it from another
491             # You probably wouldn't need them in other uses.
492
493             def server_activate(self):
494                 # We want to run this in a thread for testing purposes,
495                 # so we override this to set timeout, so that we get
496                 # a chance to stop the server
497                 self.socket.settimeout(0.5)
498                 HTTPServer.server_activate(self)
499
500             def serve_forever(self):
501                 # We want this to run in a thread, so we use a slightly
502                 # modified version of "forever".
503                 self.active = True
504                 while 1:
505                     try:
506                         # We need to lock while handling the request.
507                         # Another thread can close the socket after self.active
508                         # has been checked and before the request is handled.
509                         # This causes an exception when using the closed socket.
510                         with self.active_lock:
511                             if not self.active:
512                                 break
513                             self.handle_request()
514                     except socket.timeout:
515                         pass
516                     except KeyboardInterrupt:
517                         self.server_close()
518                         return
519                     except:
520                         sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info())))
521                         break
522                     time.sleep(0.1)
523
524             def server_close(self):
525                 # Again, we want this to run in a thread, so we need to override
526                 # close to clear the "active" flag, so that serve_forever() will
527                 # terminate.
528                 with self.active_lock:
529                     HTTPServer.server_close(self)
530                     self.active = False
531
532         class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
533
534             # need to override translate_path to get a known root,
535             # instead of using os.curdir, since the test could be
536             # run from anywhere
537
538             server_version = "TestHTTPS/1.0"
539
540             root = None
541
542             def translate_path(self, path):
543                 """Translate a /-separated PATH to the local filename syntax.
544
545                 Components that mean special things to the local file system
546                 (e.g. drive or directory names) are ignored.  (XXX They should
547                 probably be diagnosed.)
548
549                 """
550                 # abandon query parameters
551                 path = urlparse.urlparse(path)[2]
552                 path = os.path.normpath(urllib.unquote(path))
553                 words = path.split('/')
554                 words = filter(None, words)
555                 path = self.root
556                 for word in words:
557                     drive, word = os.path.splitdrive(word)
558                     head, word = os.path.split(word)
559                     if word in self.root: continue
560                     path = os.path.join(path, word)
561                 return path
562
563             def log_message(self, format, *args):
564
565                 # we override this to suppress logging unless "verbose"
566
567                 if test_support.verbose:
568                     sys.stdout.write(" server (%s:%d %s):\n   [%s] %s\n" %
569                                      (self.server.server_address,
570                                       self.server.server_port,
571                                       self.request.cipher(),
572                                       self.log_date_time_string(),
573                                       format%args))
574
575
576         def __init__(self, certfile):
577             self.flag = None
578             self.active = False
579             self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
580             self.port = test_support.find_unused_port()
581             self.server = self.HTTPSServer(
582                 (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
583             threading.Thread.__init__(self)
584             self.daemon = True
585
586         def __str__(self):
587             return "<%s %s>" % (self.__class__.__name__, self.server)
588
589         def start (self, flag=None):
590             self.flag = flag
591             threading.Thread.start(self)
592
593         def run (self):
594             self.active = True
595             if self.flag:
596                 self.flag.set()
597             self.server.serve_forever()
598             self.active = False
599
600         def stop (self):
601             self.active = False
602             self.server.server_close()
603
604
605     def badCertTest (certfile):
606         server = ThreadedEchoServer(CERTFILE,
607                                     certreqs=ssl.CERT_REQUIRED,
608                                     cacerts=CERTFILE, chatty=False)
609         flag = threading.Event()
610         server.start(flag)
611         # wait for it to start
612         flag.wait()
613         # try to connect
614         try:
615             try:
616                 s = ssl.wrap_socket(socket.socket(),
617                                     certfile=certfile,
618                                     ssl_version=ssl.PROTOCOL_TLSv1)
619                 s.connect((HOST, server.port))
620             except ssl.SSLError, x:
621                 if test_support.verbose:
622                     sys.stdout.write("\nSSLError is %s\n" % x[1])
623             except socket.error, x:
624                 if test_support.verbose:
625                     sys.stdout.write("\nsocket.error is %s\n" % x[1])
626             else:
627                 raise test_support.TestFailed(
628                     "Use of invalid cert should have failed!")
629         finally:
630             server.stop()
631             server.join()
632
633     def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
634                           client_certfile, client_protocol=None, indata="FOO\n",
635                           chatty=True, connectionchatty=False,
636                           wrap_accepting_socket=False):
637
638         server = ThreadedEchoServer(certfile,
639                                     certreqs=certreqs,
640                                     ssl_version=protocol,
641                                     cacerts=cacertsfile,
642                                     chatty=chatty,
643                                     connectionchatty=connectionchatty,
644                                     wrap_accepting_socket=wrap_accepting_socket)
645         flag = threading.Event()
646         server.start(flag)
647         # wait for it to start
648         flag.wait()
649         # try to connect
650         if client_protocol is None:
651             client_protocol = protocol
652         try:
653             try:
654                 s = ssl.wrap_socket(socket.socket(),
655                                     certfile=client_certfile,
656                                     ca_certs=cacertsfile,
657                                     cert_reqs=certreqs,
658                                     ssl_version=client_protocol)
659                 s.connect((HOST, server.port))
660             except ssl.SSLError, x:
661                 raise test_support.TestFailed("Unexpected SSL error:  " + str(x))
662             except Exception, x:
663                 raise test_support.TestFailed("Unexpected exception:  " + str(x))
664             else:
665                 if connectionchatty:
666                     if test_support.verbose:
667                         sys.stdout.write(
668                             " client:  sending %s...\n" % (repr(indata)))
669                 s.write(indata)
670                 outdata = s.read()
671                 if connectionchatty:
672                     if test_support.verbose:
673                         sys.stdout.write(" client:  read %s\n" % repr(outdata))
674                 if outdata != indata.lower():
675                     raise test_support.TestFailed(
676                         "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
677                         % (outdata[:min(len(outdata),20)], len(outdata),
678                            indata[:min(len(indata),20)].lower(), len(indata)))
679                 s.write("over\n")
680                 if connectionchatty:
681                     if test_support.verbose:
682                         sys.stdout.write(" client:  closing connection.\n")
683                 s.close()
684         finally:
685             server.stop()
686             server.join()
687
688     def tryProtocolCombo (server_protocol,
689                           client_protocol,
690                           expectedToWork,
691                           certsreqs=None):
692
693         if certsreqs is None:
694             certsreqs = ssl.CERT_NONE
695
696         if certsreqs == ssl.CERT_NONE:
697             certtype = "CERT_NONE"
698         elif certsreqs == ssl.CERT_OPTIONAL:
699             certtype = "CERT_OPTIONAL"
700         elif certsreqs == ssl.CERT_REQUIRED:
701             certtype = "CERT_REQUIRED"
702         if test_support.verbose:
703             formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
704             sys.stdout.write(formatstr %
705                              (ssl.get_protocol_name(client_protocol),
706                               ssl.get_protocol_name(server_protocol),
707                               certtype))
708         try:
709             serverParamsTest(CERTFILE, server_protocol, certsreqs,
710                              CERTFILE, CERTFILE, client_protocol, chatty=False)
711         except test_support.TestFailed:
712             if expectedToWork:
713                 raise
714         else:
715             if not expectedToWork:
716                 raise test_support.TestFailed(
717                     "Client protocol %s succeeded with server protocol %s!"
718                     % (ssl.get_protocol_name(client_protocol),
719                        ssl.get_protocol_name(server_protocol)))
720
721
722     class ThreadedTests(unittest.TestCase):
723
724         def testRudeShutdown(self):
725
726             listener_ready = threading.Event()
727             listener_gone = threading.Event()
728             port = test_support.find_unused_port()
729
730             # `listener` runs in a thread.  It opens a socket listening on
731             # PORT, and sits in an accept() until the main thread connects.
732             # Then it rudely closes the socket, and sets Event `listener_gone`
733             # to let the main thread know the socket is gone.
734             def listener():
735                 s = socket.socket()
736                 s.bind((HOST, port))
737                 s.listen(5)
738                 listener_ready.set()
739                 s.accept()
740                 s = None # reclaim the socket object, which also closes it
741                 listener_gone.set()
742
743             def connector():
744                 listener_ready.wait()
745                 s = socket.socket()
746                 s.connect((HOST, port))
747                 listener_gone.wait()
748                 try:
749                     ssl_sock = ssl.wrap_socket(s)
750                 except IOError:
751                     pass
752                 else:
753                     raise test_support.TestFailed(
754                           'connecting to closed SSL socket should have failed')
755
756             t = threading.Thread(target=listener)
757             t.start()
758             connector()
759             t.join()
760
761         def testEcho (self):
762
763             if test_support.verbose:
764                 sys.stdout.write("\n")
765             serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
766                              CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
767                              chatty=True, connectionchatty=True)
768
769         def testReadCert(self):
770
771             if test_support.verbose:
772                 sys.stdout.write("\n")
773             s2 = socket.socket()
774             server = ThreadedEchoServer(CERTFILE,
775                                         certreqs=ssl.CERT_NONE,
776                                         ssl_version=ssl.PROTOCOL_SSLv23,
777                                         cacerts=CERTFILE,
778                                         chatty=False)
779             flag = threading.Event()
780             server.start(flag)
781             # wait for it to start
782             flag.wait()
783             # try to connect
784             try:
785                 try:
786                     s = ssl.wrap_socket(socket.socket(),
787                                         certfile=CERTFILE,
788                                         ca_certs=CERTFILE,
789                                         cert_reqs=ssl.CERT_REQUIRED,
790                                         ssl_version=ssl.PROTOCOL_SSLv23)
791                     s.connect((HOST, server.port))
792                 except ssl.SSLError, x:
793                     raise test_support.TestFailed(
794                         "Unexpected SSL error:  " + str(x))
795                 except Exception, x:
796                     raise test_support.TestFailed(
797                         "Unexpected exception:  " + str(x))
798                 else:
799                     if not s:
800                         raise test_support.TestFailed(
801                             "Can't SSL-handshake with test server")
802                     cert = s.getpeercert()
803                     if not cert:
804                         raise test_support.TestFailed(
805                             "Can't get peer certificate.")
806                     cipher = s.cipher()
807                     if test_support.verbose:
808                         sys.stdout.write(pprint.pformat(cert) + '\n')
809                         sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
810                     if not cert.has_key('subject'):
811                         raise test_support.TestFailed(
812                             "No subject field in certificate: %s." %
813                             pprint.pformat(cert))
814                     if ((('organizationName', 'Python Software Foundation'),)
815                         not in cert['subject']):
816                         raise test_support.TestFailed(
817                             "Missing or invalid 'organizationName' field in certificate subject; "
818                             "should be 'Python Software Foundation'.")
819                     s.close()
820             finally:
821                 server.stop()
822                 server.join()
823
824         def testNULLcert(self):
825             badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
826                                      "nullcert.pem"))
827         def testMalformedCert(self):
828             badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
829                                      "badcert.pem"))
830         def testWrongCert(self):
831             badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
832                                      "wrongcert.pem"))
833         def testMalformedKey(self):
834             badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
835                                      "badkey.pem"))
836
837         def testProtocolSSL2(self):
838             if test_support.verbose:
839                 sys.stdout.write("\n")
840             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
841             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
842             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
843             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
844             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
845             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
846
847         def testProtocolSSL23(self):
848             if test_support.verbose:
849                 sys.stdout.write("\n")
850             try:
851                 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
852             except test_support.TestFailed, x:
853                 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
854                 if test_support.verbose:
855                     sys.stdout.write(
856                         " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
857                         % str(x))
858             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
859             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
860             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
861
862             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
863             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
864             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
865
866             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
867             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
868             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
869
870         def testProtocolSSL3(self):
871             if test_support.verbose:
872                 sys.stdout.write("\n")
873             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
874             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
875             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
876             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
877             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
878             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
879
880         def testProtocolTLS1(self):
881             if test_support.verbose:
882                 sys.stdout.write("\n")
883             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
884             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
885             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
886             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
887             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
888             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
889
890         def testSTARTTLS (self):
891
892             msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
893
894             server = ThreadedEchoServer(CERTFILE,
895                                         ssl_version=ssl.PROTOCOL_TLSv1,
896                                         starttls_server=True,
897                                         chatty=True,
898                                         connectionchatty=True)
899             flag = threading.Event()
900             server.start(flag)
901             # wait for it to start
902             flag.wait()
903             # try to connect
904             wrapped = False
905             try:
906                 try:
907                     s = socket.socket()
908                     s.setblocking(1)
909                     s.connect((HOST, server.port))
910                 except Exception, x:
911                     raise test_support.TestFailed("Unexpected exception:  " + str(x))
912                 else:
913                     if test_support.verbose:
914                         sys.stdout.write("\n")
915                     for indata in msgs:
916                         if test_support.verbose:
917                             sys.stdout.write(
918                                 " client:  sending %s...\n" % repr(indata))
919                         if wrapped:
920                             conn.write(indata)
921                             outdata = conn.read()
922                         else:
923                             s.send(indata)
924                             outdata = s.recv(1024)
925                         if (indata == "STARTTLS" and
926                             outdata.strip().lower().startswith("ok")):
927                             if test_support.verbose:
928                                 sys.stdout.write(
929                                     " client:  read %s from server, starting TLS...\n"
930                                     % repr(outdata))
931                             conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
932                             wrapped = True
933                         elif (indata == "ENDTLS" and
934                             outdata.strip().lower().startswith("ok")):
935                             if test_support.verbose:
936                                 sys.stdout.write(
937                                     " client:  read %s from server, ending TLS...\n"
938                                     % repr(outdata))
939                             s = conn.unwrap()
940                             wrapped = False
941                         else:
942                             if test_support.verbose:
943                                 sys.stdout.write(
944                                     " client:  read %s from server\n" % repr(outdata))
945                     if test_support.verbose:
946                         sys.stdout.write(" client:  closing connection.\n")
947                     if wrapped:
948                         conn.write("over\n")
949                     else:
950                         s.send("over\n")
951                     s.close()
952             finally:
953                 server.stop()
954                 server.join()
955
956         def testSocketServer(self):
957
958             server = SocketServerHTTPSServer(CERTFILE)
959             flag = threading.Event()
960             server.start(flag)
961             # wait for it to start
962             flag.wait()
963             # try to connect
964             try:
965                 if test_support.verbose:
966                     sys.stdout.write('\n')
967                 d1 = open(CERTFILE, 'rb').read()
968                 d2 = ''
969                 # now fetch the same data from the HTTPS server
970                 url = 'https://127.0.0.1:%d/%s' % (
971                     server.port, os.path.split(CERTFILE)[1])
972                 f = urllib.urlopen(url)
973                 dlen = f.info().getheader("content-length")
974                 if dlen and (int(dlen) > 0):
975                     d2 = f.read(int(dlen))
976                     if test_support.verbose:
977                         sys.stdout.write(
978                             " client: read %d bytes from remote server '%s'\n"
979                             % (len(d2), server))
980                 f.close()
981             except:
982                 msg = ''.join(traceback.format_exception(*sys.exc_info()))
983                 if test_support.verbose:
984                     sys.stdout.write('\n' + msg)
985                 raise test_support.TestFailed(msg)
986             else:
987                 if not (d1 == d2):
988                     raise test_support.TestFailed(
989                         "Couldn't fetch data from HTTPS server")
990             finally:
991                 server.stop()
992                 server.join()
993
994         def testWrappedAccept (self):
995
996             if test_support.verbose:
997                 sys.stdout.write("\n")
998             serverParamsTest(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED,
999                              CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23,
1000                              chatty=True, connectionchatty=True,
1001                              wrap_accepting_socket=True)
1002
1003
1004         def testAsyncoreServer (self):
1005
1006             indata = "TEST MESSAGE of mixed case\n"
1007
1008             if test_support.verbose:
1009                 sys.stdout.write("\n")
1010             server = AsyncoreEchoServer(CERTFILE)
1011             flag = threading.Event()
1012             server.start(flag)
1013             # wait for it to start
1014             flag.wait()
1015             # try to connect
1016             try:
1017                 try:
1018                     s = ssl.wrap_socket(socket.socket())
1019                     s.connect(('127.0.0.1', server.port))
1020                 except ssl.SSLError, x:
1021                     raise test_support.TestFailed("Unexpected SSL error:  " + str(x))
1022                 except Exception, x:
1023                     raise test_support.TestFailed("Unexpected exception:  " + str(x))
1024                 else:
1025                     if test_support.verbose:
1026                         sys.stdout.write(
1027                             " client:  sending %s...\n" % (repr(indata)))
1028                     s.write(indata)
1029                     outdata = s.read()
1030                     if test_support.verbose:
1031                         sys.stdout.write(" client:  read %s\n" % repr(outdata))
1032                     if outdata != indata.lower():
1033                         raise test_support.TestFailed(
1034                             "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
1035                             % (outdata[:min(len(outdata),20)], len(outdata),
1036                                indata[:min(len(indata),20)].lower(), len(indata)))
1037                     s.write("over\n")
1038                     if test_support.verbose:
1039                         sys.stdout.write(" client:  closing connection.\n")
1040                     s.close()
1041             finally:
1042                 server.stop()
1043                 # wait for server thread to end
1044                 server.join()
1045
1046
1047         def testAllRecvAndSendMethods(self):
1048
1049             if test_support.verbose:
1050                 sys.stdout.write("\n")
1051
1052             server = ThreadedEchoServer(CERTFILE,
1053                                         certreqs=ssl.CERT_NONE,
1054                                         ssl_version=ssl.PROTOCOL_TLSv1,
1055                                         cacerts=CERTFILE,
1056                                         chatty=True,
1057                                         connectionchatty=False)
1058             flag = threading.Event()
1059             server.start(flag)
1060             # wait for it to start
1061             flag.wait()
1062             # try to connect
1063             try:
1064                 s = ssl.wrap_socket(socket.socket(),
1065                                     server_side=False,
1066                                     certfile=CERTFILE,
1067                                     ca_certs=CERTFILE,
1068                                     cert_reqs=ssl.CERT_NONE,
1069                                     ssl_version=ssl.PROTOCOL_TLSv1)
1070                 s.connect((HOST, server.port))
1071             except ssl.SSLError as x:
1072                 raise support.TestFailed("Unexpected SSL error:  " + str(x))
1073             except Exception as x:
1074                 raise support.TestFailed("Unexpected exception:  " + str(x))
1075             else:
1076                 # helper methods for standardising recv* method signatures
1077                 def _recv_into():
1078                     b = bytearray("\0"*100)
1079                     count = s.recv_into(b)
1080                     return b[:count]
1081
1082                 def _recvfrom_into():
1083                     b = bytearray("\0"*100)
1084                     count, addr = s.recvfrom_into(b)
1085                     return b[:count]
1086
1087                 # (name, method, whether to expect success, *args)
1088                 send_methods = [
1089                     ('send', s.send, True, []),
1090                     ('sendto', s.sendto, False, ["some.address"]),
1091                     ('sendall', s.sendall, True, []),
1092                 ]
1093                 recv_methods = [
1094                     ('recv', s.recv, True, []),
1095                     ('recvfrom', s.recvfrom, False, ["some.address"]),
1096                     ('recv_into', _recv_into, True, []),
1097                     ('recvfrom_into', _recvfrom_into, False, []),
1098                 ]
1099                 data_prefix = u"PREFIX_"
1100
1101                 for meth_name, send_meth, expect_success, args in send_methods:
1102                     indata = data_prefix + meth_name
1103                     try:
1104                         send_meth(indata.encode('ASCII', 'strict'), *args)
1105                         outdata = s.read()
1106                         outdata = outdata.decode('ASCII', 'strict')
1107                         if outdata != indata.lower():
1108                             raise support.TestFailed(
1109                                 "While sending with <<%s>> bad data "
1110                                 "<<%r>> (%d) received; "
1111                                 "expected <<%r>> (%d)\n" % (
1112                                     meth_name, outdata[:20], len(outdata),
1113                                     indata[:20], len(indata)
1114                                 )
1115                             )
1116                     except ValueError as e:
1117                         if expect_success:
1118                             raise support.TestFailed(
1119                                 "Failed to send with method <<%s>>; "
1120                                 "expected to succeed.\n" % (meth_name,)
1121                             )
1122                         if not str(e).startswith(meth_name):
1123                             raise support.TestFailed(
1124                                 "Method <<%s>> failed with unexpected "
1125                                 "exception message: %s\n" % (
1126                                     meth_name, e
1127                                 )
1128                             )
1129
1130                 for meth_name, recv_meth, expect_success, args in recv_methods:
1131                     indata = data_prefix + meth_name
1132                     try:
1133                         s.send(indata.encode('ASCII', 'strict'))
1134                         outdata = recv_meth(*args)
1135                         outdata = outdata.decode('ASCII', 'strict')
1136                         if outdata != indata.lower():
1137                             raise support.TestFailed(
1138                                 "While receiving with <<%s>> bad data "
1139                                 "<<%r>> (%d) received; "
1140                                 "expected <<%r>> (%d)\n" % (
1141                                     meth_name, outdata[:20], len(outdata),
1142                                     indata[:20], len(indata)
1143                                 )
1144                             )
1145                     except ValueError as e:
1146                         if expect_success:
1147                             raise support.TestFailed(
1148                                 "Failed to receive with method <<%s>>; "
1149                                 "expected to succeed.\n" % (meth_name,)
1150                             )
1151                         if not str(e).startswith(meth_name):
1152                             raise support.TestFailed(
1153                                 "Method <<%s>> failed with unexpected "
1154                                 "exception message: %s\n" % (
1155                                     meth_name, e
1156                                 )
1157                             )
1158                         # consume data
1159                         s.read()
1160
1161                 s.write("over\n".encode("ASCII", "strict"))
1162                 s.close()
1163             finally:
1164                 server.stop()
1165                 server.join()
1166
1167
def test_main(verbose=False):
    """Locate the certificate files, select the test classes and run them.

    Sets the module globals CERTFILE and SVN_PYTHON_ORG_ROOT_CERT to paths
    next to this module.  BasicTests always runs; NetworkedTests is added
    when the 'network' resource is enabled, and ThreadedTests when threads
    are available as well.  The `verbose` argument is accepted but not used
    here.

    Raises test_support.TestSkipped when the ssl module could not be
    imported, and test_support.TestFailed when a certificate file is missing
    or no unused port can be found.
    """
    if skip_expected:
        raise test_support.TestSkipped("No SSL support")

    global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
                            "keycert.pem")
    SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
        os.path.dirname(__file__) or os.curdir,
        "https_svn_python_org_root.pem")

    if (not os.path.exists(CERTFILE) or
        not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
        raise test_support.TestFailed("Can't read certificate files!")

    TESTPORT = test_support.find_unused_port()
    if not TESTPORT:
        raise test_support.TestFailed("Can't find open port to test servers on!")

    tests = [BasicTests]

    if test_support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = test_support.threading_setup()
        if thread_info and test_support.is_resource_enabled('network'):
            tests.append(ThreadedTests)

    try:
        test_support.run_unittest(*tests)
    finally:
        # run_unittest raises when a test fails; the thread bookkeeping
        # must be cleaned up in that case as well.
        if _have_threads:
            test_support.threading_cleanup(*thread_info)
1201
1202 if __name__ == "__main__":
1203     test_main()