Source code for autopush.router.gcmclient

import json

import treq
from twisted.web.http_headers import Headers
from twisted.logger import Logger
from twisted.internet.error import ConnectError

from autopush.constants import DEFAULT_ROUTER_TIMEOUT
from autopush.exceptions import RouterException


class GCMAuthenticationError(Exception):
    """Raised when GCM answers a request with HTTP 401."""


class Result(object):
    """Abstraction object for GCM response

    After :meth:`parse_response` has run, the outcome for each
    registration id is filed under exactly one of:

    * ``success`` -- dict of registration id to GCM ``message_id``
    * ``unavailable`` -- list of ids that failed with ``Unavailable`` or
      ``InternalServerError``
    * ``not_registered`` -- list of ids that failed with ``NotRegistered``
    * ``failed`` -- dict of registration id to any other error string

    ``canonicals`` additionally maps a successful registration id to the
    replacement ``registration_id`` GCM returned for it, if any.

    """

    def __init__(self, response, message):
        """Process GCM message and response into abstracted object

        :param response: GCM response. Only ``response.headers`` is read
            here, via ``getRawHeaders('Retry-After')``.
        :param message: Message payload
        :type message: JSONMessage

        """
        self.success = {}
        self.canonicals = {}
        self.unavailable = []
        self.not_registered = []
        self.failed = {}
        self.message = message
        self.retry_message = None
        # Use the first Retry-After value; fall back to None when the
        # header lookup yields nothing (None or an empty list).
        self.retry_after = (
            response.headers.getRawHeaders('Retry-After') or [None])[0]

    def parse_response(self, content, code, message):
        """Sort the per-recipient results of a GCM reply into buckets.

        :param content: Raw response body (JSON text)
        :type content: str
        :param code: HTTP status code of the response
        :type code: int
        :param message: The message that was sent; its
            ``payload['registration_ids']`` names the recipients.
        :type message: JSONMessage

        :return: This ``Result`` instance, with its buckets filled in.
        :raises RouterException: If ``code`` is 400 or 404, or if the
            body carries no ``results`` entries.

        """
        # 401 handled in GCM.process()
        if code in (400, 404):
            raise RouterException(content)
        data = json.loads(content)
        if not data.get('results'):
            raise RouterException("Recv'd invalid response from GCM")
        # GCM lists results in the same order as the request's
        # registration ids, so pair them positionally rather than
        # attributing every result to the first id.
        reg_ids = message.payload['registration_ids']
        for reg_id, res in zip(reg_ids, data['results']):
            if 'message_id' in res:
                self.success[reg_id] = res['message_id']
                # A registration_id alongside message_id is the
                # canonical id that replaces the one that was used.
                if 'registration_id' in res:
                    self.canonicals[reg_id] = res['registration_id']
                continue
            error = res['error']
            if error in ('Unavailable', 'InternalServerError'):
                self.unavailable.append(reg_id)
            elif error == 'NotRegistered':
                self.not_registered.append(reg_id)
            else:
                self.failed[reg_id] = error
        return self
class JSONMessage(object):
    """GCM formatted payload"""

    def __init__(self, registration_ids, collapse_key, time_to_live,
                 dry_run, data):
        """Convert data elements into a GCM payload.

        The resulting dict is stored on ``self.payload``; the normalized
        recipient list is stored on ``self.registration_ids``.

        :param registration_ids: Single or list of registration ids to
            send to
        :type registration_ids: str or list
        :param collapse_key: GCM collapse key for the data.
        :type collapse_key: str
        :param time_to_live: Seconds to keep message alive
        :type time_to_live: int
        :param dry_run: GCM Dry run flag to allow remote verification
        :type dry_run: bool
        :param data: Data elements to send
        :type data: dict

        :raises RouterException: If ``registration_ids`` is empty.

        """
        if not registration_ids:
            raise RouterException("No Registration IDs specified")
        # Anything that is not already a list is wrapped in one, so the
        # payload always carries a list of recipients.
        self.registration_ids = (
            registration_ids if isinstance(registration_ids, list)
            else [registration_ids])
        body = {
            'registration_ids': self.registration_ids,
            'time_to_live': int(time_to_live),
            'delay_while_idle': False,
            'dry_run': bool(dry_run),
        }
        # Optional fields are included only when they carry a truthy
        # value.
        optional_fields = (("collapse_key", collapse_key), ('data', data))
        for field, value in optional_fields:
            if value:
                body[field] = value
        self.payload = body
class GCM(object):
    """Primitive HTTP GCM service handler."""

    def __init__(self, api_key=None, logger=None, metrics=None,
                 endpoint="gcm-http.googleapis.com/gcm/send",
                 **options):
        """Initialize the GCM primitive.

        :param api_key: The GCM API key (from the Google developer
            console)
        :type api_key: str
        :param logger: Status logger; a new ``Logger()`` is created when
            none is given
        :type logger: logger
        :param metrics: Metric recorder
        :type metrics: autopush.metrics.IMetric
        :param endpoint: GCM endpoint override (host and path, without
            the ``https://`` scheme, which is prepended here)
        :type endpoint: str
        :param options: Additional options, forwarded as keyword
            arguments to the sender on every :meth:`send`
        :type options: dict

        """
        self._endpoint = "https://{}".format(endpoint)
        self._api_key = api_key
        self.metrics = metrics
        self.log = logger or Logger()
        self._options = options
        self._sender = treq.post

    def process(self, response, payload):
        """Turn the immediate HTTP response into a ``Result``.

        :param response: Response handed back by the sender
        :param payload: The message that was sent
        :type payload: JSONMessage

        :return: A ``Result`` directly for 5xx responses (with
            ``retry_message`` set to ``payload``); otherwise the value
            returned by ``response.text()`` with
            ``Result.parse_response`` attached as a callback.
        :raises GCMAuthenticationError: If the response code is 401.

        """
        status = response.code
        if status == 401:
            raise GCMAuthenticationError("Authentication Error")
        result = Result(response, payload)
        if not (500 <= status <= 599):
            # Fetch the content body
            d = response.text()
            d.addCallback(result.parse_response, status, payload)
            return d
        # Server-side failure: keep the payload so it can be retried.
        result.retry_message = payload
        return result

    def error(self, failure):
        """Errback for :meth:`send`.

        Authentication and connection errors are re-raised untouched;
        anything else is logged and wrapped in a ``RouterException``.

        :param failure: Object whose ``value`` attribute holds the
            exception that was raised

        :raises GCMAuthenticationError: Passed through unchanged.
        :raises ConnectError: Passed through unchanged.
        :raises RouterException: For every other failure.

        """
        exc = failure.value
        if isinstance(exc, (GCMAuthenticationError, ConnectError)):
            raise exc
        self.log.error("GCMClient failure: {}".format(exc))
        raise RouterException("Server error: {}".format(exc))

    def send(self, payload):
        """Send a payload to GCM

        :param payload: Dictionary of GCM formatted data
        :type payload: JSONMessage

        :return: Result

        """
        headers = Headers({
            'Content-Type': ['application/json'],
            'Authorization': ['key={}'.format(self._api_key)],
        })
        # Apply the default timeout unless the caller configured one.
        self._options.setdefault('timeout', DEFAULT_ROUTER_TIMEOUT)
        body = json.dumps(payload.payload)
        d = self._sender(
            url=self._endpoint,
            headers=headers,
            data=body,
            **self._options
        )
        # handle the immediate response (which contains no body)
        d.addCallback(self.process, payload)
        d.addErrback(self.error)
        return d