aiomas.channel
¶
This module implements and asyncio asyncio.Protocol
protocol for a
request-reply Channel
.
-
aiomas.channel.
open_connection
(addr, *, loop=None, codec=None, extra_serializers=(), timeout=0, **kwds)[source]¶ Return a
Channel
connected to addr.This is a convenience wrapper for
asyncio.BaseEventLoop.create_connection()
,asyncio.BaseEventLoop.create_unix_connection()
, andaiomas.local_queue.create_connection()
.If addr is a tuple
(host, port)
, a TCP connection will be created. If addr is a string, it should be a path name pointing to the unix domain socket to connect to. If addr is aaiomas.local_queue
instance, a LocalQueue connection will be created.You can optionally provide the event loop to use.
By default, the
JSON
codec is used. You can override this by passing any subclass ofaiomas.codecs.Codec
as codec.You can also pass a list of extra_serializers for the codec. The list entires need to be callables that return a tuple with the arguments for
add_serializer()
.With a timeout of 0 (the default), there will only be one connection attempt before an error is raised (
ConnectionRefusedError
for TCP sockets and LocalQueue,FileNotFoundError
for Unix domain sockets). If you set timeout to a number > 0 orNone
, this function will try to connect repeatedly for at most that many seconds (or indefinitely) before an error is raised. Use this if you need to start the client before the server.The remaining keyword argumens kwds are forwarded to
asyncio.BaseEventLoop.create_connection()
andasyncio.BaseEventLoop.create_unix_connection()
respectively.This function is a coroutine.
-
aiomas.channel.
start_server
(addr, client_connected_cb, *, loop=None, codec=None, extra_serializers=(), **kwds)[source]¶ Start a server listening on addr and call client_connected_cb for every client connecting to it.
This function is a convenience wrapper for
asyncio.BaseEventLoop.create_server()
,asyncio.BaseEventLoop.create_unix_server()
, andaiomas.local_queue.create_server()
.If addr is a tuple
(host, port)
, a TCP socket will be created. If addr is a string, a unix domain socket at this path will be created. If addr is aaiomas.local_queue
instance, a LocalQueue server will be created.The single argument of the callable client_connected_cb is a new instance of
Channel
.You can optionally provide the event loop to use.
By default, the
JSON
codec is used. You can override this by passing any subclass ofaiomas.codecs.Codec
as codec.You can also pass a list of extra_serializers for the codec. The list entires need to be callables that return a tuple with the arguments for
add_serializer()
.The remaining keyword argumens kwds are forwarded to
asyncio.BaseEventLoop.create_server()
andasyncio.BaseEventLoop.create_unix_server()
respectively.This function is a coroutine.
-
class
aiomas.channel.
ChannelProtocol
(codec, client_connected_cb=None, *, loop)[source]¶ Asyncio
asyncio.Protocol
which connects the low level transport with the high levelChannel
API.The codec is used to (de)serialize messages. It should be a sub-class of
aiomas.codecs.Codec
.Optionally you can also pass a function/coroutine client_connected_cb that will be executed when a new connection is made (see
start_server()
).-
connection_made
(transport)[source]¶ Create a new
Channel
instance for a new connection.Also call the client_connected_cb if one was passed to this class.
-
connection_lost
(exc)[source]¶ Set a
ConnectionError
to theChannel
to indicate that the connection is closed.
-
data_received
(data)[source]¶ Buffer incomming data until we have a complete message and then pass it to
Channel
.Messages are fixed length. The first four bytes (in network byte order) encode the length of the following payload. The payload is a triple
(msg_type, msg_id, content)
encoded with the specified codec.
-
eof_received
()[source]¶ Set a
ConnectionResetError
to theChannel
.
-
write
(len_bytes, content)[source]¶ Serialize content and write the result to the transport.
This method is a coroutine.
-
-
class
aiomas.channel.
Request
(content, message_id, protocol)[source]¶ Represents a request returned by
Channel.recv()
. You shoudn’t instantiate it yourself.content contains the incoming message.
msg_id is the ID for that message. It is unique within a channel.
protocol is the channel’s
ChannelProtocol
instance that is used for writing back the reply.To reply to that request you can
yield from
Request.reply()
orRequest.fail()
.-
content
¶ The content of the incoming message.
-
fail
(exception)[source]¶ Indicate a failure described by the exception instance.
This will raise a
RemoteException
on the other side of the channel.This method is a coroutine.
-
-
class
aiomas.channel.
Channel
(protocol, codec, transport, loop)[source]¶ A Channel represents a request-reply channel between two endpoints. An instance of it is returned by
open_connection()
or is passed to the callback ofstart_server()
.protocol is an instance of
ChannelProtocol
.transport is an
asyncio.BaseTransport
.loop is an instance of an
asyncio.BaseEventLoop
.-
codec
¶ The codec used to de-/encode messages send via the channel.
-
transport
¶ The transport of this channel (see the Python documentation for details).
-
send
(content)[source]¶ Send a request content to the other end and return a future which is triggered when a reply arrives.
One of the following exceptions may be raised:
ValueError
if the message is too long (the length of the encoded message does not fit into a long, which is ~ 4 GiB).RemoteException
: The remote site raised an exception during the computation of the result.ConnectionError
(or its subclassConnectionResetError
): The connection was closed during the request.RuntimeError
:- If an invalid message type was received.
- If the future returned by this method was already triggered or canceled by a third party when an answer to the request arrives (e.g., if a task containing the future is cancelled). You get more detailed exception messages if you enable asyncio’s debug mode
try: result = yield from channel.request('ohai') except RemoteException as exc: print(exc)
-
recv
()[source]¶ Wait for an incoming
Request
and return it.May raise one of the following exceptions:
ConnectionError
(or its subclassConnectionResetError
): The connection was closed during the request.RuntimeError
: If two processes try to read from the same channel or if an invalid message type was received.
This method is a coroutine.
-
get_extra_info
(name, default=None)[source]¶ Wrapper for
asyncio.BaseTransport.get_extra_info()
.
-