Source code for autopush.ssl

"""Custom SSL configuration"""
from __future__ import absolute_import
import socket  # noqa
import ssl
from typing import (  # noqa
    Any,
    Dict,
    FrozenSet,
    Optional,
    Tuple,
)

from OpenSSL import SSL
from twisted.internet.ssl import DefaultOpenSSLContextFactory

# Pick the protocol constant used as the default ``ssl_version`` below:
# ``ssl.PROTOCOL_TLS`` when this Python's ``ssl`` module provides it,
# otherwise fall back to ``ssl.PROTOCOL_SSLv23``.
if hasattr(ssl, 'PROTOCOL_TLS'):
    SSL_PROTO = ssl.PROTOCOL_TLS
else:  # pragma: nocover
    SSL_PROTO = ssl.PROTOCOL_SSLv23


# Cipher string handed to ``Context.set_cipher_list``: colon-separated
# entries, with the ``!``-prefixed exclusions listed last.
# NOTE(review): presumably mirrors Mozilla's "intermediate" server-side TLS
# recommendation at the time of writing -- confirm before editing entries.
MOZILLA_INTERMEDIATE_CIPHERS = ':'.join([
    # AES-GCM suites with an ECDHE/DHE key exchange
    'ECDHE-RSA-AES128-GCM-SHA256',
    'ECDHE-ECDSA-AES128-GCM-SHA256',
    'ECDHE-RSA-AES256-GCM-SHA384',
    'ECDHE-ECDSA-AES256-GCM-SHA384',
    'DHE-RSA-AES128-GCM-SHA256',
    'DHE-DSS-AES128-GCM-SHA256',
    # ECDHE suites without GCM
    'ECDHE-RSA-AES128-SHA256',
    'ECDHE-ECDSA-AES128-SHA256',
    'ECDHE-RSA-AES128-SHA',
    'ECDHE-ECDSA-AES128-SHA',
    'ECDHE-RSA-AES256-SHA384',
    'ECDHE-ECDSA-AES256-SHA384',
    'ECDHE-RSA-AES256-SHA',
    'ECDHE-ECDSA-AES256-SHA',
    # DHE suites without GCM
    'DHE-RSA-AES128-SHA256',
    'DHE-RSA-AES128-SHA',
    'DHE-DSS-AES128-SHA256',
    'DHE-RSA-AES256-SHA256',
    'DHE-DSS-AES256-SHA',
    'DHE-RSA-AES256-SHA',
    # AES suites named without an (EC)DHE prefix
    'AES128-GCM-SHA256',
    'AES256-GCM-SHA384',
    'AES128-SHA256',
    'AES256-SHA256',
    'AES128-SHA',
    'AES256-SHA',
    # Broad fallbacks
    'AES',
    'CAMELLIA',
    'DES-CBC3-SHA',
    # Exclusions
    '!aNULL',
    '!eNULL',
    '!EXPORT',
    '!DES',
    '!RC4',
    '!MD5',
    '!PSK',
    '!aECDH',
    '!EDH-DSS-DES-CBC3-SHA',
    '!EDH-RSA-DES-CBC3-SHA',
    '!KRB5-DES-CBC3-SHA',
])


class AutopushSSLContextFactory(DefaultOpenSSLContextFactory):
    """SSL context factory that applies autopush's cipher/option settings.

    Takes two keyword arguments in addition to whatever
    ``DefaultOpenSSLContextFactory`` accepts:

    ``dh_file``
        Value passed to ``load_tmp_dh`` on the context; skipped when falsy
        (default ``None``).
    ``require_peer_certs``
        When true, ``set_verify`` is configured with ``VERIFY_PEER`` and
        ``VERIFY_CLIENT_ONCE`` and a callback that accepts every peer
        (default ``False``).

    """

    def __init__(self, *args, **kwargs):
        # Strip our own options out of kwargs before delegating: the base
        # initializer is not handed them.
        # NOTE(review): presumably the base __init__ triggers cacheContext(),
        # which reads both attributes, so they are assigned first -- confirm
        # against Twisted before reordering.
        self.dh_file = kwargs.pop('dh_file', None)
        self.require_peer_certs = kwargs.pop('require_peer_certs', False)
        DefaultOpenSSLContextFactory.__init__(self, *args, **kwargs)

    def cacheContext(self):
        """Create the customized SSL context unless one is already cached"""
        if self._context is not None:
            return

        context = self._contextFactory(self.sslmethod)
        context.set_cipher_list(MOZILLA_INTERMEDIATE_CIPHERS)
        for option in (
                SSL.OP_CIPHER_SERVER_PREFERENCE,
                SSL.OP_NO_SSLv2,
                SSL.OP_NO_SSLv3,
                SSL.OP_NO_COMPRESSION,
        ):
            context.set_options(option)
        context.set_mode(SSL.MODE_RELEASE_BUFFERS)
        # Every flag in OP_ALL except the Microsoft big-SSLv3-buffer one.
        context.set_options(SSL.OP_ALL & ~SSL.OP_MICROSOFT_BIG_SSLV3_BUFFER)

        context.use_certificate_chain_file(self.certificateFileName)
        context.use_privatekey_file(self.privateKeyFileName)

        if self.dh_file:
            context.load_tmp_dh(self.dh_file)

        if self.require_peer_certs:
            # Require peer certs but only for use by
            # RequestHandlers
            verify_flags = SSL.VERIFY_PEER | SSL.VERIFY_CLIENT_ONCE
            context.set_verify(verify_flags, self._allow_peer)

        self._context = context

    def _allow_peer(self, conn, cert, errno, depth, preverify_ok):
        """Verification callback that accepts every peer certificate"""
        # skip verification: we only care about whitelisted signatures
        # on file
        return True
