The channel layer
The channel layer is aiomas’ lowest layer of abstraction. It lets you send and receive complete messages. In contrast to asyncio’s built-in stream protocol, which just sends byte strings, messages are JSON-encoded [*] data (which is a lot more convenient).
[*] | Actually, whether JSON is used for encoding depends on the codec that the channel uses. JSON is the default, but you can also use MsgPack or something else. At the bottom of this document, there’s a section explaining aiomas’ message format in detail. |
Here is a minimal example that shows how the Channel can be used:
>>> import aiomas
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(('localhost', 5555))
...     rep = await channel.send('ohai')
...     print(rep)
...     await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply('cya')
...     await channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client))
>>> aiomas.run(client())
ohai
cya
>>> server.close()
>>> aiomas.run(server.wait_closed())
A communication channel has two sides: The client side is created and returned by open_connection(). For each client connection, the server creates a Channel instance and starts a new background task of the client connected callback (client_connected_cb), to which it passes that channel instance.
Both the client and the server side can send and receive messages. In the example above, the client starts by sending a request and the server side waits for incoming requests. A request has a content attribute which holds the actual message. To send a reply, you can use either Request.reply() or Request.fail(). Channel.send() and Request.reply() take any data that the channel’s codec can serialize (e.g., strings, numbers, lists, dicts, ...). Request.fail() takes an exception instance which is raised at the requesting side as RemoteException, as the following example demonstrates:
>>> import aiomas
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(('localhost', 5555))
...     try:
...         rep = await channel.send('ohai')
...         print(rep)
...     except aiomas.RemoteException as e:
...         print('Got an error:', str(e))
...     finally:
...         await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.fail(ValueError(42))
...     await channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('127.0.0.1', 5555), handle_client))
>>> aiomas.run(client())
ohai
Got an error: Origin: ('127.0.0.1', 5555)
ValueError: 42
>>> server.close()
>>> aiomas.run(server.wait_closed())
These are the basics of the channel layer. The following sections answer some more detailed questions.
How can I use another codec?
To use a codec other than the default JSON one, just pass the corresponding codec class (e.g., MsgPack) to open_connection() and start_server():
>>> import aiomas
>>>
>>> CODEC = aiomas.codecs.MsgPack
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(('localhost', 5555),
...                                                    codec=CODEC)
...     rep = await channel.send('ohai')
...     print(rep)
...     await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply('cya')
...     await channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client,
...                                                 codec=CODEC))
>>> aiomas.run(client())
ohai
cya
>>> server.close()
>>> aiomas.run(server.wait_closed())
Note that the codecs aiomas.codecs.MsgPack and aiomas.codecs.MsgPackBlosc are not available by default but have to be explicitly enabled.
How can I serialize custom data types?
Both open_connection() and start_server() take a list of extra_serializers. Such a serializer is basically a function returning a three-tuple (type, serialize, deserialize). You can find more details in the codecs guide. Here is just a simple example:
>>> import aiomas
>>>
>>>
>>> class MyType:
...     """Our serializable type."""
...     def __init__(self, value):
...         self.value = value
...
...     def __repr__(self):
...         return '%s(%r)' % (self.__class__.__name__, self.value)
>>>
>>>
>>> def serialize_mytype(obj):
...     """Return a JSON serializable version of "MyType" instances."""
...     return obj.value
>>>
>>>
>>> def deserialize_mytype(value):
...     """Make a "MyType" instance from *value*."""
...     return MyType(value)
>>>
>>>
>>> def mytype_serializer():
...     return (MyType, serialize_mytype, deserialize_mytype)
>>>
>>>
>>> EXTRA_SERIALIZERS = [mytype_serializer]
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(
...         ('localhost', 5555), extra_serializers=EXTRA_SERIALIZERS)
...     rep = await channel.send(['ohai', MyType(42)])
...     print(rep)
...     await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply(MyType('cya'))
...     await channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client,
...                                                 extra_serializers=EXTRA_SERIALIZERS))
>>> aiomas.run(client())
['ohai', MyType(42)]
MyType('cya')
>>> server.close()
>>> aiomas.run(server.wait_closed())
A shorter version for common cases is using the aiomas.codecs.serializable() decorator:
>>> import aiomas
>>>
>>>
>>> @aiomas.codecs.serializable
... class MyType:
...     """Our serializable type."""
...     def __init__(self, value):
...         self.value = value
>>>
>>>
>>> EXTRA_SERIALIZERS = [MyType.__serializer__]
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(
...         ('localhost', 5555), extra_serializers=EXTRA_SERIALIZERS)
...     rep = await channel.send(['ohai', MyType(42)])
...     print(rep)
...     await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply(MyType('cya'))
...     await channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client,
...                                                 extra_serializers=EXTRA_SERIALIZERS))
>>> aiomas.run(client())
['ohai', MyType(value=42)]
MyType(value='cya')
>>> server.close()
>>> aiomas.run(server.wait_closed())
How can I bind a server socket to a random port?
You cannot ask your OS for an available port but have to try a randomly chosen port until you succeed:
>>> import errno
>>> import random
>>> import aiomas
>>>
>>> max_tries = 100
>>> port_range = (49152, 65536)
>>>
>>> async def random_server(host, port_range, max_tries):
...     for i in range(max_tries):
...         try:
...             port = random.randrange(*port_range)
...             server = await aiomas.channel.start_server(
...                 (host, port), handle_client)
...         except OSError as oe:
...             if oe.errno != errno.EADDRINUSE:
...                 # Re-raise anything but "address already in use"
...                 # (the numeric errno value differs between platforms)
...                 raise
...         else:
...             return server, port
...     raise RuntimeError('Could not bind server to a random port.')
>>>
>>> server, port = aiomas.run(random_server('localhost', port_range, max_tries))
>>> server.close()
>>> aiomas.run(server.wait_closed())
Connection timeouts / Starting clients before the server
Sometimes, you need to start a client before the server is started. Therefore, the function open_connection() lets you specify a timeout. It repeatedly retries to connect until timeout seconds have passed. By default, timeout is 0, which means there is only one try.
>>> import asyncio
>>> import aiomas
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     # Try to connect for 1s:
...     channel = await aiomas.channel.open_connection(('localhost', 5555),
...                                                    timeout=1)
...     rep = await channel.send('ohai')
...     print(rep)
...     await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply('cya')
...     await channel.close()
>>>
>>>
>>> # Start the client in background, ...
>>> # ("asyncio.async()" is a syntax error since Python 3.7)
>>> t_client = asyncio.ensure_future(client())
>>> # wait 0.5 seconds, ...
>>> aiomas.run(asyncio.sleep(0.5))
>>> # and finally start the server:
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client))
>>> aiomas.run(t_client)
ohai
cya
>>> server.close()
>>> aiomas.run(server.wait_closed())
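The retry behaviour described in this section can be pictured with a small, library-independent sketch. It is not aiomas' actual implementation: the helper name connect_with_retry, its connect argument (any coroutine function that opens a connection) and the interval parameter are assumptions made for illustration. With timeout=0 the loop makes exactly one attempt, which matches the default described above.

```python
import asyncio


async def connect_with_retry(connect, timeout=0, interval=0.1):
    """Await ``connect()`` until it succeeds or *timeout* seconds have passed.

    *connect* is a hypothetical coroutine function that opens a connection
    and raises ``OSError`` (e.g. ``ConnectionRefusedError``) on failure.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        try:
            return await connect()
        except OSError:
            remaining = deadline - loop.time()
            if remaining <= 0:
                # Time is up: propagate the last connection error.
                raise
            # Wait a bit before the next attempt, but not past the deadline.
            await asyncio.sleep(min(interval, remaining))
```

With plain asyncio streams, such a helper could be called as connect_with_retry(lambda: asyncio.open_connection('localhost', 5555), timeout=1).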
What exactly do messages look like?
This section explains how aiomas messages look and how they are constructed. You can easily implement this protocol in other languages, too, and write programs that can communicate with aiomas.
Network messages consist of a four-byte header and a payload of arbitrary length. The header is an unsigned integer (uint32) in network byte order (big-endian) and stores the number of bytes in the payload. The payload itself is an encoded [†] list containing the message type, a message ID and the actual content.
[†] | Depending on the codec you use, the payload may be a UTF-8 encoded JSON string (json.dumps().encode('utf-8')) (this is the default), a MsgPack list (msgpack.packb()), or whatever else the codec produces. |
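The framing described above (uint32 length header followed by the encoded list) can be sketched with the standard library alone. This is a minimal illustration for the default JSON case, not aiomas' own code; the names HEADER, encode_message and decode_message are made up for this example.

```python
import json
import struct

# Four-byte unsigned integer in network byte order (big-endian).
HEADER = struct.Struct('!I')


def encode_message(msg_type, msg_id, content):
    """Return the bytes for one message: length header plus JSON payload."""
    payload = json.dumps([msg_type, msg_id, content]).encode('utf-8')
    return HEADER.pack(len(payload)) + payload


def decode_message(data):
    """Try to split one complete message off the front of *data*.

    Return ``((msg_type, msg_id, content), rest)``, or ``(None, data)`` if
    *data* does not yet contain a complete message.
    """
    if len(data) < HEADER.size:
        return None, data
    (length,) = HEADER.unpack_from(data)
    end = HEADER.size + length
    if len(data) < end:
        return None, data
    msg_type, msg_id, content = json.loads(data[HEADER.size:end].decode('utf-8'))
    return (msg_type, msg_id, content), data[end:]
```

Returning the unconsumed rest makes it straightforward to feed the function from a receive buffer that may hold a partial message or several messages at once.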
Messages sent between two peers must follow the request-reply pattern. That means every request that one peer makes must be answered by the other peer. Requests use the message type 0, replies use 1 for success or 2 to indicate a failure. The message ID is an integer that is unique for every request that a network socket makes. Replies (no matter if successful or failed) need to use the message ID of the corresponding request.
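The bookkeeping that this request-reply pattern implies can be sketched as follows. The class PendingRequests and the constant names are hypothetical, and counting message IDs up from 0 is an assumption; the text only requires that IDs are unique per socket.

```python
import itertools

# Message types as described in the text.
REQUEST, SUCCESS, FAILURE = 0, 1, 2


class PendingRequests:
    """Track outgoing requests and match incoming replies by message ID."""

    def __init__(self):
        self._ids = itertools.count()  # assumption: IDs simply count up
        self._pending = {}

    def new_request(self, content):
        """Return a request message ``[type, id, content]`` with a fresh ID."""
        msg_id = next(self._ids)
        self._pending[msg_id] = content
        return [REQUEST, msg_id, content]

    def handle_reply(self, message):
        """Match a reply to its request.

        Return ``(request_content, ok, reply_content)``.  Raise ``KeyError``
        if no request with the reply's message ID is waiting.
        """
        msg_type, msg_id, content = message
        if msg_type not in (SUCCESS, FAILURE):
            raise ValueError('Not a reply: %r' % (message,))
        request_content = self._pending.pop(msg_id)
        return request_content, msg_type == SUCCESS, content
```

Because replies carry the ID of their request, they may arrive in any order and are still attributed correctly.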
On the channel layer, the content of a request can be anything. On the RPC level, it is a three-tuple (function_path, args, kwargs), e.g.:
[function, [arg0, arg1, ...], {kwarg0: val0, kwarg1: val1}]
Thereby, function is always a string containing the name of an exposed function; if you use nested services, sub-services and the function names are separated by slashes (/) as in URLs. The types of the arguments and keyword arguments may vary depending on the function.
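To make the RPC content format concrete, here is a small sketch that builds such a three-element list and resolves a slash-separated path. Modelling services as nested dicts is purely an assumption for illustration and is not how aiomas' RPC layer is implemented; the names make_rpc_content, resolve and dispatch are likewise hypothetical.

```python
def make_rpc_content(function_path, *args, **kwargs):
    """Build the request content ``[function_path, args, kwargs]``."""
    return [function_path, list(args), kwargs]


def resolve(services, function_path):
    """Follow a slash-separated path through nested dicts of services."""
    obj = services
    for name in function_path.split('/'):
        obj = obj[name]
    return obj


def dispatch(services, content):
    """Call the function named in *content* with its (keyword) arguments."""
    function_path, args, kwargs = content
    func = resolve(services, function_path)
    return func(*args, **kwargs)
```

For instance, with services = {'math': {'add': lambda a, b=0: a + b}}, the content make_rpc_content('math/add', 1, b=2) dispatches to the nested add function.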
The content types of replies are the same for both the channel layer and the RPC layer. Normal (successful) replies can be anything. The content of a failure reply is a string with the error message and/or a stack trace.