# activity.py (12 KB)
  1. import celery
  2. import json
  3. import rdflib
  4. import requests
  5. import requests_http_signature
  6. from .. import activitypub
  7. from .. import graph
  8. from .. import model
  9. from .. import settings
  10. from . import broker_url
  11. from . import broker
  12. from . import database_session
# Module-level logger shared by the tasks in this file. It is created through
# Celery's task-logger factory and named after this module.
log = celery.utils.log.get_task_logger(__name__)
# Verbosity is taken from the project settings (settings.LOG_LEVEL).
log.setLevel(settings.LOG_LEVEL)
  15. @broker.task
  16. def post(activity, recipient_uri, key_uri, depth):
  17. """
  18. POST an Activity to a remote Actor. If the remote object is a Collection,
  19. a new task is scheduled for every item of that collection.
  20. If an error occurs during the HTTP request, the task is automatically
  21. rescheduled a number of times depending on the Celery configuration.
  22. IMPORTANT: This task is only responsible for the actual HTTP POSTing of the
  23. Activity to the remote Actor's INBOX. This task will *not* attempt to
  24. perform any sort of validation of the Activity being sent. In other
  25. words: whoever is using this task should validate its own Activity
  26. before sending it.
  27. :param activity: The Activity to be sent.
  28. :type activity: dict
  29. :param recipient_uri: The URI of a remote object to send the Activity to.
  30. :param key_uri: Which RSA key to use for signing the HTTP request.
  31. :param depth: Number of indirections to follow if recipient_uri is a
  32. Collection. See the settings file for more info about this option.
  33. """
  34. with database_session() as (pagure_db, forgefed_graph):
  35. actor_uri = activity['actor']
  36. # Make sure the Actor is not sending the Activity to itself.
  37. # https://www.w3.org/TR/activitypub/#delivery
  38. if actor_uri == recipient_uri:
  39. log.debug('Activity actor and recipient are the same. '
  40. 'Refuting to send.')
  41. return
  42. # If the Activity is addressed to as:Public, the Activity is not POSTed
  43. # to anyone.
  44. # https://www.w3.org/TR/activitypub/#public-addressing
  45. if recipient_uri == 'Public':
  46. log.debug('Not sending to as:Public.')
  47. return
  48. # Retrieve remote object
  49. remote_object = activitypub.fetch(recipient_uri)
  50. # Make sure we got an object, or abort task
  51. assert remote_object, 'Could not fetch remote actor.'
  52. # Select the Actor INBOX. Use sharedInbox if there is one.
  53. # https://www.w3.org/TR/activitypub/#sharedInbox
  54. if 'endpoints' in remote_object and 'sharedInbox' in remote_object['endpoints']:
  55. recipient_inbox = remote_object['endpoints']['sharedInbox']
  56. elif 'inbox' in remote_object:
  57. recipient_inbox = remote_object['inbox']
  58. else:
  59. recipient_inbox = None
  60. # If the remote object does not have an INBOX, we check if it's a
  61. # Collection, in which case we retrieve all its items.
  62. if not recipient_inbox:
  63. log.debug('Recipient is not an Actor. Checking if it\'s a '
  64. 'collection.')
  65. # Do not follow any more Collections.
  66. if depth < 1:
  67. log.debug('Max number of indirections reached. I will not '
  68. 'expand any more collections.')
  69. return
  70. if any(collection == remote_object['type'] for collection in
  71. [ 'Collection', 'OrderedCollection',
  72. 'CollectionPage', 'OrderedCollectionPage' ]):
  73. items = []
  74. page = []
  75. if 'items' in remote_object:
  76. if isinstance(remote_object['items'], str):
  77. items.append(remote_object['items'])
  78. else:
  79. items.extend(remote_object['items'])
  80. if 'orderedItems' in remote_object:
  81. if isinstance(remote_object['orderedItems'], str):
  82. items.append(remote_object['orderedItems'])
  83. else:
  84. items.extend(remote_object['orderedItems'])
  85. if 'first' in remote_object:
  86. page.append(remote_object['first'])
  87. if 'next' in remote_object:
  88. page.append(remote_object['next'])
  89. # Schedule a new delivery for every object found in the collection
  90. for recipient_uri in items:
  91. post.delay(activity, recipient_uri, key_uri, depth - 1)
  92. # TODO If a page "next" links to a previous page (which should
  93. # not happen), this will not detect the loop.
  94. for recipient_uri in page:
  95. post.delay(activity, recipient_uri, key_uri, depth)
  96. if len(items) == 0 and len(page) == 0:
  97. log.debug('Collection found, but it contains no items. '
  98. + 'Activity will not be sent.')
  99. # Since this object does *not* have an INBOX, we stop here for this
  100. # task.
  101. return
  102. # Check if this Activity was already sent (successfully) to this INBOX.
  103. # If it was, we do not resend the same Activity twice. This situation
  104. # could happen with a sharedInbox, or if for some reasons the same
  105. # Activity is sent twice (maybe the job queue didn't remove it?).
  106. # NOTE This collection (recipient_inbox) is fictitious because we
  107. # cannot have access to the remote Actor INBOX.
  108. if forgefed_graph.collection_contains(recipient_inbox, activity['id']):
  109. log.warning(
  110. 'Activity ' + activity['id'] + ' was already delivered to '
  111. 'the INBOX ' + recipient_inbox + '. Not sending again.')
  112. return
  113. # Check if the given RSA key belongs to the actor
  114. if not forgefed_graph.is_valid_key(actor_uri, key_uri):
  115. log.critical('Invalid key. Refuting to sign HTTP request.')
  116. return
  117. # Retrieve the private key of the local Actor for signing the HTTP request
  118. private_key = forgefed_graph.value(
  119. subject = rdflib.URIRef(key_uri),
  120. predicate = graph.SEC.privateKeyPem)
  121. # This will add a "Signature:" header to the HTTP request.
  122. # Keys are stored as UTF-8 PEM files in the database, but
  123. # HTTPSignatureHeaderAuth requires the actual byte string without any
  124. # encoding applied to it, hence why we encode() UTF-8 back to bytes.
  125. auth_method = requests_http_signature.HTTPSignatureHeaderAuth(
  126. key = private_key.encode('UTF-8'),
  127. key_id = key_uri,
  128. algorithm = 'rsa-sha256',
  129. headers = [ '(request-target)', 'host', 'date', 'digest' ])
  130. # passphrase = None,
  131. # expires_in = None)
  132. log.debug('Posting Activity ' + activity['id'] + ' to ' + recipient_inbox)
  133. log.debug(json.dumps(activity, indent=4, sort_keys=True))
  134. # Finally! Send out the Activity to the INBOX of the remote Actor
  135. response = requests.post(
  136. recipient_inbox,
  137. headers=activitypub.REQUEST_HEADERS,
  138. data=json.dumps(activity).encode('UTF-8'),
  139. auth=auth_method)
  140. log.debug('Activity ' + activity['id'] + ' POST return code: '
  141. + str(response.status_code))
  142. # Some software could return "200 OK", other "202 Accepted" upon
  143. # receiving the Activity.
  144. assert 200 <= response.status_code < 300
  145. log.debug('Activity posted without errors.')
  146. # Save in the database that this Activity was delivered successfully to
  147. # the remote Actor. This way it will not be resent twice (just in case).
  148. forgefed_graph.add_collection_item(recipient_inbox, activity['id'])
  149. @broker.task
  150. def validate(actor_serialized, activity):
  151. """
  152. This task is scheduled after receiving a new activity from a remote actor.
  153. It's called from the actor_receive() view in app.py.
  154. :param actor_serialized: Representation of an Actor as created by
  155. __repr__() (see model.py). Actors need to be serialized because they're
  156. passed from the Flask view to the celery queue for processing. By
  157. default celery serializes objects with JSON. We could use something
  158. else such as pickle, but it would not really be portable in case
  159. somebody decides to process the queue with some non-python script. So
  160. we use JSON to serialize enough properties (class type and actor id) to
  161. reconstruct the actor from the task (from this function).
  162. :type activity: dict
  163. :param activity: The incoming activity document.
  164. :type activity: dict
  165. """
  166. if not actor_serialized:
  167. log.info('Missing Actor. Ignoring task.')
  168. return
  169. if not activity:
  170. log.info('Missing Activity. Ignoring task.')
  171. return
  172. log.debug('Actor ' + actor_serialized['actor_uri'] + ' has received a new Activity '
  173. +'with id ' + activity['id'])
  174. log.debug(json.dumps(activity, indent=4, sort_keys=True))
  175. with database_session() as (pagure_db, forgefed_graph):
  176. # Recreate the actor class from its serialized name
  177. actor_class = getattr(model, actor_serialized['class'], None)
  178. assert actor_class, 'Cannot reconstruct Actor.'
  179. # Recreate the actor object
  180. actor = pagure_db.query(actor_class) \
  181. .filter(actor_class.id == actor_serialized['id']) \
  182. .one_or_none()
  183. if not actor:
  184. log.debug('Actor doesn\'t exist. Ignoring incoming Activity.')
  185. return
  186. # Check if this Activity was already delivered to this Actor. If it was,
  187. # we don't do anything since it was already processed in the past. New
  188. # activities should not have the same ID of an old one.
  189. if forgefed_graph.collection_contains(actor.inbox_uri, activity['id']):
  190. return
  191. # Let's save a copy of the Activity in the database
  192. if (rdflib.URIRef(activity['id']), None, None) not in forgefed_graph:
  193. forgefed_graph.parse(data=json.dumps(activity), format='json-ld')
  194. # Save the Activity to the Actor INBOX
  195. forgefed_graph.add_collection_item(actor.inbox_uri, activity['id'])
  196. #######################################################################
  197. # Now, we could stop here but Pagure is not just a server it also works
  198. # as a user client with an interface that allows user interactions.
  199. # A user could in theory fetch his ActivityPub INBOX manually from the
  200. # Pagure web interface, but that would be rather inconvenient. Instead,
  201. # we automatically react and update the Pagure database as soon as we
  202. # receive a new Activity.
  203. # The INBOX can also be read by other clients (eg. desktop apps).
  204. #######################################################################
  205. # Because JSON-LD can represent the same graph in several different ways
  206. # we normalize the JSON object before passing it to the actor for
  207. # processing. This simplifies working with the activity.
  208. # Normalization consist of passing only the Activity node to the actor,
  209. # without any of the linked nodes.
  210. activity = forgefed_graph.get_json_node(activity['id'])
  211. actor.handle_incoming_activity(activity)