Source code for story_graph.engine

"""
Engine
======

"""

import asyncio
import logging
import random
import time
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Any, AsyncGenerator, Dict, Optional, Union

import requests
from asgiref.sync import sync_to_async

from stream.frontend_types import Button, Checkbox, Dialog, Input, Text
from stream.models import Stream, StreamInstruction, StreamVariable

from .markdown_parser import md_to_ssml
from .models import AudioCell, CellType, Graph, Node, NodeDoor, ScriptCell

log = logging.getLogger(__name__)


[docs]class ScriptCellTimeout(Exception): pass
[docs]class GraphDeadEnd(Exception): pass
[docs]class InvalidPythonCode(Exception): pass
[docs]class Engine: """An engine executes a :class:`~story_graph.models.Graph` for a given :class:`~stream.models.StreamPoint`. Executing means to iterate over the :class:`~story_graph.models.Node` and executing each :class:`~story_graph.models.ScriptCell` within such a node. The engine runs in an async manner so it is possible to do awaits without blocking the server, which means execution is halted until a specific condition is met. :param graph: The graph to execute :param stream: The stream where the graph should be executed on :param raise_exceptions: Decides if an exception within e.g. a Python script cell can bring down the execution or if it ignores it but logs it. Defaults to False so an invalid Python script cell does not stop the whole graph. :param run_cleanup_procedure: If ``True`` it executes ``CmdPeriod.run`` on the SuperCollider server in order to clear all running sounds, patterns and any left running tasks, creating a clean environment. The default is ``None`` which will derive the necessary action based if there are already users on the stream (in which case no reset will be executed). """
[docs] def __init__( self, graph: Graph, stream: Stream, raise_exceptions: bool = False, run_cleanup_procedure: Optional[bool] = None, ) -> None: self.graph: Graph = graph self.stream = stream self._current_node: Node self.blocking_time: int = 60 * 60 * 3 self.raise_exceptions = raise_exceptions self.run_cleanup_procedure: bool if run_cleanup_procedure is not None: self.run_cleanup_procedure = run_cleanup_procedure else: self.run_cleanup_procedure = self.stream.num_listeners == 0 log.debug(f"Started engine for graph {self.graph.uuid}")
[docs] async def get_stream_variables(self) -> Dict[str, str]: """ Returns the associated :class:`~stream.models.StreamVariable` within this :class:`~stream.models.Stream` session. .. todo:: Could be a @property but this can be difficult in async contexts so we use explicit async via a getter method. """ v = {} stream_variable: StreamVariable async for stream_variable in self.stream.variables.all(): v[stream_variable.key] = stream_variable.value return v
[docs] async def wait_for_stream_variable( self, name: str, timeout: float = 100.0, update_speed: float = 0.5 ): """Waits for a stream variable to be set. If the variable was not found/set within the time period of ``timeout`` this function will raise the exception :class:`ScriptCellTimeout`. .. danger:: Within a script cell it is necessary to await this async function .. code-block:: python await wait_for_stream_variable('start') """ log.debug(f"Wait for stream variable {name}") start_time = datetime.now() while True: if (datetime.now() - start_time).seconds > timeout: raise ScriptCellTimeout() if name in (await self.get_stream_variables()).keys(): break await asyncio.sleep(update_speed)
[docs] async def execute_markdown_code(self, cell_code: str): """Runs the code of a markdown cell by parsing its content with the :class:`~story_graph.markdown_parser.GencasterRenderer`. """ log.debug(f"Execute markdown code '{cell_code}'") ssml_text = md_to_ssml(cell_code, await self.get_stream_variables()) instruction = await sync_to_async(self.stream.stream_point.speak_on_stream)( ssml_text ) yield instruction await self.wait_for_finished_instruction(instruction)
[docs] async def execute_sc_code( self, cell_code: str ) -> AsyncGenerator[StreamInstruction, None]: """Executes a SuperCollider code cell""" log.debug(f"Run SuperCollider code '{cell_code}'") instruction = await sync_to_async( self.stream.stream_point.send_raw_instruction )(cell_code) yield instruction await self.wait_for_finished_instruction(instruction)
[docs] async def execute_audio_cell( self, audio_cell: AudioCell ) -> AsyncGenerator[StreamInstruction, None]: """ Plays the associated :class:`~stream.models.AudioFile` of an :class:`~story_graph.models.AudioCell`. .. todo:: This does not respect the different Playback formats """ # text field has no enum restrictions but the database enforces this log.debug(f"Run audio cell {audio_cell.uuid}") instruction = await sync_to_async(self.stream.stream_point.play_audio_file)( audio_cell.audio_file, audio_cell.playback # type: ignore ) yield instruction await self.wait_for_finished_instruction(instruction)
[docs] @staticmethod def get_engine_global_vars( runtime_values: Optional[Dict[str, Any]] = None ) -> Dict[str, Dict[str, Any]]: """Generates the dictionary which contains all objects which are available for the execution engine of the graph. This acts as a security measurement. .. important:: If anything is changed here please execute .. code:: make engine-variables-json which will create an updated autocomplete JSON for the editor. :param runtime_values: Allows to add additional objects to the module namespace at runtime. These are injected within :func:`~story_graph.engine.Engine.execute_python_cell` and consist of .. list-table:: Runtime vars :header-rows: 1 * - key - value - info * - ``loop`` - loop - the current asyncio loop - can be used to execute additional async code * - ``vars`` - a dictionary of all stream variables - See :func:`~story_graph.engine.Engine.get_stream_variables` * - ``self`` - Current :class:`~story_graph.engine.Engine` instance - * - ``get_stream_variables`` - Callable - See :func:`~story_graph.engine.Engine.get_stream_variables` * - ``wait_for_stream_variable`` - Callable - See :func:`~story_graph.engine.Engine.wait_for_stream_variable` """ runtime_values = runtime_values if runtime_values else {} return { "__builtins__": { "asyncio": asyncio, "int": int, "float": float, "print": print, "time": time, "datetime": datetime, "timedelta": timedelta, "Text": Text, "Dialog": Dialog, "Button": Button, "Checkbox": Checkbox, "Input": Input, "list": list, "random": random, **runtime_values, } }
[docs] async def execute_python_cell(self, cell_code: str) -> AsyncGenerator[Dialog, None]: """Executes a python :class:`~story_graph.models.ScriptCell`. A python cell is run as an async generator, which allows to not just run synchronous code but also asynchronous mode. It is possible to yield immediate results from this. Currently only the yielding of a :class:`~stream.frontend_types.Dialog` instance is possible, but this could be extended. In order to secure at least a little bit the execution within such a script cell everything that is a available for execution needs to be stated explicitly here. """ log.debug(f"Run python code '{cell_code}'") stream_variables = await self.get_stream_variables() old_stream_variables = deepcopy(stream_variables) loop = asyncio.get_running_loop() try: loc: Dict[str, Any] = {} exec( # wrap the script cell in an async function f"async def __ex(): " + "".join(f"\n {l}" for l in (cell_code.split("\n") + ["yield None"])), # global variables which are module scoped - they can not be # overwritten, avoiding any kind of messing with the # internal engine self.get_engine_global_vars( { # please note any runtime variables changes also in # story_graph/management/commands/get_engine_vars.py # as this will generate the necessary JSON for # the autocomplete within the editor "loop": loop, "vars": stream_variables, "self": self, "get_stream_variables": self.get_stream_variables, "wait_for_stream_variable": self.wait_for_stream_variable, "requests": requests, } ), # locals which mirror the current namespace and allow for modification # and storing of values loc, ) # execute the wrapped async script cell code in our asyncio runtime # yielding allows to yield such things like a request for a Dialog async for x in loc["__ex"](): # avoid yielding none if x: yield x except Exception as e: log.error(f"Occured an exception during graph engine execution: {e}") if self.raise_exceptions: raise e # @todo # * skip functions / only use scalars for k, v in stream_variables.items(): # unset value is a hack b/c none maybe a desired state # @todo switch to async bulk create if old_stream_variables.get(k, "__unset_value__") != v: log.debug(f"New stream variable: {k} -> {v}") await StreamVariable.objects.aupdate_or_create( stream=self.stream, key=k, defaults={"value": v}, ) stream_variables.get("return", None)
async def wait_for_finished_instruction( self, instruction: StreamInstruction, timeout: float = 300.0, interval: float = 0.2, ) -> None: log.debug(f"Wait for finished instruction {instruction.uuid}") for _ in range(int(timeout / interval)): await sync_to_async(instruction.refresh_from_db)() if instruction.state == StreamInstruction.InstructionState.FINISHED: return await asyncio.sleep(interval) log.info(f"Timed out on waiting for stream instruction {instruction.uuid}") raise asyncio.TimeoutError()
[docs] async def execute_node( self, node: Node, blocking_sleep_time: int = 10000 ) -> AsyncGenerator[Union[StreamInstruction, Dialog], None]: """Executes all :class:`~story_graph.models.ScriptCell` of a given :class:`~story_graph.models.Node`.""" log.debug(f"Executing node {node.uuid}") script_cell: ScriptCell instruction: Union[StreamInstruction, Dialog] async for script_cell in node.script_cells.select_related("audio_cell", "audio_cell__audio_file").all(): # type: ignore cell_type = script_cell.cell_type if cell_type == CellType.COMMENT: continue elif cell_type == CellType.PYTHON: if script_cell.cell_code: async for instruction in self.execute_python_cell( script_cell.cell_code ): yield instruction elif cell_type == CellType.SUPERCOLLIDER: async for instruction in self.execute_sc_code(script_cell.cell_code): yield instruction elif cell_type == CellType.MARKDOWN: async for instruction in self.execute_markdown_code( script_cell.cell_code ): yield instruction elif cell_type == CellType.AUDIO: if script_cell.audio_cell: async for instruction in self.execute_audio_cell( script_cell.audio_cell ): yield instruction else: log.error(f"Occured invalid/unknown CellType {cell_type}")
async def _evaluate_python_code(self, code: str) -> bool: stream_variables = await self.get_stream_variables() try: r = eval( code, self.get_engine_global_vars( { "loop": asyncio.get_event_loop(), "vars": stream_variables, "self": self, "get_stream_variables": self.get_stream_variables, "wait_for_stream_variable": self.wait_for_stream_variable, } ), ) except Exception: raise InvalidPythonCode() if not isinstance(r, bool): log.debug(f"Return type of '{code}' is not a boolean but {type(r)}") raise InvalidPythonCode() return r
[docs] async def get_next_node(self) -> Node: """Iterates over each exit :class:`~NodeDoor` of the current node and evaluates its boolean value and decides. If the node door code consists of invalid code it will be skipped. If all boolean evaluations result in ``False`` or invalid code, the default exit will be used. If multiple out-going edges are connected to an active door, a random edge will be picked to follow for the next node. If the node does not have any out-going edges a :class:`~GraphDeadEnd` exception will be raised. """ exit_door: Optional[NodeDoor] async for node_door in NodeDoor.objects.filter( node=self._current_node, door_type=NodeDoor.DoorType.OUTPUT, ).prefetch_related("node"): try: active_exit = await self._evaluate_python_code(node_door.code) # a broad exception because many things can go wrong here while evaluating # python code (e.g. even raising a custom exception), therefore we catch all # possible exceptions here except Exception as e: log.debug( f"Exception raised on evaluating code of node door {node_door}: {e}" ) continue if active_exit: log.debug(f"Choose exit {node_door} on {self._current_node}") exit_door = node_door break else: log.debug(f"Fallback to default node door on {self._current_node}") exit_door = await NodeDoor.objects.filter( node=self._current_node, door_type=NodeDoor.DoorType.OUTPUT, is_default=True, ).afirst() # else return default out while True: if exit_door is None: raise GraphDeadEnd() try: return (await exit_door.out_edges.order_by("?").select_related("in_node_door__node").afirst()).in_node_door.node # type: ignore except AttributeError: if exit_door.is_default: raise GraphDeadEnd() log.info( f"Ran into a dead end on non-default door {exit_door.name} on node {self._current_node.name} - fallback to default door" ) exit_door = await NodeDoor.objects.filter( node=self._current_node, door_type=NodeDoor.DoorType.OUTPUT, is_default=True, ).afirst()
async def cleanup_sc_procedure(self) -> StreamInstruction: log.debug("Run cleanup procedure on graph") # do not wait for the execution because the OSC receiver callback # may b down because of CmdPeriod and it takes time to recover # from CmdPeriod instruction = await sync_to_async( self.stream.stream_point.send_raw_instruction )("0.01.wait;CmdPeriod.run;0.01.wait;") # wait for the CmdPeriod to re-init the OSC receiver callback await asyncio.sleep(0.2) return instruction
[docs] async def start( self, max_steps: int = 1000 ) -> AsyncGenerator[Union[StreamInstruction, Dialog, GraphDeadEnd], None]: """Starts the execution of the engine. This method is an async generator which eithor yields a :class:`~stream.models.StreamInstruction` or a :class:`~stream.frontend_types.Dialog`. .. note:: In order to avoid a clumping of the database a lay off period of 0.1 seconds is added between jumping nodes. """ self._current_node = await self.graph.aget_entry_node() if self.run_cleanup_procedure: await self.cleanup_sc_procedure() for _ in range(max_steps): async for instruction in self.execute_node(self._current_node): yield instruction if self._current_node.is_blocking_node: log.info("Accessed a blocking node") await asyncio.sleep(self.blocking_time) # search for next node try: self._current_node = await self.get_next_node() except GraphDeadEnd: log.info(f"Ran into a dead end on {self.graph} on {self._current_node}") return await asyncio.sleep(0.1) else: log.info(f"Reached maximum steps on graph {self.graph} - stop execution")