Python – Tornado and Celery integration
My new project involves both tornado (as frontend, using tornado-json) and celery (as backend). After googling too much I found two different ways to integrate celery with tornado: using an IOLoop to check for task completion (stackoverflow) using a unix socket (Cyberfart’s Blog) I prefer the IOLoop Here is a simple email sender example
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
import datetime from tornado import gen from tornado_json.requesthandlers import APIHandler from tornado_json import schema from ratlery.tasks import send_text_email from tornado_json.gen import coroutine from functools import partial from tornado.ioloop import IOLoop class CeleryResultMixin(object): """ Adds a callback function which could wait for the result asynchronously """ def wait_for_result(self, task, callback): if task.ready(): callback(task.result) else: IOLoop.instance().add_timeout(datetime.timedelta(0.00001), partial(self.wait_for_result, task, callback)) class SendTextEmailHandler(APIHandler, CeleryResultMixin): @schema.validate( input_schema={ "type": "object", "properties": { "from": {"type": "string"}, "to": {"type": "string"}, "cc": {"type": "string"}, "bcc": {"type": "string"}, "subject": {"type": "string"}, "body": {"type": "string"}, }, "required": ["from", "to", "subject", "body"] }, input_example={ "from": "somebody@somedomain.com", "to": "receipt01@somedomain.com,receipt02@somedomain.com", "cc": "receipt03@somedomain.com,receipt04@somedomain.com", "bcc": "receipt05@somedomain.com,receipt06@somedomain.com", "subject": "This is the email subject", "body": "And this is the body" }, output_schema={ "type": "object", "properties": { "status_code": {"type": "integer"}, "status_msg": {"type": "string"} } }, output_example={ "status_code": 0, "status_msg": "server refuse connections" }, ) @coroutine def post(self): """Send text email""" task = send_text_email.delay(self.body, 'localhost', 25) status_code, status_msg = yield gen.Task(self.wait_for_result, task) return {'status_code': status_code, 'status_msg': status_msg} |
[…]