Python Async Extension
Example: The Default Python Async Extension
import asyncio
from ten import AsyncExtension, AsyncTenEnv
class DefaultAsyncExtension(AsyncExtension):
async def on_configure(self, ten_env: AsyncTenEnv) -> None:
# Mock async operation, e.g. network, file I/O.
await asyncio.sleep(0.5)
async def on_init(self, ten_env: AsyncTenEnv) -> None:
# Mock async operation, e.g. network, file I/O.
await asyncio.sleep(0.5)
async def on_start(self, ten_env: AsyncTenEnv) -> None:
# Mock async operation, e.g. network, file I/O.
await asyncio.sleep(0.5)
async def on_deinit(self, ten_env: AsyncTenEnv) -> None:
# Mock async operation, e.g. network, file I/O.
await asyncio.sleep(0.5)
async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None:
cmd_json = cmd.to_json()
ten_env.log_debug(f"DefaultAsyncExtension on_cmd: {cmd_json}")
# Mock async operation, e.g. network, file I/O.
await asyncio.sleep(0.5)
# Send a new command to other extensions and wait for the result. The
# result will be returned to the original sender.
new_cmd = Cmd.create("hello")
cmd_result = await ten_env.send_cmd(new_cmd)
ten_env.return_result(cmd_result, cmd)FAQ
Aysnc loop for event handling
Conclusion
Last updated