"""Metrics interface and implementations"""
from typing import ( # noqa
TYPE_CHECKING,
Any,
Optional,
Sequence
)
from twisted.internet import reactor
from txstatsd.client import StatsDClientProtocol, TwistedStatsDClient
from txstatsd.metrics.metrics import Metrics
import datadog
from datadog import ThreadStats
from autopush import logging
if TYPE_CHECKING: # pragma: nocover
from autopush.config import AutopushConfig # noqa
class IMetrics(object):
    """Interface shared by every metrics backend

    Subclasses must override every method other than :meth:`__init__` and
    :meth:`start`; the remaining methods raise ``NotImplementedError`` here.

    Extra ``kwargs`` may be recorded as metric tags by metric systems that
    support them, otherwise they should be ignored.

    """

    def __init__(self, *args, **kwargs):
        """Configure the metrics backend (nothing to do by default)"""

    def start(self):
        """Open any connection required to transmit metrics"""

    def increment(self, name, count=1, **kwargs):
        """Add ``count`` to the counter for the metric ``name``"""
        raise NotImplementedError("No increment implemented")

    def gauge(self, name, count, **kwargs):
        """Report ``count`` as the gauge value for the metric ``name``"""
        raise NotImplementedError("No gauge implemented")

    def timing(self, name, duration, **kwargs):
        """Report ``duration`` (in ms) as a timing for the metric ``name``"""
        raise NotImplementedError("No timing implemented")
class SinkMetrics(IMetrics):
    """No-op backend that swallows metrics when metrics are not active"""

    def increment(self, name, count=1, **kwargs):
        """Discard the counter update"""

    def gauge(self, name, count, **kwargs):
        """Discard the gauge value"""

    def timing(self, name, duration, **kwargs):
        """Discard the timing value"""
class TwistedMetrics(object):
    """statsd metrics backend built on Twisted and txstatsd"""

    def __init__(self, statsd_host="localhost", statsd_port=8125):
        """Create the statsd client and its ``autopush`` namespaced metrics"""
        statsd_client = TwistedStatsDClient.create(statsd_host, statsd_port)
        self.client = statsd_client
        self._metric = Metrics(connection=statsd_client, namespace="autopush")

    def start(self):
        """Attach the statsd client protocol to the reactor on UDP port 0"""
        reactor.listenUDP(0, StatsDClientProtocol(self.client))

    def increment(self, name, count=1, **kwargs):
        """Forward a counter increment to the txstatsd ``Metrics`` object"""
        forward = self._metric.increment
        forward(name, count, **kwargs)

    def gauge(self, name, count, **kwargs):
        """Forward a gauge value to the txstatsd ``Metrics`` object"""
        forward = self._metric.gauge
        forward(name, count, **kwargs)

    def timing(self, name, duration, **kwargs):
        """Forward a timing value to the txstatsd ``Metrics`` object"""
        forward = self._metric.timing
        forward(name, duration, **kwargs)
def make_tags(base=None, **kwargs):
    # type: (Optional[Sequence[str]], **Any) -> Sequence[str]
    """Generate a list of tag values

    :param base: Optional existing tags; they are copied, never modified.
    :param kwargs: Each keyword is rendered as a ``key:value`` tag and
                   appended after the ``base`` tags.
    :returns: A new list of tag strings.

    """
    tags = list(base or [])
    # ``items()`` is available on both Python 2 and Python 3, whereas
    # ``iteritems()`` only exists on Python 2 dicts.
    tags.extend('{}:{}'.format(key, val) for key, val in kwargs.items())
    return tags
class DatadogMetrics(object):
    """Metrics backend that reports through datadog's ``ThreadStats``"""

    def __init__(self, api_key, app_key, hostname, flush_interval=10,
                 namespace="autopush"):
        """Initialize the datadog library and remember reporting settings"""
        credentials = dict(api_key=api_key, app_key=app_key)
        datadog.initialize(host_name=hostname, **credentials)
        self._client = ThreadStats()
        self._namespace = namespace
        self._host = hostname
        self._flush_interval = flush_interval

    def _prefix_name(self, name):
        """Return ``name`` qualified with the configured namespace"""
        parts = (self._namespace, name)
        return "%s.%s" % parts

    def start(self):
        """Start the client, flushing and rolling up on the same interval"""
        interval = self._flush_interval
        self._client.start(flush_interval=interval,
                           roll_up_interval=interval)

    def increment(self, name, count=1, **kwargs):
        """Increment the namespaced counter, reported for this host"""
        metric = self._prefix_name(name)
        self._client.increment(metric, count, host=self._host, **kwargs)

    def gauge(self, name, count, **kwargs):
        """Record a gauge for the namespaced metric, reported for this host"""
        metric = self._prefix_name(name)
        self._client.gauge(metric, count, host=self._host, **kwargs)

    def timing(self, name, duration, **kwargs):
        """Record a timing for the namespaced metric, reported for this host"""
        metric = self._prefix_name(name)
        self._client.timing(metric, value=duration, host=self._host,
                            **kwargs)
def from_config(conf):
    # type: (AutopushConfig) -> IMetrics
    """Build the metrics backend selected by the given config

    A datadog API key selects :class:`DatadogMetrics`; otherwise a statsd
    host selects :class:`TwistedMetrics`; with neither configured the no-op
    :class:`SinkMetrics` is returned.

    """
    if conf.datadog_api_key:
        # With an AMI id configured, report under the logging module's
        # ``instance_id_or_hostname`` rather than the configured hostname.
        if conf.ami_id:
            hostname = logging.instance_id_or_hostname
        else:
            hostname = conf.hostname
        return DatadogMetrics(
            hostname=hostname,
            api_key=conf.datadog_api_key,
            app_key=conf.datadog_app_key,
            flush_interval=conf.datadog_flush_interval,
        )
    if conf.statsd_host:
        return TwistedMetrics(conf.statsd_host, conf.statsd_port)
    return SinkMetrics()
def periodic_reporter(metrics, prefix=''):
    # type: (IMetrics, Optional[str]) -> None
    """Report gauges describing twisted's thread pool.

    Each gauge is named ``twisted.threadpool.<stat>``, preceded by
    ``prefix`` and a dot when ``prefix`` is non-empty.

    Only meant to be called via a LoopingCall (TimerService).

    """
    # the thread pool exposes its statistics solely through the private
    # '_team' attribute
    pool_stats = reactor.getThreadPool()._team.statistics()
    separator = '.' if prefix else ''
    base = '{}{}twisted.threadpool.'.format(prefix, separator)
    stat_names = ('idleWorkerCount', 'busyWorkerCount', 'backloggedWorkCount')
    for stat in stat_names:
        metrics.gauge(base + stat, getattr(pool_stats, stat))