The RPC layer¶
Remote procedure calls let you, as the name suggest, call functions or methods of remote objects via a network connection (nearly) like you would call local functions. This often leads to more readable code compared to using the lower level channels.
Basics¶
The basic idea behind RPC is as follows: You have a remote object with some methods. On the local side of the connection you have a proxy object which has the same signature, but when you call one of the proxy’s methods, it actually sends a message (method_name, args, kwargs) to the peer. The peer has a router that maps method_name to an actual method. It calls the method and sends its return value back to the proxy. The proxy method returns this value as if it was calculated locally. This works very similarly to how web-frameworks like Django resolve URLs and map them to views.
The following list briefly explains the most important components of aiomas RPC:
Service side:
- An RPC server: It starts a server socket and as a root object whose methods can be called by clients.
- An RPC service (or a hierarchy of services): RPC services are classes with methods that clients can call. Instead of classes with methods you can also use dicts with normal functions. Services can be nested to created hierarchies.
- RPC routers: Routers map function names (or paths) to actual methods. An class with an RPC service automatically creates a new router for each of its instances.
- Exposed methods: Methods/functions need to be explicitly exposed via a simple decorator. This is a security and safety measure which makes sure that clients can only call functions they are intended to.
Client side:
- An RPC client: It represents a network connection to an RPC server and provides a proxy object to its service.
- RPC proxies: Proxy objects represent the remote services. They resemble the signature of the services they represent and delegate method calls to them.
Here is a simple example that demonstrate how these components work together:
>>> import aiomas
>>>
>>>
>>> class MathServer:
... # The "Service" creates a router for each instance of "MathServer":
... router = aiomas.rpc.Service()
...
... # Exposed methods can be called by clients:
... @aiomas.expose
... def add(self, a, b):
... return a + b
...
>>>
>>> async def client():
... """Client coroutine: Call the server's "add()" method."""
... # Connect to the RPC server and get an "RpcClient":
... rpc_con = await aiomas.rpc.open_connection(('localhost', 5555))
... # "remote" is a Proxy to the remote service.
... # We cann call its "add()" method:
... rep = await rpc_con.remote.add(3, 4)
... print('What’s 3 + 4?', rep)
... await rpc_con.close()
>>>
>>> # Start the RPC server with an instance of the "MathServer" service:
>>> server = aiomas.run(aiomas.rpc.start_server(('localhost', 5555), MathServer()))
>>>
>>> aiomas.run(client())
What’s 3 + 4? 7
>>> server.close()
>>> aiomas.run(server.wait_closed())
Let’s discuss the details of what we just did:
The class MathServer
is going to be the root node of our RPC server.
Therefore, it needs to be marked as RPC service by giving it a router
attribute with an aiomas.rpc.Service
instance. Service
is
a descriptor; when you access the router attribute through the MathServer
class, you get the Service
instance, but when you access it via a
MathServer
instance, you get an aiomas.rpc.Router
instance
instead. The Service
descriptor makes sure that every instance of
MathServer
automatically gets its own Router
instance.
The add()
method is decorated with expose()
which makes
it available for RPC calls. The arguments and return values of exposed
functions must be serializable by the Codec
used.
Numbers, booleans, strings, lists and dicts should always work.
When we start our RPC server (via aiomas.rpc.start_server()
) we need to
pass an instance of our MathServer
class to it.
In the client, we create an RPC connection via
aiomas.rpc.open_connection()
. It returns an
aiomas.rpc.RpcClient
instance. We can get the proxy to the RPC root
node via its remote
attribute. In contrast to
normal method calls, we need to use the await
(or yield
from
) statement for remote method calls.
Using dictionaries with functions as RPC services¶
Sometimes, you don’t want or don’t need classes but plain Python functions. With aiomas you can put them in a dict and expose them as an RPC service, too. Here’s a rewrite of out math server example that we discussed in the last section:
>>> @aiomas.expose
... def add(a, b):
... return a + b
...
>>> math_service = aiomas.rpc.ServiceDict({
... 'add': add,
... })
>>>
>>> # Start the RPC server with the math service:
>>> server = aiomas.run(aiomas.rpc.start_server(('localhost', 5555), math_service))
>>>
>>> # The client stays the same as in our last example:
>>> aiomas.run(client())
What’s 3 + 4? 7
>>> server.close()
>>> aiomas.run(server.wait_closed())
You just need a dict mapping names to the respective functions and wrap it with
aiomas.rpc.ServiceDict
. You can then uses this to start an RPC
server.
How to build hierarchies of RPC services¶
When you want to expose a lot of functions, you may wish to group and
categorize them. You can do this by building hierarchies of RPC services (just
think of the RPC services as folders and the exposed methods as files, for
example). On the client side, you use the .
operator to access
a sub-service (e.g., root_service.sub_service.method()
).
When you build service hierarchies, you can freely mix class-based and dictionary-based services.
If the parent service is a dictionary, you can add sub services as a new
name: service_instance
pair:
>>> @aiomas.expose
... def add(a, b):
... return a + b
...
>>> # A Sub-service for addition
>>> adding_service = aiomas.rpc.ServiceDict({
... 'add': add,
... })
>>>
>>> # A Sub-service for subtraction
>>> class SubService:
... router = aiomas.rpc.Service()
...
... @aiomas.expose
... def sub(self, a, b):
... return a - b
...
>>> # Service dict with two sub-services:
>>> root_service = aiomas.rpc.ServiceDict({
... 'addition': adding_service, # Service dict
... 'subtraction': SubService(), # Instance(!) of service class
... })
>>>
>>> async def client():
... rpc_con = await aiomas.rpc.open_connection(('localhost', 5555))
... # Call the addition service:
... rep = await rpc_con.remote.addition.add(3, 4)
... print('What’s 3 + 4?', rep)
... # Call the subtraction service:
... rep = await rpc_con.remote.subtraction.sub(4, 3)
... print('What’s 4 - 3?', rep)
... await rpc_con.close()
>>>
>>> server = aiomas.run(aiomas.rpc.start_server(('localhost', 5555), root_service))
>>>
>>> aiomas.run(client())
What’s 3 + 4? 7
What’s 4 - 3? 1
>>> server.close()
>>> aiomas.run(server.wait_closed())
As you can see, this is very straight forward. Like a folder that can contain
sub-folders and files, a ServiceDict
can contain
sub-services and exposed functions.
Adding sub-services to a service class looks a little bit more complicated, but basically works the same:
>>> @aiomas.expose
... def add(a, b):
... return a + b
...
>>> # A Sub-service for addition
>>> adding_service = aiomas.rpc.ServiceDict({
... 'add': add,
... })
>>>
>>> # A Sub-service for subtraction
>>> class SubService:
... router = aiomas.rpc.Service()
...
... @aiomas.expose
... def sub(self, a, b):
... return a - b
...
>>> class RootService:
... # You first have to declare that instances of this class will have
... # the following sub-services:
... router = aiomas.rpc.Service(['addition', 'subtraction'])
...
... def __init__(self):
... # For each(!) instance, you have to add instances of the
... # declared sub-services:
... self.addition = adding_service
... self.subtraction = SubService()
>>>
>>>
>>> server = aiomas.run(aiomas.rpc.start_server(('localhost', 5555), RootService()))
>>>
>>> # The client remains the same
>>> aiomas.run(client())
What’s 3 + 4? 7
What’s 4 - 3? 1
>>> server.close()
>>> aiomas.run(server.wait_closed())
What makes adding sub-services to classes a bit more complicated is the fact
that classes define the service hierarchy but you use instances for the actual
RPC servers. That’s why you first need to declare at class level which
attributes will hold sub-services and then actually add these sub-services in
the class’ __init__()
.
You can also manually compose hierarchies with the router’s
add()
and
set_sub_router()
methods. These methods give you
a bit more flexibility to create service hierarchies on-the-fly:
>>> @aiomas.expose
... def add(a, b):
... return a + b
...
>>> # A Sub-service for addition
>>> adding_service = aiomas.rpc.ServiceDict({
... 'add': add,
... })
>>>
>>> # A Sub-service for subtraction
>>> class SubService:
... router = aiomas.rpc.Service()
...
... @aiomas.expose
... def sub(self, a, b):
... return a - b
...
>>> class RootService:
... # In contrast to the last example, we don't declare any sub-services:
... router = aiomas.rpc.Service()
...
... def __init__(self):
... # Add a sub-services via the router's "add()" method:
... self.addition = adding_service
... self.router.add('addition')
...
... # Add a sub-service via the router's "set_sub_router()" method:
... self.subtraction = SubService()
... self.router.set_sub_router(self.subtraction.router, 'subtraction')
>>>
>>>
>>> server = aiomas.run(aiomas.rpc.start_server(('localhost', 5555), RootService()))
>>>
>>> # The client remains the same
>>> aiomas.run(client())
What’s 3 + 4? 7
What’s 4 - 3? 1
>>> server.close()
>>> aiomas.run(server.wait_closed())
The method add()
looks the associated object has an
attribute with the specified name that holds the sub services. That service is
then exposed via the same name.
Using the method set_sub_router()
, you can set any
router as a sub-router and expose it via the specified name. This provides the
most flexibility for building service hierarchies.
Bi-directional RPC: How to allow callbacks from server to client¶
Aiomas supports bi-directional RPC. That means that not only can a client call server methods, but a server can also call client methods.
For uni-directional RPC, the server specifies an RPC services and a client gets a proxy to it when it makes a connection to the server. For bi-directional RPC, you also need to define a service for your client. The client can pass its service instance as argument of an RPC to the server. The server will then receive a proxy to that service, that it can use to make calls back to the client.
That works because objects with a router
attribute that is an RPC router
can be serialized and be sent to the peer where they get deserialized to an RPC
proxy object.
Let’s look at an example to see how it works. The first example uses class-based services:
>>> import aiomas
>>>
>>>
>>> class Client:
... # The client needs to be marked as RPC service:
... router = aiomas.rpc.Service()
...
... def __init__(self, name):
... self.name = name
...
... async def run(self):
... # When we open a connection, we need to pass the service instance
... # ("self" in this case) so that a background task for it can be
... # started:
... rpc_con = await aiomas.rpc.open_connection(('localhost', 5555),
... rpc_service=self)
...
... # We can now pass the service to the server when we call one of its
... # methods:
... rep = await rpc_con.remote.server_method(self)
... print('Server reply:', rep)
...
... await rpc_con.close()
...
... # This method is exposed to the server:
... @aiomas.expose
... def get_client_name(self):
... return self.name
>>>
>>>
>>> class Server:
... router = aiomas.rpc.Service()
...
... @aiomas.expose
... async def server_method(self, client_proxy):
... # When a client passes a reference to its service, we'll receive it as
... # a proxy object which we can use to call a client method:
... client_name = await client_proxy.get_client_name()
... return 'Client name is "%s"' % client_name
>>>
>>>
>>> server = aiomas.run(aiomas.rpc.start_server(('localhost', 5555), Server()))
>>>
>>> aiomas.run(Client('Monty').run())
Server reply: Client name is "Monty"
>>>
>>> server.close()
>>> aiomas.run(server.wait_closed())
Bi-directional RPC works with class-based as well as dict-based services. Furthermore, if your server or client provide a hierarchy of services, you can not only pass the root service but also any of its sub-services as function arguments.
How to handle remote exceptions¶
If an RPC raises an error, aiomas wraps it with
a RemoteException
and forwards it to the caller. It
also provides you the source (peer name) of the exception and its original
traceback:
>>> @aiomas.expose
... def fail_badly():
... raise ValueError('"spam" is not a number')
>>>
>>> service = aiomas.rpc.ServiceDict({'fail_badly': fail_badly})
>>>
>>> async def client():
... rpc_con = await aiomas.rpc.open_connection(('127.0.0.1', 5555))
... try:
... await rpc_con.remote.fail_badly()
... except aiomas.RemoteException as exc:
... print('Origin:', exc.origin)
... print('Traceback:', exc.remote_traceback)
>>>
>>> server = aiomas.run(aiomas.rpc.start_server(('127.0.0.1', 5555), service))
>>>
>>> aiomas.run(client())
Origin: ('127.0.0.1', 5555)
Traceback: Traceback (most recent call last):
...
ValueError: "spam" is not a number
>>> server.close()
>>> aiomas.run(server.wait_closed())
It is currently not possible to forward the original exception instance, because the caller might not have the required code available (However, I won’t rule out the possibility that I might eventually implement this).
How to get a list of connected clients¶
An RPC service on the server side does not know if or when a new client
connects. However, you can pass a client connected callback to
aiomas.rpc.start_server()
that cats called once for each new
connection. Its only argument is the RpcClient
for that
connection. You can uses this, for example, to close the connection with the
client or call the client’s exposed methods (if there are some).
>>> service = aiomas.rpc.ServiceDict({})
>>>
>>> async def client():
... rpc_con = await aiomas.rpc.open_connection(('127.0.0.1', 5555))
... await rpc_con.close()
>>>
>>> def client_connected_cb(rpc_client):
... print('Client connected:', rpc_client)
>>>
>>> server = aiomas.run(aiomas.rpc.start_server(('127.0.0.1', 5555), service,
... client_connected_cb))
>>>
>>> aiomas.run(client())
Client connected: <aiomas.rpc.RpcClient object at 0x...>
>>> server.close()
>>> aiomas.run(server.wait_closed())
How to handle connection losses¶
For many reasons, the connection between two endpoints can be lost at any time.
If you are in a coroutine and actively doing RPC, you will get
a ConnectionResetError
thrown into your coroutine if the connection
drops:
>>> import aiomas
>>>
>>>
>>> async def client():
... rpc_con = await aiomas.rpc.open_connection(('localhost', 5555))
... # The server will close the connection when we make the following call:
... try:
... await rpc_con.remote.close_connection()
... except ConnectionResetError:
... print('Connection lost :(')
>>>
>>>
>>> class Server:
... router = aiomas.rpc.Service()
...
... def __init__(self):
... self.clients = []
...
... def client_connected(self, client):
... """*Client connected cb.* that adds new clients to ``self.clients``"""
... self.clients.append(client)
...
... @aiomas.expose
... async def close_connection(self):
... """Close all open connections and remove them from ``self.clients``."""
... while self.clients:
... client = self.clients.pop()
... await client.close()
>>>
>>> server_service = Server()
>>> server = aiomas.run(aiomas.rpc.start_server(('localhost', 5555),
... server_service,
... server_service.client_connected))
>>>
>>> aiomas.run(client())
Connection lost :(
>>>
>>> server.close()
>>> aiomas.run(server.wait_closed())
If you only serve an RPC service, it gets a little bit more complicated,
because RPC services are not connection-aware. However,
aiomas.rpc.RpcClient.on_connection_reset()
lets you register a callback
that gets called when the connection is lost. (You get an instance of
RpcClient
as return value from
open_connection()
or via
start_server()
‘s client connected callback.)
In the following example, the server again has a list of connected clients. But this time, the client disconnects and the server removes the closed connection from its list of clients:
>>> import aiomas
>>>
>>>
>>> async def client():
... rpc_con = await aiomas.rpc.open_connection(('localhost', 5555))
... await rpc_con.close()
>>>
>>>
>>> class Server:
... router = aiomas.rpc.Service()
...
... def __init__(self):
... self.clients = []
...
... def client_connected(self, client):
... # Register a callback that removes the client from our list
... # when it disconnects:
... def remove_client(exc):
... print('Client disconnected :(')
... self.clients.remove(client)
...
... client.on_connection_reset(remove_client)
... print('Client connected :)')
... self.clients.append(client)
>>>
>>> server_service = Server()
>>> server = aiomas.run(aiomas.rpc.start_server(('localhost', 5555),
... server_service,
... server_service.client_connected))
>>>
>>> aiomas.run(client())
Client connected :)
Client disconnected :(
>>>
>>> server.close()
>>> aiomas.run(server.wait_closed())