Module surrealpy.ws.event

Expand source code
import asyncio
import enum
from functools import partial
import queue
import threading
from types import NoneType
from typing import Any, Callable, Optional, Union
import uuid
from .models import SurrealResponse

__all__ = ("Event", "Events", "managers","EventManager")

managers: dict[str, "EventManager"] = {}


class Events(enum.Enum):
    """
    A class used to represent SurrealDB events.
    ...
    Attributes
    ----------
    CONNECTED: str
        The event that is called when the client is connected to the server
    DISCONNECTED: str
        The event that is called when the client is disconnected from the server
    LOGIN: str
        The event that is called while the client is logging to the server
    LOGOUT: str
        The event that is called when the client is logged out from the server
    LOGGED_IN: str
        The event that is called when the client is logged in to the server
    USE: str
        The event that is called when the client is switched to a different database or collection
    RECEIVED: str
        The event that is called when the client receives a response from the server
    ERROR: str
        The event that is called when the client receives an error from the server
    """

    CONNECTED = "connected"  # Implemented
    DISCONNECTED = "disconnected"  # Implemented
    LOGIN = "login"  # Implemented
    LOGGED_IN = "logged_in"  # Implemented
    LOGOUT = "logout"  # Not implemented
    USE = "use"  # Implemented
    RECEIVED = "received"  # Implemented
    ERROR = "error"  # Not Completely Implemented


class Event:
    """
    A class used to represent SurrealDB event.
    ...
    
    Attributes
    ----------
    event: Events
        The event that is called
    response: SurrealResponse
        The response that is returned
    """

    def __init__(
        self,
        event: Events,
        response: Optional[Union[SurrealResponse, Any]] = None,
        *,
        id: str = None,
    ):
        self.id: str = id or response.id
        self.event: Events = event
        self.response: Optional[Union[SurrealResponse, Any]] = response

    def __repr__(self):
        return f"Event({self.event}, {self.response})"


class EventManager:
    """
    A class used to represent SurrealDB events manager.
    ...
    Attributes
    ----------
    events: dict[str,set]
        A dict of events and their callbacks

    Methods
    -------
    add_event(event: str, callback: Callable[[SurrealResponse], Any])
        Add an event to the events manager.
    remove_event(event: str, callback: Callable[[SurrealResponse], Any])
        Remove an event from the events manager.
    clear()
        Clear all events from the events manager.
    emit(event: str, data: SurrealResponse)
        Emit an event to the events manager.
    on(event: Union[str, Callable[[SurrealResponse], Any]])
        A decorator to add an event to the events manager.
    """

    def __init__(self):
        """
        Initialize the events manager. This is called automatically.
        """
        self._name = uuid.uuid4().hex
        managers[self._name] = self
        self.events: dict[str, set] = {}
        self._async_loop = asyncio.get_event_loop()
        self._event_queue = queue.Queue()
        self._event_thread = threading.Thread(target=self._handle_events, daemon=True)
        self._event_thread.start()
        # iterates over all events and adds them to the events manager with the default callback function
        for name, func in self.__class__.__dict__.items():

            # if the name of the function starts with on_ then it is an event and it is added to the events manager
            if name.startswith("on_"):
                # the event name is the name of the function without the on_ prefix and the callback function is the function itself
                # To make class method work, we need to set the callback function to a partial function with the class instance as the first argument
                self.add_event(name[3:], partial(func, self))

    def _handle_events(self):
        """
        Handle events. This is called automatically. The event is emitted to the event queue and then handled by the event loop.
        """
        while True:
            # event is partially called function that is called when the event is emitted
            event: partial = self._event_queue.get()

            event()

    def add_event(self, event: str, callback: Callable[[SurrealResponse], Any]):
        """
        Add an event to the events manager.

        Parameters
        ----------
        event: str
            The event name
        callback: Callable[[SurrealResponse], Any]
            The callback function
        """
        if event not in self.events:
            self.events[event] = set()
        # check if callback is async

        self.events[event].add(callback)

    def remove_event(self, event: str, callback: Callable[[SurrealResponse], Any]):
        """
        Remove an event from the events manager.
        Parameters
        ----------
        event: str
            The event name
        callback: Callable[[SurrealResponse], Any]
            The callback function
        """
        if event in self.events:
            self.events[event].remove(callback)
        else:
            raise ValueError(f"Event {event} is not registered")
        if "all" in self.events and callback in self.events["all"]:
            self.events["all"].remove(callback)

    def clear(self):
        """
        Clear all events from the events manager.
        """
        self.events.clear()

    def emit(self, event: str, data: "Event"):
        """
        Emit an event to the events manager.
        Parameters
        ----------
        event: str
            The event name
        data: SurrealResponse
            The data to pass to the callback function
        """

        if isinstance(event, Events):
            event = event.value

        if event in self.events:
            for callback in self.events[event]:
                # check if callback is async
                if asyncio.iscoroutinefunction(callback):
                    self._async_loop.run_until_complete(callback(data))
                else:

                    self._event_queue.put(partial(callback, data))
            for callback in self.events.get("all", []):
                # check if callback is async
                if asyncio.iscoroutinefunction(callback):
                    self._async_loop.run_until_complete(callback(data))
                else:
                    self._event_queue.put(partial(callback, data))

    def on(self, event: Union[str, Callable[[Event], None]]) -> Callable:
        """
        A decorator to add an event to the events manager.
        Parameters
        ----------
        event: Union[str,Callable]
            The event name or the callback function
        Returns
        -------
        Callable
            The decorator
        """
        if callable(event):
            self.add_event(event.__name__, event)
            return event
        else:

            def decorator(func: Callable):
                self.add_event(event, func)
                return func

            return decorator

    def __del__(self):

        del managers[self._name]