def monkey_patch_ssl_wrap_socket():
    """Replace ssl.wrap_socket with ssl_wrap_socket_cached"""
    ssl.wrap_socket = ssl_wrap_socket_cached


def undo_monkey_patch_ssl_wrap_socket():
    """Undo monkey_patch_ssl_wrap_socket"""
    if _HAVE_STDLIB_WRAP_SOCKET:
        ssl.wrap_socket = _orig_ssl_wrap_socket
    elif getattr(ssl, 'wrap_socket', None) is ssl_wrap_socket_cached:
        # This Python never had ssl.wrap_socket: restore that state rather
        # than leaving an attribute behind that was not there originally.
        del ssl.wrap_socket


_CacheKey = FrozenSet[Tuple[str, Any]]
_sslcontext_cache = {}  # type: Dict[_CacheKey, ssl.SSLContext]


def _ssl_wrap_socket_compat(
        sock,  # type: socket.socket
        keyfile=None,  # type: Optional[str]
        certfile=None,  # type: Optional[str]
        server_side=False,  # type: bool
        cert_reqs=ssl.CERT_NONE,  # type: int
        ssl_version=SSL_PROTO,  # type: int
        ca_certs=None,  # type: Optional[str]
        do_handshake_on_connect=True,  # type: bool
        suppress_ragged_eofs=True,  # type: bool
        ciphers=None  # type: Optional[str]
):
    # type: (...) -> ssl.SSLSocket
    """Context-based stand-in for ssl.wrap_socket where it is unavailable.

    Only used when the running Python's ``ssl`` module has no
    ``wrap_socket`` function. Builds a fresh ``ssl.SSLContext`` from the
    arguments and wraps ``sock`` with it.

    Raises ValueError when ``server_side`` or ``keyfile`` is given without
    a ``certfile``.

    """
    if server_side and not certfile:
        raise ValueError(
            "certfile must be specified for server-side operations")
    if keyfile and not certfile:
        raise ValueError("certfile must be specified")
    context = ssl.SSLContext(ssl_version)
    context.verify_mode = cert_reqs
    if ca_certs:
        context.load_verify_locations(ca_certs)
    if certfile:
        context.load_cert_chain(certfile, keyfile)
    if ciphers:
        context.set_ciphers(ciphers)
    return context.wrap_socket(
        sock,
        server_side=server_side,
        do_handshake_on_connect=do_handshake_on_connect,
        suppress_ragged_eofs=suppress_ragged_eofs
    )


# ssl.wrap_socket is absent from newer Python releases; looking it up
# unconditionally would make this module fail at import time there.
try:
    _orig_ssl_wrap_socket = ssl.wrap_socket
except AttributeError:  # pragma: nocover
    _HAVE_STDLIB_WRAP_SOCKET = False
    _orig_ssl_wrap_socket = _ssl_wrap_socket_compat
else:
    _HAVE_STDLIB_WRAP_SOCKET = True


def ssl_wrap_socket_cached(
        sock,  # type: socket.socket
        keyfile=None,  # type: Optional[str]
        certfile=None,  # type: Optional[str]
        server_side=False,  # type: bool
        cert_reqs=ssl.CERT_NONE,  # type: int
        ssl_version=SSL_PROTO,  # type: int
        ca_certs=None,  # type: Optional[str]
        do_handshake_on_connect=True,  # type: bool
        suppress_ragged_eofs=True,  # type: bool
        ciphers=None  # type: Optional[str]
):
    # type: (...) -> ssl.SSLSocket
    """ssl.wrap_socket replacement that caches SSLContexts

    The first call for a given combination of ``keyfile``, ``certfile``,
    ``cert_reqs``, ``ssl_version``, ``ca_certs`` and ``ciphers`` goes
    through the original wrap function, and the resulting socket's
    ``context`` is stored. Later calls with the same combination wrap
    ``sock`` with that stored context instead.

    ``server_side``, ``do_handshake_on_connect`` and
    ``suppress_ragged_eofs`` are not part of the cache key; they are passed
    along on every call.

    Returns the wrapped socket.

    """
    key_kwargs = (
        ('keyfile', keyfile),
        ('certfile', certfile),
        ('cert_reqs', cert_reqs),
        ('ssl_version', ssl_version),
        ('ca_certs', ca_certs),
        ('ciphers', ciphers),
    )
    key = frozenset(key_kwargs)

    context = _sslcontext_cache.get(key)
    if context is not None:
        # Cache hit: reuse the already-configured context.
        return context.wrap_socket(
            sock,
            server_side=server_side,
            do_handshake_on_connect=do_handshake_on_connect,
            suppress_ragged_eofs=suppress_ragged_eofs
        )

    # Cache miss: let the original function build the context, then keep it.
    wrapped = _orig_ssl_wrap_socket(
        sock,
        keyfile=keyfile,
        certfile=certfile,
        server_side=server_side,
        cert_reqs=cert_reqs,
        ssl_version=ssl_version,
        ca_certs=ca_certs,
        do_handshake_on_connect=do_handshake_on_connect,
        suppress_ragged_eofs=suppress_ragged_eofs,
        ciphers=ciphers
    )
    _sslcontext_cache[key] = wrapped.context
    return wrapped