Using the Qpid Proton C++ library to understand AMQP

Apache Qpid Proton is a library in C and C++ for carrying out messaging operations using AMQP (there's also a very similar Python library). Very often these messaging operations will be between some kind of message broker (e.g., Apache Artemis) and its clients, but AMQP can be used for peer-to-peer communication, as will be demonstrated in this article.

AMQP is not a trivially straightforward protocol and, for better or worse, developers sometimes need to understand the low-level details to make effective use of it. This understanding is, perhaps, less pressing for Java developers, who may just use a JMS wrapper around AMQP, like Qpid-JMS. But even in that case, it helps to understand the protocol, if only to understand the error messages that the client runtime produces. For example, it's difficult to know what to do about the notorious "remote did not respond to a drain request in time" message if you don't know what a "drain request" is; and it's difficult to understand a drain request unless you understand the flow performative and link credit.

It's not really my intention to explain any of those things in this article; my much more modest goal is to demonstrate that a good way to investigate how AMQP works is to use simple Qpid Proton applications, along with the built-in packet trace that Proton provides. This approach can give a useful insight into what goes on at the wire protocol level, and there are plenty of simple Proton applications to experiment with.

To this end, I am maintaining a small collection of simple, fully-documented Proton C++ examples. The code is available from my GitHub repository as usual. Instructions on building and running the code are in the README file in the repository -- you'll need the Proton development headers and the Proton library and its dependencies. It can be instructive to run the examples whilst capturing raw network data, using a tool like Wireshark or tcpdump. However, I won't be showing raw TCP data in this article.

I'm not going to explain here how to obtain and install the Proton C++ library -- I did that in an earlier article. I'm assuming here that you have the library and development headers available, and that you have some familiarity with C++ programming. I'm also using Linux as my demonstration platform, although Proton is available for other platforms.

The code examples

For demonstration purposes, I'll show a simple AMQP client and server. In the repository I referenced earlier, the source code is server.cpp for the server, and send_lots.cpp for the client. The server code just responds to incoming requests to create a messaging session on an address called foo. The client sends a number of messages to that address.

As supplied, the client and server use port 5672. That's the default for AMQP, so you may need to ensure that you aren't running any other AMQP-aware software on the same host as the server process. Also by default, the client connects to the server on localhost, but that's easy to change in the source.

The Proton library primarily works through callbacks. That is, the application provides a class that implements certain methods, and the library calls those methods at the appropriate stages of the AMQP conversation. In my code examples, most of these methods -- even when they are not relevant to the specific application -- generate some log output, to make it easier to trace what is going on.

Running the client and server

Assuming you've built the samples (which, in most cases, should amount to running make) you can run both the client and server with AMQP tracing enabled, like this:

$ PN_TRACE_FRM=1 ./bin/server

And in a different terminal:

$ PN_TRACE_FRM=1 ./bin/send_lots

Examining the output

Here is the sample output from send_lots -- I've removed some duplicate and irrelevant lines, to make it easier to follow.

creating sender
created sender
[0x11fd500]: SASL:FRAME:  -> SASL
[0x11fd500]: SASL:FRAME:  <- SASL
[0x11fd500]: AMQP:FRAME:0 <- @sasl-mechanisms(64) [sasl-server-mechanisms=@[:GSS-SPNEGO, :GSSAPI, :ANONYMOUS]]
[0x11fd500]: AMQP:FRAME:0 -> @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fedora"]
[0x11fd500]: AMQP:FRAME:0 <- @sasl-outcome(68) [code=0x0]
[0x11fd500]: AMQP:FRAME:  -> AMQP
[0x11fd500]: AMQP:FRAME:0 -> @open(16) [container-id="4f021f4e-e4a8-4a5d-a637-14219a74bca0", hostname="127.0.0.1", channel-max=0x7fff]
[0x11fd500]: AMQP:FRAME:0 -> @begin(17) [next-outgoing-id=0x0, incoming-window=0x7fffffff, outgoing-window=0x7fffffff]
[0x11fd500]: AMQP:FRAME:0 -> @attach(18) [name="00a31a02-cb86-48d9-b20d-437ddc6289ef", handle=0x0, role=false, snd-settle-mode=0x2, rcv-settle-mode=0x0, source=@source(40) [durable=0x0, timeout=0x0, dynamic=false], target=@target(41) [address="foo", durable=0x0, timeout=0x0, dynamic=false], initial-delivery-count=0x0, max-message-size=0x0]
[0x11fd500]: AMQP:FRAME:  <- AMQP
[0x11fd500]: AMQP:FRAME:0 <- @open(16) [container-id="38ebe19c-cb6f-4f4c-8cb6-19a3739ccd30", channel-max=0x7fff]
[0x11fd500]: AMQP:FRAME:0 <- @begin(17) [remote-channel=0x0, next-outgoing-id=0x0, incoming-window=0x7fffffff, outgoing-window=0x7fffffff]
[0x11fd500]: AMQP:FRAME:0 <- @attach(18) [name="00a31a02-cb86-48d9-b20d-437ddc6289ef", handle=0x0, role=true, snd-settle-mode=0x2, rcv-settle-mode=0x0, target=@target(41) [address="foo", durable=0x0, timeout=0x0, dynamic=false], initial-delivery-count=0x0, max-message-size=0x0]
[0x11fd500]: AMQP:FRAME:0 <- @flow(19) [next-incoming-id=0x0, incoming-window=0x7fffffff, next-outgoing-id=0x0, outgoing-window=0x7fffffff, handle=0x0, delivery-count=0x0, link-credit=0xa, drain=false]
on_transport_open
on_connection_open
on_session_open
on_sender_open
on_sendable
my link credit is now 10
Sending message
sent messages = 1
Sending message
sent messages = 2
... (more) ...
Sending message
sent messages = 10
[0x11fd500]: AMQP:FRAME:0 -> @transfer(20) [handle=0x0, delivery-id=0x0, delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0x0] (32) \x00SpE\x00Ss\xc0\x06\x01\xa1\x03foo\x00Sw\xa1\x0cHello, world
[0x11fd500]: AMQP:FRAME:0 -> @transfer(20) [handle=0x0, delivery-id=0x1, delivery-tag=b"\x02\x00\x00\x00\x00\x00\x00\x00", message-format=0x0] (32) \x00SpE\x00Ss\xc0\x06\x01\xa1\x03foo\x00Sw\xa1\x0cHello, world
... (more) ...
[0x11fd500]: AMQP:FRAME:0 -> @transfer(20) [handle=0x0, delivery-id=0x9, delivery-tag=b"\x0a\x00\x00\x00\x00\x00\x00\x00", message-format=0x0] (32) \x00SpE\x00Ss\xc0\x06\x01\xa1\x03foo\x00Sw\xa1\x0cHello, world
[0x11fd500]: AMQP:FRAME:0 <- @flow(19) [next-incoming-id=0xa, incoming-window=0x7fffffff, next-outgoing-id=0x0, outgoing-window=0x7fffffff, handle=0x0, delivery-count=0xa, link-credit=0xa, drain=false]
[0x11fd500]: AMQP:FRAME:0 <- @disposition(21) [role=true, first=0x0, last=0x9, settled=true, state=@accepted(36) []]
on_sendable
my link credit is now 10
on_tracker_accept
Closing connection
on_tracker_settle
on_tracker_accept
on_tracker_settle
on_tracker_accept
on_tracker_settle
... (more) ...
on_tracker_accept
on_tracker_settle
[0x11fd500]: AMQP:FRAME:0 -> @close(24) []
[0x11fd500]:   IO:FRAME:  -> EOS
[0x11fd500]: AMQP:FRAME:0 <- @close(24) []
[0x11fd500]:   IO:FRAME:  <- EOS
on_connection_close

I'm not going to explain the AMQP protocol in this article, but a few interesting features are worth some attention.

• The conversation starts with SASL authentication, even though the server does not require credentials. This is, therefore, 'anonymous' authentication, as shown by the token mechanism=:ANONYMOUS.

• The client (sender) then sends the @open, @begin, and @attach frames. The tokens 'open', etc., are called 'performatives' in AMQP jargon. open initiates the communication, and exchanges capabilities and limits. begin opens a session -- a 'session' in AMQP terminology is a pair of unidirectional communication channels, one inbound and one outbound. attach creates a link within that session; here, the link's target is the address foo. TCP is, of course, full-duplex; but AMQP could conceivably be used with other communications infrastructure. Note that the receiver responds with its own @open, @begin, and @attach, but the sender does not wait for each response before sending its next frame. This willingness to proceed without specific confirmation is called 'pipelining' in AMQP.

• The sender cannot proceed until the receiver has sent a @flow frame, setting out the amount of link credit it will allow. Link credit is specified in terms of messages, so the token link-credit=0xa means "you can send ten messages without checking with me again" (hexadecimal 0x0A = 10).

• As the sender proceeds to send messages, the Proton library generates a bunch of @transfer frames. These contain the actual message data. Note that the sender does not wait for any confirmation -- in fact, all ten of the message payloads in this example will probably be packed into a single TCP packet. So when does the sender get an acknowledgement from the receiver?

• The receiver sends one or more @disposition frames, indicating what it has done with the messages. state=@accepted represents a positive acknowledgement -- the message was received and processed. A single frame can cover a range of deliveries: in the trace above, first=0x0 and last=0x9 account for all ten messages. Of course, what 'processed' amounts to depends on the application.

• The sender and receiver both send @close frames, to indicate that they are finished with the current connection. They might send @detach and @end as well, but the specification does not require this.

If you're programming with Proton, it's helpful to note how Proton's calls to the application's callback handlers (like on_sendable()) are interleaved with the actual AMQP frames. In particular, these callbacks are not necessarily called exactly when the corresponding AMQP action takes place. Notice, for example, how the "accepted" disposition in the trace (which covers deliveries 0x0 to 0x9) is received before the application's on_tracker_accept() method gets called for the first time.

Closing remarks

The AMQP specification does not make for easy reading and, if you're using high-level AMQP libraries like Qpid-JMS, the details might not be important. For low-level programming -- particularly when programming Proton server applications -- knowing the fine details really matters. Using Proton's built-in frame trace, along with at least an outline understanding of the AMQP protocol, is a good way to debug this kind of application.