Skip to main content

Aiohttp

An aiohttp application exposing an OpenRPC server over both HTTP and WebSocket.

import asyncio

from aiohttp import web, WSMessage
from openrpc import RPCServer

# Shared RPC server instance: methods are registered on it with the
# ``@rpc.method()`` decorator, and both the WebSocket and the HTTP handler
# below hand incoming requests to it.
rpc = RPCServer(title="TransportDemo", version="1.0.0")


# Demo RPC method registered on the shared server: returns the sum of the
# two integer arguments.
@rpc.method()
def add(a: int, b: int) -> int:
    total = a + b
    return total


# Strong references to in-flight WebSocket request tasks. The event loop only
# keeps weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes; each task removes itself when done.
_background_tasks: "set[asyncio.Task[None]]" = set()


async def ws_process_rpc(request: web.BaseRequest) -> web.WebSocketResponse:
    """Process RPC requests through websocket.

    Upgrades the incoming request to a WebSocket connection and hands every
    received message to the RPC server in its own task, so a slow method does
    not hold up later messages on the same connection.

    Args:
        request: The HTTP request to upgrade to a WebSocket connection.

    Returns:
        The WebSocket response, once the message loop has ended.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async def _process_rpc(rpc_request: str) -> None:
        """Run one RPC request and send back the result, if there is one."""
        json_rpc_response = await rpc.process_request_async(rpc_request)
        if json_rpc_response is not None:
            await ws.send_str(json_rpc_response)

    msg: WSMessage
    async for msg in ws:
        task = asyncio.create_task(_process_rpc(msg.data))
        # Keep the task alive until it completes, then drop the reference.
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)

    return ws


async def http_process_rpc(request: web.Request) -> web.Response:
    """Process RPC request through HTTP server.

    Args:
        request: HTTP request whose body is passed to the RPC server as text.

    Returns:
        A response carrying the RPC server's output, or an empty response
        when the server produced nothing to send back.
    """
    request_body = await request.text()
    response = await rpc.process_request_async(request_body)
    if response is None:
        return web.Response()
    # The payload is a JSON-RPC document, so label it as JSON instead of the
    # default ``text/plain`` that ``text=`` alone would produce.
    return web.Response(text=response, content_type="application/json")


def main() -> None:
    """Build the web application, register both RPC transports, and run it."""
    endpoint = "/api/v1"
    # Same path for both transports: GET goes to the WebSocket handler,
    # POST goes to the plain HTTP handler.
    handlers = (
        ("GET", ws_process_rpc),
        ("POST", http_process_rpc),
    )

    application = web.Application()
    for verb, handler in handlers:
        application.router.add_route(verb, endpoint, handler)
    web.run_app(application)


# Run the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()