UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

xeon Mailinglist
I am trying to run python multiprocessing to execute a remote function that I call with the help of celery. My program runs  ok if I just run one process. But when I run more than one process, I get the error below. Is it because of incompatible versions in python plugins and rabbitmq?

Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "testHello.py", line 16, in test_hello_aux
    print output.get()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
    on_interval=on_interval)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
    no_ack=no_ack, accept=self.accept) as consumer:
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
    self.revive(self.channel)
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
    self.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
    queue.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare
    self.queue_declare(nowait, passive=False)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare
    nowait=nowait)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1254, in queue_declare
    self._send_method((50, 10), args)
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "testHello.py", line 16, in test_hello_aux
    print output.get()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
    on_interval=on_interval)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
    no_ack=no_ack, accept=self.accept) as consumer:
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
Process Process-3:
    self.revive(self.channel)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
    self.declare()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
    queue.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare
    self.run()
    self.exchange.declare(nowait)
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare
    self._target(*self._args, **self._kwargs)
    nowait=nowait, passive=passive,
  File "testHello.py", line 16, in test_hello_aux
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 613, in exchange_declare
    print output.get()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
    on_interval=on_interval)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
    no_ack=no_ack, accept=self.accept) as consumer:
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
    self._send_method((40, 10), args)
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method
    self.revive(self.channel)
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
    self.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
    write_frame(1, channel, payload)
    queue.declare()
  File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare
    frame_type, channel, size, payload, 0xce,
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    self.exchange.declare(nowait)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare
    nowait=nowait, passive=passive,
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 620, in exchange_declare
    return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
    (40, 11),  # Channel.exchange_declare_ok
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait
    self.channel_id, allowed_methods)
  File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 237, in _wait_method
    self.method_reader.read_method()
  File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 189, in read_method
    raise m
error: [Errno 104] Connection reset by peer
Process Process-4:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "testHello.py", line 16, in test_hello_aux
    print output.get()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
    on_interval=on_interval)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
    no_ack=no_ack, accept=self.accept) as consumer:
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
    self.revive(self.channel)
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
    self.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
    queue.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare
    self.queue_declare(nowait, passive=False)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare
    nowait=nowait)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1258, in queue_declare
    (50, 11),  # Channel.queue_declare_ok
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait
    self.channel_id, allowed_methods)
  File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 270, in _wait_method
    self.wait()
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 69, in wait
    return self.dispatch_method(method_sig, args, content)
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 87, in dispatch_method
    return amqp_method(self, args)
  File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 526, in _close
    (class_id, method_id), ConnectionError)
UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead


_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Reply | Threaded
Open this post in threaded view
|

Re: UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

Michael Klishin-2
On 22 April 2014 at 01:41:11, xeon Mailinglist ([hidden email]) wrote:
> > Is it because of incompatible versions in python plugins and  
> rabbitmq?

The issue is with concurrent publishing on the same channel. Channels should
not be shared between threads/etc.

For every publish you do, your client sends at least 2 frames on the wire:

[basic.publish method][content header][payload]*

RabbitMQ expects frames to arrive in this order. With concurrent publishing on the same
channel, frames can get interleaved like so:

[basic.publish method 1][content header 1][basic.publish method 2][content header 2][payload 1][payload 2]

or get interleaved with other protocol methods, e.g. basic.consume in your case.

I’m not familiar with Celery enough to tell how to resolve this. Using a connection
or at least channel per multiprocessing “thread” should help. 
--  
MK  

Software Engineer, Pivotal/RabbitMQ
_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss