I am trying to use Python multiprocessing to execute a remote function that I call through Celery. My program runs fine when I start just one process, but when I run more than one process, I get the errors below. Is this caused by incompatible versions of the Python plugins and RabbitMQ?
Process Process-1: Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run self._target(*self._args, **self._kwargs) File "testHello.py", line 16, in test_hello_aux print output.get() File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get no_ack=no_ack, File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for on_interval=on_interval) File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume no_ack=no_ack, accept=self.accept) as consumer: File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__ self.revive(self.channel) File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive self.declare() File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare queue.declare() File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare self.queue_declare(nowait, passive=False) File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare nowait=nowait) File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1254, in queue_declare self._send_method((50, 10), args) File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method self.channel_id, method_sig, args, content, File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method write_frame(1, channel, payload) File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame 
frame_type, channel, size, payload, 0xce, File "/usr/lib/python2.7/socket.py", line 224, in meth return getattr(self._sock,name)(*args) error: [Errno 32] Broken pipe Process Process-2: Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run self._target(*self._args, **self._kwargs) File "testHello.py", line 16, in test_hello_aux print output.get() File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get no_ack=no_ack, File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for on_interval=on_interval) File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume no_ack=no_ack, accept=self.accept) as consumer: File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__ Process Process-3: self.revive(self.channel) Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive self.declare() File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare queue.declare() File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare self.run() self.exchange.declare(nowait) File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare self._target(*self._args, **self._kwargs) nowait=nowait, passive=passive, File "testHello.py", line 16, in test_hello_aux File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 613, in exchange_declare print output.get() File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get no_ack=no_ack, File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for 
on_interval=on_interval) File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume no_ack=no_ack, accept=self.accept) as consumer: File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__ self._send_method((40, 10), args) File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method self.channel_id, method_sig, args, content, File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method self.revive(self.channel) File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive self.declare() File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare write_frame(1, channel, payload) queue.declare() File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare frame_type, channel, size, payload, 0xce, File "/usr/lib/python2.7/socket.py", line 224, in meth self.exchange.declare(nowait) File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare nowait=nowait, passive=passive, File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 620, in exchange_declare return getattr(self._sock,name)(*args) error: [Errno 32] Broken pipe (40, 11), # Channel.exchange_declare_ok File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait self.channel_id, allowed_methods) File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 237, in _wait_method self.method_reader.read_method() File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 189, in read_method raise m error: [Errno 104] Connection reset by peer Process Process-4: Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/usr/lib/python2.7/multiprocessing/process.py", 
line 114, in run self._target(*self._args, **self._kwargs) File "testHello.py", line 16, in test_hello_aux print output.get() File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get no_ack=no_ack, File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for on_interval=on_interval) File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume no_ack=no_ack, accept=self.accept) as consumer: File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__ self.revive(self.channel) File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive self.declare() File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare queue.declare() File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare self.queue_declare(nowait, passive=False) File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare nowait=nowait) File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1258, in queue_declare (50, 11), # Channel.queue_declare_ok File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait self.channel_id, allowed_methods) File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 270, in _wait_method self.wait() File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 69, in wait return self.dispatch_method(method_sig, args, content) File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 87, in dispatch_method return amqp_method(self, args) File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 526, in _close (class_id, method_id), ConnectionError) UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead _______________________________________________ rabbitmq-discuss mailing list 
[hidden email] https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss |
On 22 April 2014 at 01:41:11, xeon Mailinglist ([hidden email]) wrote:
> Is it because of incompatible versions in python plugins and
> rabbitmq?

No. The issue is concurrent publishing on the same channel. Channels should not be shared between threads, processes, etc.

For every publish you do, your client sends at least 2 frames on the wire:

[basic.publish method][content header][payload]*

RabbitMQ expects the frames to arrive in this order. With concurrent publishing on the same channel, frames can get interleaved like so:

[basic.publish method 1][content header 1][basic.publish method 2][content header 2][payload 1][payload 2]

or get interleaved with other protocol methods, e.g. basic.consume in your case. That is what the "UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead" error in your traceback is reporting: a basic.publish method frame was followed by something other than its content header.

I’m not familiar enough with Celery to tell you how to resolve this there. Using a separate connection, or at least a separate channel, for each multiprocessing worker (“thread”) should help.

-- MK Software Engineer, Pivotal/RabbitMQ _______________________________________________ rabbitmq-discuss mailing list [hidden email] https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss |
Free forum by Nabble | Edit this page |