Classes

class Event (event: Events, response: Union[surrealpy.ws.models.SurrealResponse, Any, ForwardRef(None)] = None, *, id: str = None)

A class used to represent SurrealDB event. …

Attributes

event : Events
The event that is called
response : SurrealResponse
The response that is returned
Expand source code
class Event:
    """
    A class used to represent SurrealDB event.
    ...
    
    Attributes
    ----------
    event: Events
        The event that is called
    response: SurrealResponse
        The response that is returned
    """

    def __init__(
        self,
        event: Events,
        response: Optional[Union[SurrealResponse, Any]] = None,
        *,
        id: str = None,
    ):
        self.id: str = id or response.id
        self.event: Events = event
        self.response: Optional[Union[SurrealResponse, Any]] = response

    def __repr__(self):
        return f"Event({self.event}, {self.response})"
class EventManager

A class used to represent SurrealDB events manager. … Attributes


events : dict[str,set]
A dict of events and their callbacks

Methods

add_event(event: str, callback: Callable[[SurrealResponse], Any]) Add an event to the events manager. remove_event(event: str, callback: Callable[[SurrealResponse], Any]) Remove an event from the events manager. clear() Clear all events from the events manager. emit(event: str, data: SurrealResponse) Emit an event to the events manager. on(event: Union[str, Callable[[SurrealResponse], Any]]) A decorator to add an event to the events manager.

Initialize the events manager. This is called automatically.

