aiomas.local_queue
The local queue transport roughly mimics a normal TCP transport, but it sends and receives messages via two asyncio.Queue instances.
Its purpose is to aid the development and debugging of complex networking algorithms and distributed or multi-agent systems. In contrast to normal network transports, messages sent via the LocalQueueTransport will always arrive in a deterministic order [1].
This transport does not work across multiple processes and is not thread safe, so it should only be used within a single thread and process.
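The two-queue design described above can be pictured with plain asyncio. The following is only a minimal sketch of the idea (one queue per direction, everything in a single thread and event loop); it does not use aiomas, and the names echo_server and run_demo are made up for this illustration.

```python
import asyncio


async def echo_server(recvq, sendq):
    """Read messages from recvq and send them back upper-cased via sendq."""
    while True:
        msg = await recvq.get()
        if msg is None:  # sentinel (an assumption of this sketch): client is done
            break
        await sendq.put(msg.upper())


async def run_demo():
    # One queue per direction, both living in the same event loop.
    client_to_server = asyncio.Queue()
    server_to_client = asyncio.Queue()

    server = asyncio.get_running_loop().create_task(
        echo_server(client_to_server, server_to_client)
    )

    for msg in ["a", "b", "c"]:
        await client_to_server.put(msg)
    await client_to_server.put(None)
    await server  # wait until the server has handled every message

    # Collect the replies in the order in which they were queued.
    return [server_to_client.get_nowait() for _ in range(server_to_client.qsize())]


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Because asyncio.Queue is a FIFO queue and nothing runs outside the event loop thread, the replies come back in the order the requests were sent.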
The easiest way to use the transport is to create a LocalQueue instance via the get_queue() function and pass it to aiomas.channel.start_server() / aiomas.channel.open_connection() or aiomas.agent.Container.create() as the addr argument.
[1] Actually, messages sent via a single TCP connection also arrive in a deterministic order (this is a property of the TCP/IP protocol), so the LocalQueue transport won’t give you any benefit in this case. However, if you have multiple connections to the same server and send messages through them in parallel, the order in which the messages from the different connections arrive is no longer deterministic. In this case, the LocalQueue transport can help you.
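The multi-connection case from the footnote can be illustrated without any networking. The sketch below is an assumption-laden stand-in, not aiomas code: three hypothetical clients push messages into one shared asyncio.Queue from concurrent tasks. Since all tasks are scheduled cooperatively on a single thread, repeating the run yields the same interleaving, which is the kind of reproducibility the local queue transport aims for.

```python
import asyncio


async def client(name, inbox, count):
    """Hypothetical client: send `count` messages, yielding after each one."""
    for i in range(count):
        await inbox.put((name, i))
        await asyncio.sleep(0)  # give the other clients a chance to run


async def run_once():
    inbox = asyncio.Queue()
    # Start three clients "in parallel" within one event loop.
    await asyncio.gather(*(client(name, inbox, 3) for name in ("a", "b", "c")))
    # Return the messages in arrival order.
    return [inbox.get_nowait() for _ in range(inbox.qsize())]


if __name__ == "__main__":
    first = asyncio.run(run_once())
    second = asyncio.run(run_once())
    print(first)
    print("same order on both runs:", first == second)
```

With real TCP connections, the arrival order of messages from different sockets would depend on operating system and network timing and could differ between runs.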
-
aiomas.local_queue.get_queue(queue_id)
Return a LocalQueue instance for the given queue_id. If no instance is cached yet, create a new one.
Queue IDs must be strings and must not contain the / character. Raise a ValueError if these rules are violated.
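The caching and validation rules above can be sketched as follows. This is not the aiomas implementation; QueueDescriptionSketch, get_queue_sketch and the module-level _QUEUE_CACHE dictionary are hypothetical names that only mirror the documented behavior.

```python
# Hypothetical module-level cache mapping queue IDs to queue descriptions.
_QUEUE_CACHE = {}


class QueueDescriptionSketch:
    """Hypothetical stand-in for LocalQueue that only stores its ID."""

    def __init__(self, queue_id):
        self.queue_id = queue_id


def get_queue_sketch(queue_id):
    """Return the cached instance for queue_id, creating it on first use."""
    if not isinstance(queue_id, str):
        raise ValueError(f"Queue ID must be a string, got {queue_id!r}")
    if "/" in queue_id:
        raise ValueError(f'Queue ID must not contain "/": {queue_id!r}')
    if queue_id not in _QUEUE_CACHE:
        _QUEUE_CACHE[queue_id] = QueueDescriptionSketch(queue_id)
    return _QUEUE_CACHE[queue_id]


if __name__ == "__main__":
    a = get_queue_sketch("agents")
    b = get_queue_sketch("agents")
    print(a is b)  # both calls return the same cached instance
```

Returning the same object for the same ID is what would let a server and its clients, which may only share the ID string, end up with the same queue description.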
-
aiomas.local_queue.create_connection(protocol_factory, lq, *, loop=None, **kwds)
Connect to a LocalQueue lq and return a (transport, protocol) pair.
The protocol_factory must be a callable returning a protocol instance.
Before a connection to lq can be made, a server must be started for this instance (see create_server()).
-
aiomas.local_queue.create_server(protocol_factory, lq, **kwds)
Create a LocalQueue server bound to lq.
The protocol_factory must be a callable returning a protocol instance.
Return a LocalQueueServer instance. That instance is also set as server for lq.
-
class aiomas.local_queue.LocalQueue(queue_id)
An instance of this class serves as transport description when creating a server or connection.
The functions create_server() and create_connection() both require an instance of this class. Alternatively, instances of this class can be passed as the addr argument to aiomas.channel.start_server() and aiomas.channel.open_connection().
A server needs to be started before any connections can be made.
-
queue_id
The queue’s ID.
-
server
The LocalQueueServer instance that was bound to this instance or None if no server has yet been started.
-
set_server(server)
Set a LocalQueueServer instance.
Raise a RuntimeError if a server has already been bound to this instance.
This method is called by create_server().
-
unset_server()
Unset the server from this instance.
This method is called when the server is closed (see LocalQueueServer.close()).
-
new_connection(sendq, recvq, loop=None)
Create a connection endpoint on the server side.
This method is called by create_connection().
sendq and recvq are the queues used for sending and receiving messages to and from the client.
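The server binding rules of set_server() and unset_server() can be summarized in a small sketch. The classes and the create_server_sketch helper below are hypothetical and only model the documented life cycle (bind once, RuntimeError on a second bind, unbind on close); they are not the aiomas classes.

```python
class QueueSketch:
    """Hypothetical stand-in for LocalQueue that tracks its bound server."""

    def __init__(self, queue_id):
        self.queue_id = queue_id
        self.server = None  # None until a server has been started

    def set_server(self, server):
        if self.server is not None:
            raise RuntimeError(f"Queue {self.queue_id!r} already has a server")
        self.server = server

    def unset_server(self):
        self.server = None


class ServerSketch:
    """Hypothetical stand-in for LocalQueueServer."""

    def __init__(self, lq):
        self.lq = lq

    def close(self):
        # Closing the server releases the queue so it can be bound again.
        self.lq.unset_server()


def create_server_sketch(lq):
    """Create a server and bind it to lq, as create_server() is described to do."""
    server = ServerSketch(lq)
    lq.set_server(server)
    return server


if __name__ == "__main__":
    lq = QueueSketch("demo")
    server = create_server_sketch(lq)
    print(lq.server is server)
    server.close()
    print(lq.server is None)
```

Under this model, a queue ID can be reused for a new server only after the previous server has been closed.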
-
class aiomas.local_queue.LocalQueueServer(protocol_factory, lq)
Implements asyncio.events.AbstractServer. An instance of this class is returned by create_server().
lq is the LocalQueue instance that this server was bound to.
protocol_factory is a callable that is called for each new client connection in order to create a new protocol instance.
-
lq
The LocalQueue the server is bound to.
-
new_connection(sendq, recvq, loop)
Create a new protocol and transport instance.
Call the protocol factory, create a new LocalQueueTransport with sendq and recvq and wire them together.
Called by create_connection() via LocalQueue.new_connection().
-
close()
Close the server and unset this instance from the associated LocalQueue instance.
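The wiring step performed by new_connection() (call the protocol factory, create a transport around sendq and recvq, connect the two) might look roughly like the sketch below. It is an assumption of how such wiring can be done with asyncio primitives, not the aiomas source: QueueTransportSketch, Collector and new_connection_sketch are hypothetical names, and the same helper is used here for both endpoints purely to keep the demonstration short.

```python
import asyncio


class QueueTransportSketch(asyncio.Transport):
    """Hypothetical transport: write() feeds sendq, a task reads from recvq."""

    def __init__(self, sendq, recvq, protocol):
        super().__init__()
        self._sendq = sendq
        self._protocol = protocol
        # Background task that hands incoming messages to the protocol.
        self._reader = asyncio.get_running_loop().create_task(self._read(recvq))
        protocol.connection_made(self)

    def write(self, data):
        self._sendq.put_nowait(data)

    async def _read(self, recvq):
        while True:
            self._protocol.data_received(await recvq.get())

    def close(self):
        self._reader.cancel()


class Collector(asyncio.Protocol):
    """Hypothetical protocol that records everything it receives."""

    def __init__(self):
        self.transport = None
        self.received = []

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.received.append(data)


def new_connection_sketch(protocol_factory, sendq, recvq):
    """Create a protocol and a transport and wire them together."""
    protocol = protocol_factory()
    transport = QueueTransportSketch(sendq, recvq, protocol)
    return transport, protocol


async def demo():
    a_to_b, b_to_a = asyncio.Queue(), asyncio.Queue()
    # Each side sends on one queue and receives on the other.
    client_t, client_p = new_connection_sketch(Collector, a_to_b, b_to_a)
    server_t, server_p = new_connection_sketch(Collector, b_to_a, a_to_b)
    client_t.write(b"ping")
    server_t.write(b"pong")
    while not (client_p.received and server_p.received):
        await asyncio.sleep(0)  # let the reader tasks deliver the messages
    client_t.close()
    server_t.close()
    return client_p.received, server_p.received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The key point is that the send queue of one endpoint is the receive queue of the other, so a write on one side surfaces as data_received() on the other.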
-
class aiomas.local_queue.LocalQueueTransport(lq, sendq, recvq, protocol, loop)
Implements asyncio.transports.Transport.
A LocalQueueTransport has two asynchronous queues (instances of asyncio.Queue): one for sending messages to the other side and one for receiving messages from it.
-
close()
Close the transport.
Buffered data will be flushed asynchronously. No more data will be received. After all buffered data is flushed, the protocol’s connection_lost() method will (eventually) be called with None as its argument.
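The close() contract described above (return immediately, flush buffered data in the background, then call connection_lost(None)) can be sketched with plain asyncio. This is a hypothetical model under simplifying assumptions, not the aiomas transport: ClosingTransportSketch, Recorder and the _CLOSE sentinel are invented for the example, and the receiving direction is left out entirely.

```python
import asyncio

_CLOSE = object()  # hypothetical sentinel that marks the end of the buffer


class ClosingTransportSketch(asyncio.Transport):
    """Hypothetical transport with an internal buffer drained by a task."""

    def __init__(self, sendq, protocol):
        super().__init__()
        self._sendq = sendq
        self._protocol = protocol
        self._buffer = asyncio.Queue()
        self._closing = False
        self._drainer = asyncio.get_running_loop().create_task(self._drain())

    def write(self, data):
        if self._closing:
            raise RuntimeError("Transport is closing")
        self._buffer.put_nowait(data)

    def is_closing(self):
        return self._closing

    def close(self):
        if not self._closing:
            self._closing = True
            self._buffer.put_nowait(_CLOSE)  # flushed after all pending data

    async def _drain(self):
        while True:
            item = await self._buffer.get()
            if item is _CLOSE:
                break
            await self._sendq.put(item)
        # Everything has been flushed: notify the protocol.
        self._protocol.connection_lost(None)


class Recorder(asyncio.Protocol):
    """Hypothetical protocol exposing a future resolved by connection_lost()."""

    def __init__(self):
        self.closed = asyncio.get_running_loop().create_future()

    def connection_lost(self, exc):
        self.closed.set_result(exc)


async def demo():
    sendq = asyncio.Queue()
    protocol = Recorder()
    transport = ClosingTransportSketch(sendq, protocol)
    transport.write(b"one")
    transport.write(b"two")
    transport.close()  # returns at once; flushing happens later
    flushed_at_close = sendq.qsize()
    exc = await protocol.closed
    delivered = [sendq.get_nowait() for _ in range(sendq.qsize())]
    return flushed_at_close, exc, delivered


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model nothing has reached the peer queue at the moment close() returns; both messages arrive before connection_lost() is invoked with None.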