Source code for autopush.router.gcm

"""GCM Router"""
from typing import Any  # noqa

from twisted.logger import Logger
from twisted.internet.error import ConnectError, TimeoutError

from autopush.constants import DEFAULT_ROUTER_TIMEOUT
from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router import gcmclient
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict  # noqa


[docs]class GCMRouter(object): """GCM Router Implementation""" log = Logger() dryRun = 0 collapseKey = "simplepush" MAX_TTL = 2419200
[docs] def __init__(self, conf, router_conf, metrics): """Create a new GCM router and connect to GCM""" self.conf = conf self.router_conf = router_conf self.metrics = metrics self.min_ttl = router_conf.get("ttl", 60) self.dryRun = router_conf.get("dryrun", False) self.collapseKey = router_conf.get("collapseKey") timeout = router_conf.get("timeout", DEFAULT_ROUTER_TIMEOUT) self.gcmclients = {} self.senderIDs = {} self.gcm_endpoint = router_conf["endpoint"] # Flatten the SenderID list from human readable and init gcmclient if not router_conf.get("senderIDs"): raise IOError("SenderIDs not configured.") for sid in router_conf.get("senderIDs"): auth = router_conf.get("senderIDs").get(sid).get("auth") self.senderIDs[sid] = auth self.gcmclients[sid] = gcmclient.GCM(auth, timeout=timeout, logger=self.log, endpoint=self.gcm_endpoint) self._base_tags = ["platform:gcm"] self.log.debug("Starting GCM router...")
def amend_endpoint_response(self, response, router_data): # type: (JSONDict, JSONDict) -> None response["senderid"] = router_data.get('creds', {}).get('senderID')
[docs] def register(self, uaid, router_data, app_id, *args, **kwargs): # type: (str, JSONDict, str, *Any, **Any) -> None """Validate that the GCM Instance Token is in the ``router_data``""" # "token" is the GCM registration id token generated by the client. if "token" not in router_data: raise self._error("connect info missing GCM Instance 'token'", status=401) # senderid is the remote client's senderID value. This value is # very difficult for the client to change, and there was a problem # where some clients had an older, invalid senderID. We need to # be able to match senderID to it's corresponding auth key. # If the client has an unexpected or invalid SenderID, # it is impossible for us to reach them. senderid = app_id if senderid not in self.senderIDs: raise self._error("Invalid SenderID", status=410, errno=105, uri=kwargs.get('uri'), senderid=senderid) # Assign a senderid router_data["creds"] = {"senderID": senderid, "auth": self.senderIDs[senderid]}
[docs] def route_notification(self, notification, uaid_data): """Start the GCM notification routing, returns a deferred""" # Kick the entire notification routing off to a thread return self._route(notification, uaid_data)
[docs] def _route(self, notification, uaid_data): """Blocking GCM call to route the notification""" router_data = uaid_data["router_data"] # THIS MUST MATCH THE CHANNELID GENERATED BY THE REGISTRATION SERVICE # Currently this value is in hex form. data = {"chid": notification.channel_id.hex} # Payload data is optional. The endpoint handler validates that the # correct encryption headers are included with the data. if notification.data: mdata = self.router_conf.get('max_data', 4096) if notification.data_length > mdata: raise self._error("This message is intended for a " + "constrained device and is limited " + "to 3070 bytes. Converted buffer too " + "long by %d bytes" % (notification.data_length - mdata), 413, errno=104, log_exception=False) data['body'] = notification.data data['con'] = notification.headers['encoding'] if 'encryption' in notification.headers: data['enc'] = notification.headers.get('encryption') if 'crypto_key' in notification.headers: data['cryptokey'] = notification.headers['crypto_key'] elif 'encryption_key' in notification.headers: data['enckey'] = notification.headers['encryption_key'] # registration_ids are the GCM instance tokens (specified during # registration. router_ttl = min(self.MAX_TTL, max(notification.ttl or 0, self.min_ttl)) payload = gcmclient.JSONMessage( registration_ids=[router_data.get("token")], collapse_key=self.collapseKey, time_to_live=router_ttl, dry_run=self.dryRun or ("dryrun" in router_data), data=data, ) try: client = self.gcmclients[router_data['creds']['senderID']] d = client.send(payload) d.addCallback( self._process_reply, uaid_data, router_ttl, notification) d.addErrback( self._process_error ) return d except KeyError: self.log.critical("Missing GCM bridge credentials") raise RouterException("Server error", status_code=500, errno=900)
def _process_error(self, failure): err = failure.value if isinstance(err, gcmclient.GCMAuthenticationError): self.log.error("GCM Authentication Error: %s" % err) raise RouterException("Server error", status_code=500, errno=901) if isinstance(err, TimeoutError): self.log.warn("GCM Timeout: %s" % err) self.metrics.increment("notification.bridge.error", tags=make_tags( self._base_tags, reason="timeout")) raise RouterException("Server error", status_code=502, errno=903, log_exception=False) if isinstance(err, ConnectError): self.log.warn("GCM Unavailable: %s" % err) self.metrics.increment("notification.bridge.error", tags=make_tags( self._base_tags, reason="connection_unavailable")) raise RouterException("Server error", status_code=502, errno=902, log_exception=False) return failure
[docs] def _error(self, err, status, **kwargs): """Error handler that raises the RouterException""" self.log.debug(err, **kwargs) return RouterException(err, status_code=status, response_body=err, **kwargs)
[docs] def _process_reply(self, reply, uaid_data, ttl, notification): """Process GCM send reply""" # acks: # for reg_id, msg_id in reply.success.items(): # updates for old_id, new_id in reply.canonicals.items(): self.log.debug("GCM id changed : {old} => {new}", old=old_id, new=new_id) self.metrics.increment("notification.bridge.error", tags=make_tags(self._base_tags, reason="reregister")) return RouterResponse(status_code=503, response_body="Please try request again.", router_data=dict(token=new_id)) # naks: # uninstall: for reg_id in reply.not_registered: self.metrics.increment("notification.bridge.error", tags=make_tags(self._base_tags, reason="unregistered")) self.log.debug("GCM no longer registered: %s" % reg_id) return RouterResponse( status_code=410, response_body="Endpoint requires client update", router_data={}, ) # for reg_id, err_code in reply.failed.items(): if len(reply.failed.items()) > 0: self.metrics.increment("notification.bridge.error", tags=make_tags(self._base_tags, reason="failure")) self.log.debug("GCM failures: {failed()}", failed=lambda: repr(reply.failed.items())) raise RouterException("GCM unable to deliver", status_code=410, response_body="GCM recipient not available.", log_exception=False, ) # retries: if reply.retry_after: self.metrics.increment("notification.bridge.error", tags=make_tags(self._base_tags, reason="retry")) self.log.warn("GCM retry requested: {failed()}", failed=lambda: repr(reply.failed.items())) raise RouterException("GCM failure to deliver, retry", status_code=503, headers={"Retry-After": reply.retry_after}, response_body="Please try request " "in {} seconds.".format( reply.retry_after ), log_exception=False) self.metrics.increment("notification.bridge.sent", tags=self._base_tags) self.metrics.increment("notification.message_data", notification.data_length, tags=make_tags(self._base_tags, destination='Direct')) location = "%s/m/%s" % (self.conf.endpoint_url, notification.version) return RouterResponse(status_code=201, response_body="", headers={"TTL": ttl, "Location": location}, logged_status=200)