Source code for osc_server.server

"""
Server
======

A naive server to receive OSC messages from SuperCollider.
"""

import logging
import os
from typing import Any, Callable, Dict, List, Optional, Tuple

import django

django.setup()  # type: ignore

from django.utils import timezone
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer

from story_graph.markdown_parser import md_to_ssml
from stream.models import StreamInstruction, StreamPoint, TextToSpeech

from .models import (
    RemoteActionMessage,
    RemoteActionType,
    SCAcknowledgeMessage,
    SCBeaconMessage,
)

log = logging.getLogger(__name__)


class OSCServer:
    """
    Uses pythonosc to map the routes

    .. list-table:: Routes
        :header-rows: 1

        * - route
          - method
          - message
        * - `/beacon`
          - :func:`~OSCServer.beacon_handler`
          - :ref:`OSC beacon message`
        * - `/acknowledge`
          - :func:`~OSCServer.acknowledge_handler`
          - :ref:`OSC acknowledge message`
        * - `/remote/action`
          - :func:`~OSCServer.remote_action_handler`
          - :ref:`OSC remote action message`
    """

    def __init__(self):
        """Set up the route table; the dispatcher itself is built lazily."""
        # Created on the first call of get_dispatcher() and reused afterwards.
        self._dispatcher: Optional[Dispatcher] = None
        # OSC address -> bound handler method of this instance.
        self.mapper: Dict[str, Callable[[Tuple[str, int], str, List[Any]], None]] = {
            "/beacon": self.beacon_handler,
            "/acknowledge": self.acknowledge_handler,
            "/remote/action": self.remote_action_handler,
        }

    def serve_blocking(self, host: str = "0.0.0.0", port: int = 7000):
        """Serve OSC messages on ``host``/``port`` and block the calling thread.

        A ``KeyboardInterrupt`` stops the server; the server is closed in
        every case, also when another exception escapes.

        :param host: Address to bind the UDP server to.
        :param port: UDP port to listen on.
        """
        dispatcher = self.get_dispatcher()
        server = BlockingOSCUDPServer((host, port), dispatcher)
        try:
            # The routes are taken from our own mapper instead of the
            # private ``_map`` attribute of the pythonosc dispatcher.
            log.info(
                f"Start blocking OSC server on port {port} with routes: {', '.join(self.mapper)}"
            )
            server.serve_forever()
        except KeyboardInterrupt:
            log.info("Received stop signal")
            server.shutdown()
        finally:
            log.info("Close server")
            server.server_close()

    def beacon_handler(
        self, client_address: Tuple[str, int], address: str, *osc_args: Any
    ) -> None:
        """Accepts a beacon from SuperCollider and creates a
        :class:`~StreamPoint` from it so it gets discovered by the backend.

        The stream point is looked up (or created) by the host of the sender
        and the language port announced in the beacon; all other fields are
        refreshed from the beacon on every call.

        :param client_address: ``(host, port)`` of the sender.
        :param address: OSC address of the message (unused).
        :param osc_args: Raw OSC arguments, parsed into a
            :class:`SCBeaconMessage`.
        """
        beacon_message = SCBeaconMessage.from_osc_args(*osc_args)

        point: StreamPoint
        point, created = StreamPoint.objects.get_or_create(
            host=client_address[0],
            port=beacon_message.lang_port,
        )

        point.last_live = timezone.now()
        point.use_input = beacon_message.use_input
        point.janus_in_port = beacon_message.janus_in_port
        point.janus_out_port = beacon_message.janus_out_port
        point.janus_in_room = beacon_message.janus_in_room
        point.janus_out_room = beacon_message.janus_out_room
        point.janus_public_ip = beacon_message.janus_public_ip
        point.sc_name = beacon_message.name
        point.save()

        if created:
            log.info(f"Found new stream point: {point}")
        else:
            log.debug(f"Received live signal from {point}")

    def acknowledge_handler(
        self, client_address: Tuple[str, int], address: str, *osc_args: Any
    ) -> None:
        """Acknowledges a message and updates its associated
        :class:`~stream.models.StreamInstruction`.

        If no instruction with the acknowledged UUID exists, an error is
        logged and nothing is changed.

        :param client_address: ``(host, port)`` of the sender (unused).
        :param address: OSC address of the message (unused).
        :param osc_args: Raw OSC arguments, parsed into a
            :class:`SCAcknowledgeMessage`.
        """
        ack_message = SCAcknowledgeMessage.from_osc_args(*osc_args)

        try:
            stream_instruction: StreamInstruction = StreamInstruction.objects.get(
                uuid=ack_message.uuid,
            )
        except StreamInstruction.DoesNotExist:
            log.error(f"Could not find StreamInstruction with UUID {ack_message.uuid}")
            return

        stream_instruction.state = StreamInstruction.InstructionState.from_sc_string(
            ack_message.status
        )
        # Only overwrite the stored return value if the message carries one.
        if ack_message.return_value:
            stream_instruction.return_value = ack_message.return_value
        stream_instruction.save()

    def remote_action_handler(
        self, client_address: Tuple[str, int], address: str, *osc_args: Any
    ) -> None:
        """Remote actions are used to trigger actions on a SuperCollider instance
        and can be sent in form of a :class:`~stream.models.StreamInstruction`
        or raw from a local running SuperCollider instance which can be used
        to live code the SuperCollider instances managed by Gencaster.

        The action is applied to all free stream points, optionally narrowed
        down to the Janus out room given as target of the message. If no
        stream point matches, an error is logged and nothing is sent.

        :param client_address: ``(host, port)`` of the sender (unused).
        :param address: OSC address of the message (unused).
        :param osc_args: Raw OSC arguments, parsed into a
            :class:`RemoteActionMessage`.
        """
        remote_message = RemoteActionMessage.from_osc_args(*osc_args)

        stream_points = StreamPoint.objects.free_stream_points()
        if remote_message.target:
            stream_points = stream_points.filter(janus_out_room=remote_message.target)

        # exists() only asks the database whether there is any row at all
        if not stream_points.exists():
            log.error(
                f"Could not find active matching streaming point with filter {remote_message.target}"
            )
            return

        if remote_message.action == RemoteActionType.code:
            log.info(f"Execute on {remote_message.target}: '{remote_message.cmd}'")
            for stream_point in stream_points.all():
                stream_point.send_raw_instruction(remote_message.cmd)
        elif remote_message.action == RemoteActionType.speak:
            log.info(f"Speak on {remote_message.target}: '{remote_message.cmd}'")
            # to allow for caching when sending to multiple stream_points
            # we create the text here so the streaming points can rely on the cache
            ssml_text = md_to_ssml(remote_message.cmd)
            TextToSpeech.create_from_text(ssml_text)
            for stream_point in stream_points.all():
                stream_point.speak_on_stream(ssml_text)
        else:
            log.critical(f"Unknown action {remote_message.action}")

    def get_dispatcher(self) -> Dispatcher:
        """Return the dispatcher with all routes of ``mapper`` registered.

        The dispatcher is built on the first call and cached on the instance,
        so subsequent calls return the same object.
        """
        if self._dispatcher is not None:
            return self._dispatcher

        self._dispatcher = Dispatcher()
        for address, callback in self.mapper.items():
            # needs_reply_address is set because every handler takes the
            # client address as its first argument.
            self._dispatcher.map(address, callback, needs_reply_address=True)  # type: ignore
        return self._dispatcher
if __name__ == "__main__":  # pragma: no cover
    # Script entry point: port and log level are read from the environment,
    # falling back to port 7000 and level INFO.
    port = int(os.getenv("BACKEND_OSC_PORT", 7000))
    logging_level = os.getenv("BACKEND_OSC_LOG_LEVEL", "INFO")
    log.setLevel(logging_level)

    # Blocks until the server is stopped.
    server = OSCServer()
    server.serve_blocking(port=port)