Using the Qpid Proton C++ library to understand AMQP
Apache Qpid Proton is a library in C and C++ for carrying out messaging operations using AMQP (there's also a very similar Python library). Very often these messaging operations will be between some kind of message broker (e.g., Apache Artemis) and its clients, but AMQP can be used for peer-to-peer communication, as will be demonstrated in this article.
AMQP is not a trivially straightforward protocol and, for better or worse, developers sometimes need to understand the low-level details to make effective use of it. This understanding is, perhaps, less pressing for Java developers, who may just use a JMS wrapper around AMQP, like Qpid-JMS. But even in that case, it helps to understand the protocol, if only to understand the error messages that the client runtime produces. For example, it's difficult to know what to do about the notorious "remote did not respond to a drain request in time" message if you don't know what a "drain request" is; and it's difficult to understand a drain request unless you understand the flow performative and link credit.
It's not really my intention to explain any of those things in this article; my much more modest goal is to demonstrate that a good way to investigate how AMQP works is to use simple Qpid Proton applications, along with the built-in packet trace that Proton provides. This approach can give a useful insight into what goes on at the wire protocol level, and there are plenty of simple Proton applications to experiment with.
To this end, I am maintaining a small collection of simple, fully-documented Proton C++ examples. The code is available from my GitHub repository as usual. Instructions on building and running the code are in the README file in the repository -- you'll need the Proton development headers and the Proton library and its dependencies. It can be instructive to run the examples whilst capturing raw network data, using a tool like Wireshark or tcpdump. However, I won't be showing raw TCP data in this article.
I'm not going to explain here how to obtain and install the Proton C++ library -- I did that in an earlier article. I'm assuming here that you have the library and development headers available, and that you have some familiarity with C++ programming. I'm also using Linux as my demonstration platform, although Proton is available for other platforms.
The code examples
For demonstration purposes, I'll show a simple AMQP client and server.
In the repository I referenced earlier, the source code is server.cpp for the server, and send_lots.cpp for the client. The server code just responds to incoming requests to create a messaging session on an address called foo. The client sends a number of messages to that address.
As supplied, the client and server use port 5672. That's the default for AMQP, so you may need to ensure that you aren't running any other AMQP-aware software on the same host as the server process. Also by default, the client connects to the server on localhost, but that's easy to change in the source.
Running the client and server
Assuming you've built the samples (which, in most cases, should amount to running make) you can run both the client and server with AMQP tracing enabled, like this:
$ PN_TRACE_FRM=1 ./bin/server
And in a different terminal:
$ PN_TRACE_FRM=1 ./bin/send_lots
Examining the output
Here is the sample output from send_lots -- I've removed some duplicate and irrelevant lines, to make it easier to follow.
creating sender
created sender
[0x11fd500]: SASL:FRAME: -> SASL
[0x11fd500]: SASL:FRAME: <- SASL
[0x11fd500]: AMQP:FRAME:0 <- @sasl-mechanisms(64) [sasl-server-mechanisms=@[:GSS-SPNEGO, :GSSAPI, :ANONYMOUS]]
[0x11fd500]: AMQP:FRAME:0 -> @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fedora"]
[0x11fd500]: AMQP:FRAME:0 <- @sasl-outcome(68) [code=0x0]
[0x11fd500]: AMQP:FRAME: -> AMQP
[0x11fd500]: AMQP:FRAME:0 -> @open(16) [container-id="4f021f4e-e4a8-4a5d-a637-14219a74bca0", hostname="127.0.0.1", channel-max=0x7fff]
[0x11fd500]: AMQP:FRAME:0 -> @begin(17) [next-outgoing-id=0x0, incoming-window=0x7fffffff, outgoing-window=0x7fffffff]
[0x11fd500]: AMQP:FRAME:0 -> @attach(18) [name="00a31a02-cb86-48d9-b20d-437ddc6289ef", handle=0x0, role=false, snd-settle-mode=0x2, rcv-settle-mode=0x0, source=@source(40) [durable=0x0, timeout=0x0, dynamic=false], target=@target(41) [address="foo", durable=0x0, timeout=0x0, dynamic=false], initial-delivery-count=0x0, max-message-size=0x0]
[0x11fd500]: AMQP:FRAME: <- AMQP
[0x11fd500]: AMQP:FRAME:0 <- @open(16) [container-id="38ebe19c-cb6f-4f4c-8cb6-19a3739ccd30", channel-max=0x7fff]
[0x11fd500]: AMQP:FRAME:0 <- @begin(17) [remote-channel=0x0, next-outgoing-id=0x0, incoming-window=0x7fffffff, outgoing-window=0x7fffffff]
[0x11fd500]: AMQP:FRAME:0 <- @attach(18) [name="00a31a02-cb86-48d9-b20d-437ddc6289ef", handle=0x0, role=true, snd-settle-mode=0x2, rcv-settle-mode=0x0, target=@target(41) [address="foo", durable=0x0, timeout=0x0, dynamic=false], initial-delivery-count=0x0, max-message-size=0x0]
[0x11fd500]: AMQP:FRAME:0 <- @flow(19) [next-incoming-id=0x0, incoming-window=0x7fffffff, next-outgoing-id=0x0, outgoing-window=0x7fffffff, handle=0x0, delivery-count=0x0, link-credit=0xa, drain=false]
on_transport_open
on_connection_open
on_session_open
on_sender_open
on_sendable
my link credit is now 10
Sending message
sent messages = 1
Sending message
sent messages = 2
... (more) ...
Sending message
sent messages = 10
[0x11fd500]: AMQP:FRAME:0 -> @transfer(20) [handle=0x0, delivery-id=0x0, delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0x0] (32) \x00SpE\x00Ss\xc0\x06\x01\xa1\x03foo\x00Sw\xa1\x0cHello, world
[0x11fd500]: AMQP:FRAME:0 -> @transfer(20) [handle=0x0, delivery-id=0x1, delivery-tag=b"\x02\x00\x00\x00\x00\x00\x00\x00", message-format=0x0] (32) \x00SpE\x00Ss\xc0\x06\x01\xa1\x03foo\x00Sw\xa1\x0cHello, world
... (more) ...
[0x11fd500]: AMQP:FRAME:0 -> @transfer(20) [handle=0x0, delivery-id=0x9, delivery-tag=b"\x0a\x00\x00\x00\x00\x00\x00\x00", message-format=0x0] (32) \x00SpE\x00Ss\xc0\x06\x01\xa1\x03foo\x00Sw\xa1\x0cHello, world
[0x11fd500]: AMQP:FRAME:0 <- @flow(19) [next-incoming-id=0xa, incoming-window=0x7fffffff, next-outgoing-id=0x0, outgoing-window=0x7fffffff, handle=0x0, delivery-count=0xa, link-credit=0xa, drain=false]
[0x11fd500]: AMQP:FRAME:0 <- @disposition(21) [role=true, first=0x0, last=0x9, settled=true, state=@accepted(36) []]
on_sendable
my link credit is now 10
on_tracker_accept
Closing connection
on_tracker_settle
on_tracker_accept
on_tracker_settle
on_tracker_accept
on_tracker_settle
... (more) ...
on_tracker_accept
on_tracker_settle
[0x11fd500]: AMQP:FRAME:0 -> @close(24) []
[0x11fd500]: IO:FRAME: -> EOS
[0x11fd500]: AMQP:FRAME:0 <- @close(24) []
[0x11fd500]: IO:FRAME: <- EOS
on_connection_close
I'm not going to explain the AMQP protocol in this article, but a few interesting features are worth some attention.
• The conversation starts with SASL authentication, even though the server does not require credentials. This is, therefore, 'anonymous' authentication, as shown by the token mechanism=:ANONYMOUS.
• The client (sender) then sends the @open, @begin, and @attach frames. The tokens 'open', etc., are called 'performatives' in AMQP jargon. open initiates the communication, and exchanges capabilities and limits. begin opens a session -- a 'session' in AMQP terminology is a pair of unidirectional communication channels, one inbound and one outbound. TCP is, of course, full-duplex; but AMQP could conceivably be used with other communications infrastructure. attach creates a link within that session; here it names the target address foo. Note that the receiver responds with its own @open, @begin, and @attach, but the sender does not wait for each response before sending its next frame. This willingness to proceed without specific confirmation is called 'pipelining' in AMQP.
• The sender cannot proceed until the receiver has sent a @flow frame, setting out the amount of link credit it will allow. Link credit is usually specified in terms of messages, so the token link-credit=0xa means "you can send ten messages without checking with me again" (hexadecimal 0x0A = 10).
• As the sender proceeds to send messages, the Proton library generates a bunch of @transfer frames. These contain the actual message data. Note that the sender does not wait for any confirmation -- in fact, all ten of the message payloads in this example will probably be packed into a single TCP packet.
So when does the sender get an acknowledgement from the receiver?
• The receiver sends a bunch of @disposition frames, indicating what it has done with the messages. state=@accepted represents a positive acknowledgement -- the message was received and processed. Of course, what 'processed' amounts to depends on the application. A single frame can cover a range of deliveries: the one shown above has first=0x0 and last=0x9.
• The sender and receiver both send @close frames, to indicate that they are finished with the current connection. They might send @detach and @end as well, but the specification does not require this.
If you're programming with Proton, it's helpful to note how Proton's invocations of the application's callback handlers (like on_sendable()) are interleaved with the actual AMQP frames. In particular, these callbacks are not necessarily called exactly when the corresponding AMQP action takes place. Notice, for example, how a bunch of "accepted" disposition frames are received before the application's on_tracker_accept() method gets called for the first time.
Closing remarks
The AMQP specification does not make for easy reading and, if you're using high-level AMQP libraries like Qpid-JMS, the details might not be important. For low-level programming -- particularly when programming Proton server applications -- knowing the fine details really matters. Using Proton's built-in packet trace, along with at least an outline understanding of the AMQP protocol, is a good way to debug this kind of application.