"""autopush/autoendpoint daemon scripts"""
import os
from argparse import Namespace # noqa
from twisted.application.internet import (
TCPServer,
TimerService,
SSLServer,
StreamServerEndpointService,
)
from twisted.application.service import MultiService
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.protocol import ServerFactory # noqa
from twisted.logger import Logger
from typing import ( # noqa
Any,
Dict,
Optional,
Sequence,
)
from autopush import constants
from autopush.http import (
InternalRouterHTTPFactory,
EndpointHTTPFactory,
MemUsageHTTPFactory,
agent_from_config
)
from autopush.config import AutopushConfig
from autopush.db import DatabaseManager, DynamoDBResource # noqa
from autopush.exceptions import InvalidConfig
from autopush.haproxy import HAProxyServerEndpoint
from autopush.logging import PushLogger
from autopush.main_argparse import parse_connection, parse_endpoint
from autopush.metrics import periodic_reporter
from autopush.router import routers_from_config
from autopush.ssl import (
monkey_patch_ssl_wrap_socket,
undo_monkey_patch_ssl_wrap_socket,
)
from autopush.websocket import (
ConnectionWSSite,
PushServerFactory,
)
from autopush.websocket import PushServerProtocol # noqa
log = Logger()
[docs]class AutopushMultiService(MultiService):
shared_config_files = (
'/etc/autopush_shared.ini',
'configs/autopush_shared.ini',
'~/.autopush_shared.ini',
'.autopush_shared.ini',
)
config_files = None # type: Sequence[str]
logger_name = None # type: str
def __init__(self, conf, resource=None):
# type: (AutopushConfig, DynamoDBResource) -> None
super(AutopushMultiService, self).__init__()
self.conf = conf
self.db = DatabaseManager.from_config(conf, resource=resource)
self.agent = agent_from_config(conf)
[docs] @staticmethod
def parse_args(config_files, args):
# type: (Sequence[str], Sequence[str]) -> Namespace
"""Parse command line args via argparse"""
raise NotImplementedError # pragma: nocover
[docs] def setup(self, rotate_tables=True):
# type: (bool) -> None
"""Initialize the services"""
if not self.conf.no_sslcontext_cache:
monkey_patch_ssl_wrap_socket()
[docs] def add_maybe_ssl(self, port, factory, ssl_cf):
# type: (int, ServerFactory, Optional[Any]) -> None
"""Add a Service from factory, optionally behind TLS"""
self.addService(
SSLServer(port, factory, contextFactory=ssl_cf, reactor=reactor)
if ssl_cf else
TCPServer(port, factory, reactor=reactor)
)
[docs] def add_timer(self, *args, **kwargs):
"""Add a TimerService"""
self.addService(TimerService(*args, **kwargs))
[docs] def add_memusage(self):
"""Add the memusage Service"""
factory = MemUsageHTTPFactory(self.conf, None)
self.addService(
TCPServer(self.conf.memusage_port, factory, reactor=reactor))
[docs] def run(self):
"""Start the services and run the reactor"""
reactor.suggestThreadPoolSize(constants.THREAD_POOL_SIZE)
self.startService()
reactor.run()
@inlineCallbacks
def stopService(self):
yield self.agent._pool.closeCachedConnections()
yield super(AutopushMultiService, self).stopService()
if not self.conf.no_sslcontext_cache:
undo_monkey_patch_ssl_wrap_socket()
[docs] @classmethod
def _from_argparse(cls, ns, resource=None, **kwargs):
# type: (Namespace, DynamoDBResource, **Any) -> AutopushMultiService
"""Create an instance from argparse/additional kwargs"""
# Add some entropy to prevent potential conflicts.
postfix = os.urandom(4).encode('hex').ljust(8, '0')
conf = AutopushConfig.from_argparse(
ns,
debug=ns.debug,
preflight_uaid="deadbeef00000000deadbeef" + postfix,
**kwargs
)
return cls(conf, resource=resource)
[docs] @classmethod
def main(cls, args=None, use_files=True, resource=None):
# type: (Sequence[str], bool, DynamoDBResource) -> Any
"""Entry point to autopush's main command line scripts.
aka autopush/autoendpoint.
"""
ns = cls.parse_args(cls.config_files if use_files else [], args)
PushLogger.setup_logging(
cls.logger_name,
log_level=ns.log_level or ("debug" if ns.debug else "info"),
log_format="text" if ns.human_logs else "json",
log_output=ns.log_output,
sentry_dsn=bool(os.environ.get("SENTRY_DSN")),
firehose_delivery_stream=ns.firehose_stream_name,
no_aws=ns.no_aws
)
try:
app = cls.from_argparse(ns, resource=resource)
except InvalidConfig as e:
log.critical(str(e))
return 1
app.setup()
return app.run()
[docs]class EndpointApplication(AutopushMultiService):
"""The autoendpoint application"""
config_files = AutopushMultiService.shared_config_files + (
'/etc/autopush_endpoint.ini',
'configs/autopush_endpoint.ini',
'~/.autopush_endpoint.ini',
'.autopush_endpoint.ini'
)
parse_args = staticmethod(parse_endpoint) # type: ignore
logger_name = "Autoendpoint"
endpoint_factory = EndpointHTTPFactory
def __init__(self, conf, resource=None):
# type: (AutopushConfig, DynamoDBResource) -> None
super(EndpointApplication, self).__init__(conf, resource=resource)
self.routers = routers_from_config(conf, self.db, self.agent)
[docs] def setup(self, rotate_tables=True):
super(EndpointApplication, self).setup(rotate_tables)
self.db.setup(self.conf.preflight_uaid)
self.add_endpoint()
if self.conf.memusage_port:
self.add_memusage()
# Start the table rotation checker/updater
if rotate_tables:
self.add_timer(60, self.db.update_rotating_tables)
self.add_timer(15, periodic_reporter, self.db.metrics,
prefix='autoendpoint')
[docs] def add_endpoint(self):
"""Start the Endpoint HTTP router"""
conf = self.conf
factory = self.endpoint_factory(conf, self.db, self.routers)
factory.protocol.maxData = conf.max_data
factory.add_health_handlers()
ssl_cf = factory.ssl_cf()
self.add_maybe_ssl(conf.port, factory, ssl_cf)
if conf.proxy_protocol_port:
ep = HAProxyServerEndpoint(
reactor,
conf.proxy_protocol_port,
ssl_cf
)
self.addService(StreamServerEndpointService(ep, factory))
@classmethod
def from_argparse(cls, ns, resource=None):
# type: (Namespace, DynamoDBResource) -> AutopushMultiService
return super(EndpointApplication, cls)._from_argparse(
ns,
port=ns.port,
endpoint_scheme=ns.endpoint_scheme,
endpoint_hostname=ns.endpoint_hostname or ns.hostname,
endpoint_port=ns.endpoint_port,
cors=not ns.no_cors,
bear_hash_key=ns.auth_key,
proxy_protocol_port=ns.proxy_protocol_port,
aws_ddb_endpoint=ns.aws_ddb_endpoint,
resource=resource
)
[docs]class ConnectionApplication(AutopushMultiService):
"""The autopush application"""
config_files = AutopushMultiService.shared_config_files + (
'/etc/autopush_connection.ini',
'configs/autopush_connection.ini',
'~/.autopush_connection.ini',
'.autopush_connection.ini'
)
parse_args = staticmethod(parse_connection) # type: ignore
logger_name = "Autopush"
internal_router_factory = InternalRouterHTTPFactory
websocket_factory = PushServerFactory
websocket_site_factory = ConnectionWSSite
def __init__(self, conf, resource=None):
# type: (AutopushConfig, DynamoDBResource) -> None
super(ConnectionApplication, self).__init__(
conf,
resource=resource
)
self.clients = {} # type: Dict[str, PushServerProtocol]
[docs] def setup(self, rotate_tables=True):
super(ConnectionApplication, self).setup(rotate_tables)
self.db.setup(self.conf.preflight_uaid)
self.add_internal_router()
if self.conf.memusage_port:
self.add_memusage()
self.add_websocket()
# Start the table rotation checker/updater
if rotate_tables:
self.add_timer(60, self.db.update_rotating_tables)
self.add_timer(15, periodic_reporter, self.db.metrics)
[docs] def add_internal_router(self):
"""Start the internal HTTP notification router"""
factory = self.internal_router_factory(
self.conf, self.db, self.clients)
factory.add_health_handlers()
self.add_maybe_ssl(self.conf.router_port, factory, factory.ssl_cf())
[docs] def add_websocket(self):
"""Start the public WebSocket server"""
conf = self.conf
ws_factory = self.websocket_factory(conf, self.db, self.agent,
self.clients)
site_factory = self.websocket_site_factory(conf, ws_factory)
self.add_maybe_ssl(conf.port, site_factory, site_factory.ssl_cf())
@classmethod
def from_argparse(cls, ns, resource=None):
# type: (Namespace, DynamoDBResource) -> AutopushMultiService
return super(ConnectionApplication, cls)._from_argparse(
ns,
port=ns.port,
endpoint_scheme=ns.endpoint_scheme,
endpoint_hostname=ns.endpoint_hostname,
endpoint_port=ns.endpoint_port,
router_scheme="https" if ns.router_ssl_key else "http",
router_hostname=ns.router_hostname,
router_port=ns.router_port,
env=ns.env,
hello_timeout=ns.hello_timeout,
router_ssl=dict(
key=ns.router_ssl_key,
cert=ns.router_ssl_cert,
dh_param=ns.ssl_dh_param
),
auto_ping_interval=ns.auto_ping_interval,
auto_ping_timeout=ns.auto_ping_timeout,
max_connections=ns.max_connections,
close_handshake_timeout=ns.close_handshake_timeout,
aws_ddb_endpoint=ns.aws_ddb_endpoint,
resource=resource
)