Source code for autopush.router.fcm

"""FCM legacy HTTP Router"""
from typing import Any  # noqa

import pyfcm
from requests.exceptions import ConnectionError
from twisted.internet.threads import deferToThread
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict  # noqa


class FCMRouter(object):
    """FCM Router Implementation

    Note: FCM is a newer branch of GCM. While there's not much change
    required for the server, there is significant work required for the
    client. To that end, having a separate router allows the "older" GCM
    to persist and lets the client determine when they want to use the
    newer FCM route.

    """
    log = Logger()
    gcm = None
    dryRun = 0
    collapseKey = "simplepush"
    # Upper bound applied to the ``time_to_live`` sent to FCM
    # (2419200 seconds == 28 days).
    MAX_TTL = 2419200
    # Maps the ``error`` string of an FCM result to how it is reported:
    #   msg   -- log / response text (may contain {regid}, {senderid},
    #            {nlen} and {ttl} placeholders)
    #   err   -- HTTP status code handed back to the caller
    #   errno -- error number placed in the RouterResponse
    #   crit  -- when true the failure is logged as critical and raised
    #            as a RouterException instead of returned as a response
    reasonTable = {
        "MissingRegistration": {
            "msg": ("'to' or 'registration_id' is blank or"
                    " invalid: {regid}"),
            "err": 500,
            "errno": 1,
        },
        "InvalidRegistration": {
            "msg": "registration_id is invalid: {regid}",
            "err": 410,
            "errno": 105,
        },
        "NotRegistered": {
            "msg": "device has unregistered with FCM: {regid}",
            "err": 410,
            "errno": 103,
        },
        "InvalidPackageName": {
            "msg": "Invalid Package Name specified",
            "err": 500,
            "errno": 2,
            "crit": True,
        },
        "MismatchSenderid": {
            "msg": "Invalid SenderID used: {senderid}",
            "err": 410,
            "errno": 105,
            "crit": True,
        },
        "MessageTooBig": {
            "msg": "Message length was too big: {nlen}",
            "err": 413,
            "errno": 104,
        },
        "InvalidDataKey": {
            "msg": ("Payload contains an invalid or restricted "
                    "key value"),
            "err": 500,
            "errno": 3,
            "crit": True,
        },
        "InvalidTtl": {
            "msg": "Invalid TimeToLive {ttl}",
            "err": 400,
            "errno": 111,
        },
        "Unavailable": {
            "msg": "Message has timed out or device is unavailable",
            "err": 200,
            "errno": 0,
        },
        "InternalServerError": {
            "msg": "FCM internal server error",
            "err": 500,
            "errno": 999,
        },
        "DeviceMessageRateExceeded": {
            "msg": "Too many messages for this device",
            "err": 503,
            "errno": 4,
        },
        "TopicsMessageRateExceeded": {
            "msg": "Too many subscribers for this topic",
            "err": 503,
            "errno": 5,
            "crit": True,
        },
        "Unreported": {
            "msg": "Error has no reported reason.",
            "err": 500,
            "errno": 999,
            "crit": True,
        }
    }

    def __init__(self, conf, router_conf, metrics):
        """Create a new FCM router and connect to FCM

        :param conf: Application configuration; ``endpoint_url`` is read
            from it when building the ``Location`` header.
        :param router_conf: Mapping of FCM router settings. Keys read
            here are ``ttl``, ``dryrun``, ``collapseKey`` and ``creds``
            (a mapping of sender id to a dict holding an ``auth`` key).
        :param metrics: Metrics sink exposing ``increment``.

        :raises IOError: If any FCM client could not be created.

        """
        self.conf = conf
        self.router_conf = router_conf
        self.metrics = metrics
        self.min_ttl = router_conf.get("ttl", 60)
        self.dryRun = router_conf.get("dryrun", False)
        self.collapseKey = router_conf.get("collapseKey", "webpush")
        # One pyfcm client per configured sender id.
        self.clients = {}
        try:
            for (sid, creds) in router_conf["creds"].items():
                self.clients[sid] = pyfcm.FCMNotification(
                    api_key=creds["auth"])
        except Exception as e:
            self.log.error("Could not instantiate FCM {ex}", ex=e)
            raise IOError("FCM Bridge not initiated in main")
        self._base_tags = ["platform:fcm"]
        self.log.debug("Starting FCM router...")

    def amend_endpoint_response(self, response, router_data):
        # type: (JSONDict, JSONDict) -> None
        """Add the stored ``app_id`` to ``response`` as ``senderid``"""
        response["senderid"] = router_data.get('app_id')

    def register(self, uaid, router_data, app_id, *args, **kwargs):
        # type: (str, JSONDict, str, *Any, **Any) -> None
        """Validate that the FCM Instance Token is in the ``router_data``

        On success ``router_data["app_id"]`` is set to ``app_id``.

        :raises RouterException: 401 if ``token`` is missing from
            ``router_data``; 410 if ``app_id`` has no configured client.

        """
        # "token" is the FCM registration id token generated by the client.
        if "token" not in router_data:
            raise self._error("connect info missing FCM Instance 'token'",
                              status=401,
                              uri=kwargs.get('uri'),
                              senderid=repr(app_id))
        # senderid is the remote client's senderID value. This value is
        # very difficult for the client to change, and there was a problem
        # where some clients had an older, invalid senderID. We need to
        # be able to match senderID to its corresponding auth key.
        # If the client has an unexpected or invalid SenderID,
        # it is impossible for us to reach them.
        if app_id not in self.clients:
            raise self._error("Invalid SenderID", status=410, errno=105)
        router_data["app_id"] = app_id

    def route_notification(self, notification, uaid_data):
        """Start the FCM notification routing, returns a deferred"""
        router_data = uaid_data["router_data"]
        # Kick the entire notification routing off to a thread
        return deferToThread(self._route, notification, router_data)

    def _route(self, notification, router_data):
        """Blocking FCM call to route the notification

        Builds the FCM data message from ``notification``, sends it with
        the client registered for ``router_data["app_id"]`` and returns
        the result of :meth:`_process_reply`.

        :raises RouterException: 410 when no token is stored, 413 when
            the payload exceeds ``max_data``, 500 when no client exists
            for the sender id or FCM rejects the call, 502 when FCM
            cannot be reached.

        """
        # THIS MUST MATCH THE CHANNELID GENERATED BY THE REGISTRATION SERVICE
        # Currently this value is in hex form.
        data = {"chid": notification.channel_id.hex}
        if not router_data.get("token"):
            raise self._error("No registration token found. "
                              "Rejecting message.",
                              410, errno=106, log_exception=False)
        regid = router_data.get("token")
        # Payload data is optional. The endpoint handler validates that the
        # correct encryption headers are included with the data.
        if notification.data:
            mdata = self.router_conf.get('max_data', 4096)
            if notification.data_length > mdata:
                raise self._error("This message is intended for a " +
                                  "constrained device and is limited " +
                                  "to 3070 bytes. Converted buffer too " +
                                  "long by %d bytes" %
                                  (notification.data_length - mdata),
                                  413, errno=104, log_exception=False)

            data['body'] = notification.data
            data['con'] = notification.headers['encoding']

            if 'encryption' in notification.headers:
                data['enc'] = notification.headers['encryption']
            if 'crypto_key' in notification.headers:
                data['cryptokey'] = notification.headers['crypto_key']
            elif 'encryption_key' in notification.headers:
                data['enckey'] = notification.headers['encryption_key']
        # Look the id up with .get() so that a record lacking "app_id"
        # is reported as missing credentials rather than raising a
        # second KeyError from inside the handler below.
        app_id = router_data.get("app_id")
        try:
            client = self.clients[app_id]
        except KeyError:
            self.log.critical("Missing FCM bridge credentials for {id}",
                              id=app_id)
            # NOTE(review): ``error=901`` is passed through as-is; confirm
            # whether RouterException expects ``errno`` here instead.
            raise RouterException("Server error", status_code=500,
                                  error=901)

        # registration_ids are the FCM instance tokens (specified during
        # registration).
        router_ttl = min(self.MAX_TTL,
                         max(self.min_ttl, notification.ttl or 0))
        try:
            result = client.notify_single_device(
                collapse_key=self.collapseKey,
                data_message=data,
                dry_run=self.dryRun or ('dryrun' in router_data),
                registration_id=regid,
                time_to_live=router_ttl,
            )
        except pyfcm.errors.AuthenticationError as e:
            self.log.error("Authentication Error: %s" % e)
            raise RouterException("Server error", status_code=500)
        except ConnectionError as e:
            self.metrics.increment("notification.bridge.error",
                                   tags=make_tags(
                                       self._base_tags,
                                       error=502,
                                       errno=0,
                                       reason="connection_unavailable"))
            self.log.warn("Could not connect to FCM server: %s" % e)
            raise RouterException("Server error", status_code=502,
                                  log_exception=False)
        except Exception as e:
            self.log.error("Unhandled FCM Error: %s" % e)
            raise RouterException("Server error", status_code=500)
        return self._process_reply(result, notification, router_data,
                                   ttl=router_ttl)

    def _error(self, err, status, **kwargs):
        """Log ``err`` at debug level and build a RouterException

        The exception is returned, not raised; callers ``raise`` it.

        """
        self.log.debug(err, **kwargs)
        return RouterException(err, status_code=status, response_body=err,
                               **kwargs)

    def _process_reply(self, reply, notification, router_data, ttl):
        """Process FCM send reply

        :param reply: Mapping returned by the FCM client; the keys read
            are ``results``, ``canonical_ids`` and ``failure``.
        :param notification: The notification that was sent.
        :param router_data: Stored routing data for the recipient.
        :param ttl: TTL that was sent to FCM, echoed in the ``TTL`` header.

        :returns: A RouterResponse: 503 with the new token when FCM
            reports a canonical id, the mapped error status for
            non-critical failures, otherwise 201 with ``TTL`` and
            ``Location`` headers.
        :raises RouterException: For failures flagged ``crit`` in
            ``reasonTable``.

        """
        # A missing *or empty* result list is treated as a single empty
        # result so the lookups below cannot raise IndexError.
        results = reply.get('results') or [{}]
        result = results[0]
        if reply.get('canonical_ids'):
            old_id = router_data['token']
            new_id = result.get('registration_id')
            self.log.debug("FCM id changed : {old} => {new}",
                           old=old_id, new=new_id)
            self.metrics.increment("notification.bridge.error",
                                   tags=make_tags(self._base_tags,
                                                  error=502,
                                                  errno=0,
                                                  reason="reregister"))
            return RouterResponse(status_code=503,
                                  response_body="Please try request again.",
                                  router_data=dict(token=new_id))
        if reply.get('failure'):
            self.metrics.increment("notification.bridge.error",
                                   tags=make_tags(self._base_tags,
                                                  error=502,
                                                  errno=0,
                                                  reason="failure"))
            reason = result.get('error', "Unreported")
            # Error strings that are not in the table are handled like an
            # unreported failure instead of crashing on a None entry.
            err = self.reasonTable.get(reason,
                                       self.reasonTable["Unreported"])
            if err.get("crit", False):
                self.log.critical(
                    err['msg'],
                    nlen=notification.data_length,
                    regid=router_data["token"],
                    # The sender id is stored under "app_id" by register().
                    senderid=router_data.get('app_id'),
                    ttl=notification.ttl,
                    reason=reason,
                )
                raise RouterException("FCM failure to deliver",
                                      status_code=err['err'],
                                      response_body="Please try request "
                                                    "later.",
                                      log_exception=False)
            self.log.debug("{msg} : {info}",
                           msg=err['msg'],
                           info={"app_id": router_data.get("app_id"),
                                 "reason": reason})
            return RouterResponse(
                status_code=err['err'],
                errno=err['errno'],
                response_body=err['msg'],
                router_data={},
            )
        self.metrics.increment("notification.bridge.sent",
                               tags=self._base_tags)
        self.metrics.increment("notification.message_data",
                               notification.data_length,
                               tags=make_tags(self._base_tags,
                                              destination="Direct"))
        location = "%s/m/%s" % (self.conf.endpoint_url, notification.version)
        return RouterResponse(status_code=201, response_body="",
                              headers={"TTL": ttl,
                                       "Location": location},
                              logged_status=200)