123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
- import celery
- import json
- import rdflib
- import requests
- import requests_http_signature
- from .. import activitypub
- from .. import graph
- from .. import model
- from .. import settings
- from . import broker_url
- from . import broker
- from . import database_session
# Module-wide logger obtained through Celery's task-logger factory, so that
# every task defined below logs through the same object. Its verbosity is
# whatever LOG_LEVEL is set to in the package settings.
log = celery.utils.log.get_task_logger(__name__)
log.setLevel(settings.LOG_LEVEL)
# (connect, read) timeout in seconds for the delivery request. Without an
# explicit timeout "requests" waits indefinitely, so a single unresponsive
# remote server could block a worker forever.
_DELIVERY_TIMEOUT = (10, 60)

# Object types that are expanded into their members instead of being POSTed to.
_COLLECTION_TYPES = ('Collection', 'OrderedCollection',
                     'CollectionPage', 'OrderedCollectionPage')


def _select_inbox(remote_object):
    """Return the INBOX URI to deliver to, or None if the object has none."""

    # Use sharedInbox if there is one.
    # https://www.w3.org/TR/activitypub/#sharedInbox
    if 'endpoints' in remote_object and 'sharedInbox' in remote_object['endpoints']:
        return remote_object['endpoints']['sharedInbox']

    if 'inbox' in remote_object:
        return remote_object['inbox']

    return None


def _collection_targets(collection):
    """Return the ``(items, pages)`` referenced by a Collection document."""

    items = []

    for key in ('items', 'orderedItems'):
        if key not in collection:
            continue

        # A single item may be given as a bare string instead of a list.
        if isinstance(collection[key], str):
            items.append(collection[key])
        else:
            items.extend(collection[key])

    pages = [collection[key] for key in ('first', 'next') if key in collection]

    return items, pages


@broker.task
def post(activity, recipient_uri, key_uri, depth):
    """
    POST an Activity to a remote Actor. If the remote object is a Collection,
    a new task is scheduled for every item of that collection.
    If an error occurs during the HTTP request, the task is automatically
    rescheduled a number of times depending on the Celery configuration.

    IMPORTANT: This task is only responsible for the actual HTTP POSTing of the
               Activity to the remote Actor's INBOX. This task will *not*
               attempt to perform any sort of validation of the Activity being
               sent. In other words: whoever is using this task should validate
               its own Activity before sending it.

    :param activity: The Activity to be sent. Its "actor" and "id" keys are
        read by this task.
    :type activity: dict

    :param recipient_uri: The URI of a remote object to send the Activity to.

    :param key_uri: Which RSA key to use for signing the HTTP request.

    :param depth: Number of indirections to follow if recipient_uri is a
        Collection. See the settings file for more info about this option.

    :raises AssertionError: If the remote object cannot be fetched, or if the
        remote INBOX does not answer with a 2xx status code.
    """

    with database_session() as (pagure_db, forgefed_graph):

        actor_uri = activity['actor']

        # Make sure the Actor is not sending the Activity to itself.
        # https://www.w3.org/TR/activitypub/#delivery
        if actor_uri == recipient_uri:
            log.debug('Activity actor and recipient are the same. '
                      'Refuting to send.')
            return

        # If the Activity is addressed to as:Public, the Activity is not POSTed
        # to anyone.
        # https://www.w3.org/TR/activitypub/#public-addressing
        if recipient_uri == 'Public':
            log.debug('Not sending to as:Public.')
            return

        # Retrieve remote object
        remote_object = activitypub.fetch(recipient_uri)

        # Make sure we got an object, or abort task. This is an explicit raise
        # (not an "assert" statement) so the check survives "python -O".
        if not remote_object:
            raise AssertionError('Could not fetch remote actor.')

        recipient_inbox = _select_inbox(remote_object)

        # If the remote object does not have an INBOX, we check if it's a
        # Collection, in which case we retrieve all its items.
        if not recipient_inbox:
            log.debug('Recipient is not an Actor. Checking if it\'s a '
                      'collection.')

            # Do not follow any more Collections.
            if depth < 1:
                log.debug('Max number of indirections reached. I will not '
                          'expand any more collections.')
                return

            # An object without a "type" is simply not a collection.
            if ('type' in remote_object
                    and remote_object['type'] in _COLLECTION_TYPES):

                items, pages = _collection_targets(remote_object)

                # Schedule a new delivery for every object found in the
                # collection. Each item consumes one level of indirection.
                for item_uri in items:
                    post.delay(activity, item_uri, key_uri, depth - 1)

                # Pages belong to the same collection, so depth is unchanged.
                # TODO If a page "next" links to a previous page (which should
                #      not happen), this will not detect the loop.
                for page_uri in pages:
                    post.delay(activity, page_uri, key_uri, depth)

                if not items and not pages:
                    log.debug('Collection found, but it contains no items. '
                              'Activity will not be sent.')

            # Since this object does *not* have an INBOX, we stop here for this
            # task.
            return

        # Check if this Activity was already sent (successfully) to this INBOX.
        # If it was, we do not resend the same Activity twice. This situation
        # could happen with a sharedInbox, or if for some reasons the same
        # Activity is sent twice (maybe the job queue didn't remove it?).
        # NOTE This collection (recipient_inbox) is fictitious because we
        #      cannot have access to the remote Actor INBOX.
        if forgefed_graph.collection_contains(recipient_inbox, activity['id']):
            log.warning(
                'Activity ' + activity['id'] + ' was already delivered to '
                'the INBOX ' + recipient_inbox + '. Not sending again.')

            return

        # Check if the given RSA key belongs to the actor
        if not forgefed_graph.is_valid_key(actor_uri, key_uri):
            log.critical('Invalid key. Refuting to sign HTTP request.')
            return

        # Retrieve the private key of the local Actor for signing the HTTP
        # request
        private_key = forgefed_graph.value(
            subject=rdflib.URIRef(key_uri),
            predicate=graph.SEC.privateKeyPem)

        # This will add a "Signature:" header to the HTTP request.
        # Keys are stored as UTF-8 PEM files in the database, but
        # HTTPSignatureHeaderAuth requires the actual byte string without any
        # encoding applied to it, hence why we encode() UTF-8 back to bytes.
        auth_method = requests_http_signature.HTTPSignatureHeaderAuth(
            key=private_key.encode('UTF-8'),
            key_id=key_uri,
            algorithm='rsa-sha256',
            headers=['(request-target)', 'host', 'date', 'digest'])

        log.debug('Posting Activity ' + activity['id'] + ' to ' + recipient_inbox)
        log.debug(json.dumps(activity, indent=4, sort_keys=True))

        # Finally! Send out the Activity to the INBOX of the remote Actor
        response = requests.post(
            recipient_inbox,
            headers=activitypub.REQUEST_HEADERS,
            data=json.dumps(activity).encode('UTF-8'),
            auth=auth_method,
            timeout=_DELIVERY_TIMEOUT)

        log.debug('Activity ' + activity['id'] + ' POST return code: '
                  + str(response.status_code))

        # Some software could return "200 OK", other "202 Accepted" upon
        # receiving the Activity. Anything outside 2xx fails the task so that
        # the delivery is NOT recorded as successful below.
        if not 200 <= response.status_code < 300:
            raise AssertionError(
                'Remote INBOX answered with HTTP status '
                + str(response.status_code))

        log.debug('Activity posted without errors.')

        # Save in the database that this Activity was delivered successfully to
        # the remote Actor. This way it will not be resent twice (just in case).
        forgefed_graph.add_collection_item(recipient_inbox, activity['id'])
