123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- """
- (c) 2020 - Copyright ...
-
- Authors:
- zPlus <zplus@peers.community>
- """
- from .. import graph
- from .. import settings
- import celery
- import logging
- import os
- import pagure.config
- import pagure.lib.model
- import pagure.lib.model_base
# Task-aware logger for this package; verbosity comes from the plugin settings.
log = celery.utils.log.get_task_logger(__name__)
log.setLevel(settings.LOG_LEVEL)

# The broker URL is read from the Pagure configuration. Without it the Celery
# application cannot be created, so importing this module fails loudly.
broker_url = pagure.config.config.get('BROKER_URL')

if broker_url:
    log.info('Using broker: ' + broker_url)
else:
    log.critical('Broker not defined.')
    raise Exception('Broker not defined.')

# The same URL is used both as the message broker and as the result backend.
broker = celery.Celery('forgefed', broker=broker_url, backend=broker_url)

# Start from Pagure's own Celery configuration, then set the annotations that
# are applied to every task ('*'). Since 'task_annotations' is listed last it
# overrides any key of the same name coming from CELERY_CONFIG.
broker.conf.update({
    **pagure.config.config["CELERY_CONFIG"],
    'task_annotations': {
        '*': {
            'queue': 'forgefed',
            'autoretry_for': [Exception],
            'retry_kwargs': {'max_retries': 20},
            'retry_backoff': True,
            # 86400; one day if the value is interpreted as seconds.
            'retry_backoff_max': 24 * 60 * 60,
            'retry_jitter': True,
        },
    },
})
class database_session:
    """
    Context manager that gives a task its own database sessions.

    Because tasks are used from the independent Celery workers, these functions
    do not have access to the database sessions that are automatically created
    at the beginning of HTTP requests within Flask. So the job of this
    Context Manager is to create a new database session that tasks can use.
    This class can be used like this:

        with database_session() as (pagure_db, forgefed_graph):
            pagure_db.query()...
            forgefed_graph.query()...
            ...

    On leaving the ``with`` block both sessions are committed if the body
    completed normally, or rolled back if it raised. In either case both
    sessions are always released afterwards, even if the commit or the
    rollback itself fails.

    NOTE An alternative way of obtaining the same behavior would be with a
         decorator that contains a try...except...finally block.
    """

    def __init__(self):
        # Both handles are created lazily in __enter__.
        self.pagure = None
        self.forgefed = None

    def __enter__(self):
        """
        Open both sessions.

        Returns:
            A ``(pagure_session, forgefed_graph)`` tuple.
        """
        self.pagure = pagure.lib.model_base.create_session(
            pagure.config.config['DB_URL'])

        try:
            self.forgefed = graph.Graph()
        except Exception:
            # __exit__ is not called when __enter__ raises, so the Pagure
            # session that was just opened must be released here.
            self.pagure.remove()
            self.pagure = None
            raise

        return (self.pagure, self.forgefed)

    def __exit__(self, exception_type, exception_value, exception_traceback):
        """
        Commit or roll back both sessions, then release them.

        Returns None, so an exception raised inside the ``with`` body is
        never suppressed. An exception raised by commit/rollback/cleanup
        propagates to the caller after the remaining cleanup steps have run.
        """
        try:
            if exception_type:
                # The body failed: undo pending changes on both stores. The
                # graph is rolled back even if the Pagure rollback raises.
                try:
                    self.pagure.rollback()
                finally:
                    self.forgefed.rollback()
            else:
                self.pagure.commit()
                self.forgefed.commit()
        finally:
            # Always release the connections, whatever happened above;
            # otherwise a failed commit would leak both sessions.
            try:
                self.pagure.remove()
            finally:
                self.forgefed.disconnect()
-
-
-
-
-
-
- from . import activity, notification, person, project, repository
|