Expand source code
class EventManager:
    """
    A class used to represent SurrealDB events manager.
    ...
    Attributes
    ----------
    events: dict[str,set]
        A dict of events and their callbacks

    Methods
    -------
    add_event(event: str, callback: Callable[[SurrealResponse], Any])
        Add an event to the events manager.
    remove_event(event: str, callback: Callable[[SurrealResponse], Any])
        Remove an event from the events manager.
    clear()
        Clear all events from the events manager.
    emit(event: str, data: SurrealResponse)
        Emit an event to the events manager.
    on(event: Union[str, Callable[[SurrealResponse], Any]])
        A decorator to add an event to the events manager.
    """

    def __init__(self):
        """
        Initialize the events manager. This is called automatically.
        """
        self._name = uuid.uuid4().hex
        managers[self._name] = self
        self.events: dict[str, set] = {}
        self._async_loop = asyncio.get_event_loop()
        self._event_queue = queue.Queue()
        self._event_thread = threading.Thread(target=self._handle_events, daemon=True)
        self._event_thread.start()
        # iterates over all events and adds them to the events manager with the default callback function
        for name, func in self.__class__.__dict__.items():

            # if the name of the function starts with on_ then it is an event and it is added to the events manager
            if name.startswith("on_"):
                # the event name is the name of the function without the on_ prefix and the callback function is the function itself
                # To make class method work, we need to set the callback function to a partial function with the class instance as the first argument
                self.add_event(name[3:], partial(func, self))

    def _handle_events(self):
        """
        Handle events. This is called automatically. The event is emitted to the event queue and then handled by the event loop.
        """
        while True:
            # event is partially called function that is called when the event is emitted
            event: partial = self._event_queue.get()

            event()

    def add_event(self, event: str, callback: Callable[[SurrealResponse], Any]):
        """
        Add an event to the events manager.

        Parameters
        ----------
        event: str
            The event name
        callback: Callable[[SurrealResponse], Any]
            The callback function
        """
        if event not in self.events:
            self.events[event] = set()
        # check if callback is async

        self.events[event].add(callback)

    def remove_event(self, event: str, callback: Callable[[SurrealResponse], Any]):
        """
        Remove an event from the events manager.
        Parameters
        ----------
        event: str
            The event name
        callback: Callable[[SurrealResponse], Any]
            The callback function
        """
        if event in self.events:
            self.events[event].remove(callback)
        else:
            raise ValueError(f"Event {event} is not registered")
        if "all" in self.events and callback in self.events["all"]:
            self.events["all"].remove(callback)

    def clear(self):
        """
        Clear all events from the events manager.
        """
        self.events.clear()

    def emit(self, event: str, data: "Event"):
        """
        Emit an event to the events manager.
        Parameters
        ----------
        event: str
            The event name
        data: SurrealResponse
            The data to pass to the callback function
        """

        if isinstance(event, Events):
            event = event.value

        if event in self.events:
            for callback in self.events[event]:
                # check if callback is async
                if asyncio.iscoroutinefunction(callback):
                    self._async_loop.run_until_complete(callback(data))
                else:

                    self._event_queue.put(partial(callback, data))
            for callback in self.events.get("all", []):
                # check if callback is async
                if asyncio.iscoroutinefunction(callback):
                    self._async_loop.run_until_complete(callback(data))
                else:
                    self._event_queue.put(partial(callback, data))

    def on(self, event: Union[str, Callable[[Event], None]]) -> Callable:
        """
        A decorator to add an event to the events manager.
        Parameters
        ----------
        event: Union[str,Callable]
            The event name or the callback function
        Returns
        -------
        Callable
            The decorator
        """
        if callable(event):
            self.add_event(event.__name__, event)
            return event
        else:

            def decorator(func: Callable):
                self.add_event(event, func)
                return func

            return decorator

    def __del__(self):

        del managers[self._name]

Methods

def add_event(self, event: str, callback: Callable[[surrealpy.ws.models.SurrealResponse], Any])

Add an event to the events manager.

Parameters

event : str
The event name
callback : Callable[[SurrealResponse], Any]
The callback function
Expand source code
def add_event(self, event: str, callback: Callable[[SurrealResponse], Any]):
    """
    Add an event to the events manager.

    Parameters
    ----------
    event: str
        The event name
    callback: Callable[[SurrealResponse], Any]
        The callback function
    """
    if event not in self.events:
        self.events[event] = set()
    # check if callback is async

    self.events[event].add(callback)
def clear(self)

Clear all events from the events manager.

Expand source code
def clear(self):
    """
    Clear all events from the events manager.
    """
    self.events.clear()
def emit(self, event: str, data: Event)

Emit an event to the events manager. Parameters