@broker.task
def validate(actor_serialized, activity):
    """
    This task is scheduled after receiving a new activity from a remote actor.
    It's called from the actor_receive() view in app.py.

    The Activity is stored in the graph, added to the receiving Actor's INBOX
    collection, and finally handed to the Actor for processing. An Activity
    that is already in the Actor's INBOX is ignored.

    :param actor_serialized: Representation of an Actor as created by
        __repr__() (see model.py). Actors need to be serialized because they're
        passed from the Flask view to the celery queue for processing. By
        default celery serializes objects with JSON. We could use something
        else such as pickle, but it would not really be portable in case
        somebody decides to process the queue with some non-python script. So
        we use JSON to serialize enough properties (class type and actor id) to
        reconstruct the actor from the task (from this function).
        The keys read here are "actor_uri", "class" and "id".
    :type actor_serialized: dict

    :param activity: The incoming activity document. Its "id" key is read.
    :type activity: dict

    :raises AssertionError: If actor_serialized['class'] does not name an
        attribute of the model module.
    """

    if not actor_serialized:
        log.info('Missing Actor. Ignoring task.')
        return

    if not activity:
        log.info('Missing Activity. Ignoring task.')
        return

    activity_id = activity['id']

    log.debug('Actor %s has received a new Activity with id %s',
              actor_serialized['actor_uri'], activity_id)
    log.debug(json.dumps(activity, indent=4, sort_keys=True))

    with database_session() as (pagure_db, forgefed_graph):

        # Recreate the actor class from its serialized name. This is an
        # explicit raise (not an "assert" statement) so the check is still
        # performed when Python runs with -O.
        actor_class = getattr(model, actor_serialized['class'], None)
        if not actor_class:
            raise AssertionError('Cannot reconstruct Actor.')

        # Recreate the actor object
        actor = (pagure_db.query(actor_class)
                          .filter(actor_class.id == actor_serialized['id'])
                          .one_or_none())

        if not actor:
            log.debug('Actor doesn\'t exist. Ignoring incoming Activity.')
            return

        # Check if this Activity was already delivered to this Actor. If it was,
        # we don't do anything since it was already processed in the past. New
        # activities should not have the same ID of an old one.
        if forgefed_graph.collection_contains(actor.inbox_uri, activity_id):
            return

        # Let's save a copy of the Activity in the database, unless the graph
        # already holds triples about it.
        if (rdflib.URIRef(activity_id), None, None) not in forgefed_graph:
            forgefed_graph.parse(data=json.dumps(activity), format='json-ld')

        # Save the Activity to the Actor INBOX
        forgefed_graph.add_collection_item(actor.inbox_uri, activity_id)

        #######################################################################
        # Now, we could stop here but Pagure is not just a server it also works
        # as a user client with an interface that allows user interactions.
        # A user could in theory fetch his ActivityPub INBOX manually from the
        # Pagure web interface, but that would be rather inconvenient. Instead,
        # we automatically react and update the Pagure database as soon as we
        # receive a new Activity.
        # The INBOX can also be read by other clients (eg. desktop apps).
        #######################################################################

        # Because JSON-LD can represent the same graph in several different ways
        # we normalize the JSON object before passing it to the actor for
        # processing. This simplifies working with the activity.
        # Normalization consist of passing only the Activity node to the actor,
        # without any of the linked nodes.
        activity = forgefed_graph.get_json_node(activity_id)

        actor.handle_incoming_activity(activity)
|