Source code for autopush.logging

"""Custom Logging Setup
"""
import io
import json
import Queue
import pkg_resources
import socket
import sys
import time
import threading
from typing import Any  # noqa

import boto3
import raven
from raven.transport.twisted import TwistedHTTPTransport
from raven.utils.stacks import iter_stack_frames
from twisted.internet import reactor
from twisted.logger import (
    formatEvent,
    formatEventAsClassicLogText,
    globalLogBeginner,
    globalLogPublisher,
    LogLevel,
    ILogObserver
)
from zope.interface import implementer

from autopush.utils import get_ec2_instance_id

# The complete set of log event keys that are never copied into Fields
IGNORED_KEYS = frozenset([
    "factory",
    "failure",
    "format",
    "isError",
    "log_failure",
    "log_format",
    "log_flattened",
    "log_level",
    "log_legacy",
    "log_logger",
    "log_source",
    "log_system",
    "log_text",
    "log_time",
    "log_trace",
    "message",
    "message_type",
    "severity",
    "task_level",
    "time",
    "timestamp",
    "type",
    "why",
])
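
# Illustrative note: to_fields() in PushLogger.json_format below copies only
# simple-valued keys that are not in this set, so a hypothetical event key
# like "user_agent" with a string value lands in Fields, while bookkeeping
# keys such as "log_level" or "log_time" are dropped.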


# whether the global LogBeginner.beginLoggingTo has been called: it
# should only be called once
began_logging = False

# the EC2 instance id, falling back to the hostname when not on EC2
instance_id_or_hostname = None


def begin_or_register(observer, redirectStandardIO=False, **kwargs):
    # type: (Any, bool, **Any) -> None
    """Register observer with the global LogPublisher

    The first call registers via the global LogBeginner's
    beginLoggingTo; subsequent calls simply add the observer.
    """
    global began_logging
    if not began_logging:
        globalLogBeginner.beginLoggingTo(
            [observer],
            redirectStandardIO=redirectStandardIO,
            **kwargs
        )
        began_logging = True
    else:
        globalLogPublisher.addObserver(observer)
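
# A minimal usage sketch (illustrative only; ``first_observer`` and
# ``second_observer`` are hypothetical ILogObserver callables):
#
#     begin_or_register(first_observer)   # wires up beginLoggingTo()
#     begin_or_register(second_observer)  # only calls addObserver()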


@implementer(ILogObserver)
class PushLogger(object):
    """Twisted LogObserver implementation

    Supports firehose delivery, Raven exception reporting, and
    json/text console debugging output.

    """
    def __init__(self, logger_name, log_level="debug", log_format="json",
                 log_output="stdout", sentry_dsn=None,
                 firehose_delivery_stream=None):
        self.logger_name = "-".join([
            logger_name,
            pkg_resources.get_distribution("autopush").version
        ])
        self._filename = None
        self.log_level = LogLevel.lookupByName(log_level)
        if log_output == "stdout":
            self._output = sys.stdout
        elif log_output == "none":
            self._output = None
        else:
            self._filename = log_output
            self._output = None
        if log_format == "json":
            self.format_event = self.json_format
        else:
            self.format_event = formatEventAsClassicLogText
        if sentry_dsn:
            self.raven_client = raven.Client(
                release=raven.fetch_package_version("autopush"),
                transport=TwistedHTTPTransport,
                enable_breadcrumbs=False,
            )
        else:
            self.raven_client = None
        if firehose_delivery_stream:
            self.firehose = FirehoseProcessor(
                stream_name=firehose_delivery_stream)
        else:
            self.firehose = None
    def __call__(self, event):
        if self.raven_client and 'log_failure' in event:
            self.raven_log(event)

        if event["log_level"] < self.log_level:
            return

        text = self.format_event(event)

        if self.firehose:
            self.firehose.process(text)

        if self._output:
            self._output.write(unicode(text))
            self._output.flush()
    def raven_log(self, event):
        f = event["log_failure"]
        stack = None
        extra = dict()
        tb = f.getTracebackObject()
        if not tb:
            # include the current stack for at least some
            # context. sentry's expecting that "Frames should be
            # sorted from oldest to newest."
            stack = list(iter_stack_frames())[:-5]  # approx.
            extra = dict(no_failure_tb=True)
        extra.update(
            log_format=event.get('log_format'),
            log_namespace=event.get('log_namespace'),
            client_info=event.get('client_info'),
        )
        reactor.callFromThread(
            self.raven_client.captureException,
            exc_info=(f.type, f.value, tb),
            stack=stack,
            extra=extra,
        )
        # just in case
        del tb

    def json_format(self, event):
        error = bool(event.get("isError")) or "log_failure" in event
        ts = event["log_time"]

        if error:
            severity = 3
        else:
            severity = 5

        def to_fields(kv):
            reply = dict()
            for k, v in kv:
                if (k not in IGNORED_KEYS and
                        type(v) in (str, unicode, list, int, float, bool)):
                    reply[k] = v
            return reply

        msg = {
            "Hostname": instance_id_or_hostname,
            "Timestamp": ts * 1000 * 1000 * 1000,
            "Type": "twisted:log",
            "Severity": event.get("severity") or severity,
            "EnvVersion": "2.0",
            "Fields": to_fields(event.iteritems()),
            "Logger": self.logger_name,
        }

        # flatten the client_info into Fields
        ci = event.get('client_info')
        if ci and isinstance(ci, dict):
            msg['Fields'].update(to_fields(ci.iteritems()))

        # flatten timings into Fields
        ti = event.get('timings')
        if ti and isinstance(ti, dict):
            msg["Fields"].update(to_fields(ti.iteritems()))

        # Add the nicely formatted message
        msg["Fields"]["message"] = formatEvent(event)
        return json.dumps(msg, skipkeys=True) + "\n"

    def start(self):
        if self._filename:
            self._output = io.open(self._filename, "a", encoding="utf-8")
        if self.firehose:
            self.firehose.start()
        begin_or_register(self)

    def stop(self):
        globalLogPublisher.removeObserver(self)
        if self._filename:
            self._output.close()
            self._output = None
        if self.firehose:
            self.firehose.stop()

    @classmethod
    def setup_logging(cls, logger_name, log_level="info", log_format="json",
                      log_output="stdout", sentry_dsn=None,
                      firehose_delivery_stream=None, no_aws=False):
        global instance_id_or_hostname
        if not instance_id_or_hostname:
            instance_id = None if no_aws else get_ec2_instance_id()
            instance_id_or_hostname = instance_id or socket.getfqdn()
        pl = cls(logger_name,
                 log_level=log_level,
                 log_format=log_format,
                 log_output=log_output,
                 sentry_dsn=sentry_dsn,
                 firehose_delivery_stream=firehose_delivery_stream)
        pl.start()
        reactor.addSystemEventTrigger('before', 'shutdown', pl.stop)
        return pl
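
# A minimal startup sketch (illustrative only; the keyword values shown are
# placeholders, not module defaults): a service would typically call
# setup_logging() once at startup, before running the reactor. The returned
# PushLogger is registered as an observer and stopped at reactor shutdown.
#
#     PushLogger.setup_logging(
#         "autopush",
#         log_level="info",
#         log_format="json",
#         sentry_dsn=None,                 # a DSN here enables Raven reports
#         firehose_delivery_stream=None,   # a stream name enables Firehose
#         no_aws=True,                     # skip the EC2 instance-id lookup
#     )
#     reactor.run()
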
class FirehoseProcessor(object):
    """Batches log events for sending to AWS FireHose"""
    # batching thresholds, roughly matching the Firehose PutRecordBatch
    # limits, plus a maximum flush interval in seconds
    RECORD_SEPARATOR = u"\x1e"
    MAX_RECORD_SIZE = 1024 * 1024
    MAX_REQUEST_SIZE = 4 * 1024 * 1024
    MAX_RECORD_BATCH = 500
    MAX_INTERVAL = 30

    def __init__(self, stream_name, maxsize=0):
        self._records = Queue.Queue(maxsize=maxsize)
        self._prepped = []
        self._total_size = 0
        self._thread = None
        self._client = boto3.client("firehose")
        self._run = False
        self._stream_name = stream_name

    def start(self):
        self._thread = threading.Thread(target=self._worker)
        self._thread.start()

    def stop(self):
        self._records.put_nowait(None)
        self._thread.join()
        self._thread = None

    def process(self, record):
        try:
            self._records.put_nowait(record)
        except Queue.Full:
            # Drop extra records
            pass

    def _worker(self):
        self._last_send = time.time()
        while True:
            time_since_sent = time.time() - self._last_send
            # clamp to zero so Queue.get never sees a negative timeout
            remaining_wait = max(self.MAX_INTERVAL - time_since_sent, 0)
            try:
                record = self._records.get(timeout=remaining_wait)
            except Queue.Empty:
                # Send the records
                self._send_record_batch()
                continue

            if record is None:
                # Stop signal so we exit
                break

            # Is this record going to put us over our request size?
            # (+1 accounts for the record separator)
            rec_size = len(record) + 1
            if self._total_size + rec_size >= self.MAX_REQUEST_SIZE:
                self._send_record_batch()

            # Store this record
            self._prepped.append(record)
            self._total_size += rec_size
            if len(self._prepped) >= self.MAX_RECORD_BATCH:
                self._send_record_batch()

        # We're done running, send any remaining
        self._send_record_batch()

    def _send_record_batch(self):
        self._last_send = time.time()
        if not self._prepped:
            return

        # Attempt to send the record batch up to three times, then give up
        tries = 0
        while tries < 3:
            response = self._client.put_record_batch(
                DeliveryStreamName=self._stream_name,
                Records=[{"Data": bytes(self.RECORD_SEPARATOR + record)}
                         for record in self._prepped]
            )
            if response["FailedPutCount"] > 0:
                tries += 1
            else:
                break
        self._prepped = []
        self._total_size = 0
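
# For reference, json_format() above emits one heka-style JSON object per
# event, roughly shaped like this (values illustrative):
#
#     {"Hostname": "...", "Timestamp": <log_time in nanoseconds>,
#      "Type": "twisted:log", "Severity": 5, "EnvVersion": "2.0",
#      "Logger": "autopush-<version>", "Fields": {"message": "...", ...}}
#
# Severity defaults to 3 for error events and 5 otherwise, unless the event
# carries its own "severity" key.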