event : str
The event name
data : SurrealResponse
The data to pass to the callback function
Expand source code
def emit(self, event: str, data: "Event"):
    """
    Emit an event to the events manager.
    Parameters
    ----------
    event: str
        The event name
    data: SurrealResponse
        The data to pass to the callback function
    """

    if isinstance(event, Events):
        event = event.value

    if event in self.events:
        for callback in self.events[event]:
            # check if callback is async
            if asyncio.iscoroutinefunction(callback):
                self._async_loop.run_until_complete(callback(data))
            else:

                self._event_queue.put(partial(callback, data))
        for callback in self.events.get("all", []):
            # check if callback is async
            if asyncio.iscoroutinefunction(callback):
                self._async_loop.run_until_complete(callback(data))
            else:
                self._event_queue.put(partial(callback, data))
def on(self, event: Union[str, Callable[[Event], None]]) ‑> Callable

A decorator to add an event to the events manager. Parameters


event : Union[str,Callable]
The event name or the callback function

Returns

Callable
The decorator
Expand source code
def on(self, event: Union[str, Callable[[Event], None]]) -> Callable:
    """
    A decorator to add an event to the events manager.
    Parameters
    ----------
    event: Union[str,Callable]
        The event name or the callback function
    Returns
    -------
    Callable
        The decorator
    """
    if callable(event):
        self.add_event(event.__name__, event)
        return event
    else:

        def decorator(func: Callable):
            self.add_event(event, func)
            return func

        return decorator
def remove_event(self, event: str, callback: Callable[[surrealpy.ws.models.SurrealResponse], Any])

Remove an event from the events manager. Parameters


event : str
The event name
callback : Callable[[SurrealResponse], Any]
The callback function
Expand source code
def remove_event(self, event: str, callback: Callable[[SurrealResponse], Any]):
    """
    Remove an event from the events manager.
    Parameters
    ----------
    event: str
        The event name
    callback: Callable[[SurrealResponse], Any]
        The callback function
    """
    if event in self.events:
        self.events[event].remove(callback)
    else:
        raise ValueError(f"Event {event} is not registered")
    if "all" in self.events and callback in self.events["all"]:
        self.events["all"].remove(callback)
class Events (value, names=None, *, module=None, qualname=None, type=None, start=1)

A class used to represent SurrealDB events. … Attributes


CONNECTED : str
The event that is called when the client is connected to the server
DISCONNECTED : str
The event that is called when the client is disconnected from the server
LOGIN : str
The event that is called while the client is logging to the server
LOGOUT : str
The event that is called when the client is logged out from the server
LOGGED_IN : str
The event that is called when the client is logged in to the server
USE : str
The event that is called when the client is switched to a different database or collection
RECEIVED : str
The event that is called when the client receives a response from the server
ERROR : str
The event that is called when the client receives an error from the server
Expand source code
class Events(enum.Enum):
    """
    A class used to represent SurrealDB events.
    ...
    Attributes
    ----------
    CONNECTED: str
        The event that is called when the client is connected to the server
    DISCONNECTED: str
        The event that is called when the client is disconnected from the server
    LOGIN: str
        The event that is called while the client is logging to the server
    LOGOUT: str
        The event that is called when the client is logged out from the server
    LOGGED_IN: str
        The event that is called when the client is logged in to the server
    USE: str
        The event that is called when the client is switched to a different database or collection
    RECEIVED: str
        The event that is called when the client receives a response from the server
    ERROR: str
        The event that is called when the client receives an error from the server
    """

    CONNECTED = "connected"  # Implemented
    DISCONNECTED = "disconnected"  # Implemented
    LOGIN = "login"  # Implemented
    LOGGED_IN = "logged_in"  # Implemented
    LOGOUT = "logout"  # Not implemented
    USE = "use"  # Implemented
    RECEIVED = "received"  # Implemented
    ERROR = "error"  # Not Completely Implemented

Ancestors

  • enum.Enum

Class variables

var CONNECTED
var DISCONNECTED
var ERROR
var LOGGED_IN
var LOGIN
var LOGOUT
var RECEIVED
var USE