Module surrealpy.ws

Expand source code
import dataclasses
import threading
import time
from typing import Any, Callable, Optional, Union
from websocket import create_connection, WebSocket
from surrealpy.ws.models import LoginParams, SurrealRequest, SurrealResponse
from surrealpy.utils import json_dumps, json_loads
from surrealpy.exceptions import SurrealError, WebSocketError
from surrealpy.ws import event
import atexit

__all__ = ("SurrealClient","SurrealClientThread")


def unthread(func):
    """
    Decorator to run a function in the main thread.

    Parameters
    ----------
    func : function
        The function to run in the main thread.

    Returns
    -------
    function
        The function to run in the main thread.
    """

    def wrapper(*args: Any, **kwargs: dict[str, Any]) -> Any:
        """
        Wrapper function.

        Parameters
        ----------
        *args : Any
            The arguments to pass to the function.
        **kwargs : dict[str,Any]
            The keyword arguments to pass to the function.

        Returns
        -------
        Any
            The return value of the function.

        Raises
        ------
        RuntimeError
            If the function is not run in the main thread.
        """
        assert (
            threading.current_thread() is threading.main_thread()
        ), "This function is not thread safe"

        return func(*args, **kwargs)

    return wrapper


class SurrealClient:
    """
    A class used to represent SurrealClient
    ...

    note: This class is not thread safe

    Attributes
    ----------
    url: str
        The url of the websocket server
    ws: WebSocket
        The websocket connection
    namespace: Optional[str]
        The namespace of the database
    database: Optional[str]
        The database name

    Methods
    -------
    connect() -> None
        Connect to the SurrealDB server
    disconnect() -> None
        Disconnect from the SurrealDB server
    ping() -> str
        Ping the SurrealDB server
    use(namespace: str, database: str) -> None
        Use a database
    info() -> dict[str, Any]
        Get current SurrealDB server's authentication info
    register(params: dict[str, Any]) -> str
        Signup to the SurrealDB server
    login(params: dict[str, Any]) -> None
        Login to the SurrealDB server
    invalidate() -> None
        Invalidate the current session
    authenticate(token: str) -> None
        Authenticate the current session
    killProcess(id: str) -> None
        Kill the given id process
    let(key: str, value: Any) -> None
        Set a let variable to the SurrealDB server
    query(sql: str, params: dict[str, Any]) -> list[dict[str, Any]]
        Query the SurrealDB server with the given query and params (optional) that returns a list of dict
    find(tid: str) -> list[dict[str, Any]]
        Find the given tid (table or record id) in the SurrealDB server and returns a list of dict
    find_one(tid: str) -> dict[str, Any]
        Find the given tid (table or record id) in the SurrealDB server and returns a dict
    create(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Create a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the created record(s)
    update(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Update a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the updated record(s)
    change(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Change a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the changed record(s)
    modify(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Modify a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the modified record(s)
    delete(tid: str) -> list[dict[str, Any]]
        Delete a record in the SurrealDB server with the given tid (table id), returns a list of dict of the deleted record(s)
    """

    def __init__(self, url):
        """
        Initialize the SurrealClient instance.

        Parameters
        ----------
        url : str
            The url of the websocket server
        """
        if url.startswith("http://"):
            url = url.replace("http://", "ws://")
            url = "ws://" + url
        elif url.startswith("https://"):
            url = url.replace("https://", "wss://")
            url = "wss://" + url
        self.__url: str = url
        self._ws: WebSocket
        self._counter: int = 0
        self._namespace: Optional[str] = None
        self._database: Optional[str] = None
        self._let_variables: list[str] = set()
        atexit.register(self._atexit)

    def _atexit(self):
        """
        This is a private function that is used to disconnect the websocket connection when the program exits. It is not recommended to use this function.
        """
        self.disconnect()

    def _count(self) -> str:
        """
        This is a private function that is used to count the number of requests. It is not recommended to use this function.

        Returns
        -------
        str
            The number of requests
        """
        self._counter += 1
        return str(self._counter)

    def _set_let(self, key: str):
        """
        This is a private function that is used to set the let variables. It is not recommended to use this function. May use in the future.

        Parameters
        ----------
        key: str
            The key of the let variable
        """
        self._let_variables.add(key)

    @property
    def ws(self) -> WebSocket:
        """The websocket connection.

        Returns
        -------
        WebSocket
            The websocket connection"""
        return self._ws

    @property
    def namespace(self):
        """The namespace of the database.

        Returns
        -------
        Optional[str]
            The namespace of the database
        """
        return self._namespace

    @property
    def database(self):
        """The database name.

        Returns
        -------
        Optional[str]
            The database name
        """
        return self._database

    @property
    def url(self):
        """The url of the websocket server.

        Returns
        -------
        str
            The url of the websocket server
        """
        return self.__url

    @unthread
    def connect(self) -> None:
        """
        Connect to the SurrealDB server.

        Raises
        ------
        WebSocketError
            If the connection is already established
        """
        if hasattr(self, "ws"):
            raise WebSocketError("Already connected")
        self._ws = create_connection(self.url)

    @unthread
    def disconnect(self) -> None:
        """
        Disconnects from the websocket server.

        Raises
        ------
        WebSocketError
            If the connection is not established

        """
        if not hasattr(self, "ws"):
            raise WebSocketError("Not connected")
        elif self.ws.connected:
            self.ws.close()
        else:
            raise WebSocketError("Not connected")

    def _raw_send(self, request: SurrealRequest) -> dict[str, Any]:
        """
        This is a private function that is used to send the request to the SurrealDB server. It is not recommended to use this function. May be deprecated in the future.

        Parameters
        ----------
        request: SurrealRequest
            The request to send to the SurrealDB server

        Raises
        ------
        WebSocketException
            If the connection is not established

        Returns
        -------
        dict[str, Any]
            The response from the SurrealDB server
        """
        self.ws.send(json_dumps(request))
        return json_loads(self.ws.recv())

    @unthread
    def _send(self, method: str, *params: Any) -> Any:
        """
        Sends a request to the websocket server

        Parameters
        ----------
        method: str
            The method of the request
        *params: Any
            The parameters of the request

        Raises
        ------
        WebSocketError
            if surrealDB server returns an error(s)

        Returns
        -------
        Any
            The response from the SurrealDB server
        """
        request = SurrealRequest(id=self._count(), method=method, params=params)
        self.ws.send(json_dumps(request))
        data = self.ws.recv()
        returnData = json_loads(data)
        if returnData.get("error") is not None:
            raise WebSocketError(returnData["error"])

        return returnData["id"], returnData["result"]

    def _clean_dict_params(self, params: dict[str, Any]) -> dict[str, Any]:
        """
        Clean the params of the request. It is not recommended to use this function. May cause unexpected behavior.

        Parameters
        ----------
        params: dict[str, Any]
            The params to clean

        Returns
        -------
        dict[str, Any]
            The cleaned params
        """
        return {k: v for k, v in params.items() if v is not None}

    def ping(self) -> bool:
        """
        Ping the SurrealDB server.

        Returns
        -------
        bool
            True if the SurrealDB server is alive else False
        """
        return self._send("ping")[1]

    def use(self, namespace: str, database: str) -> None:
        """
        Use a database.

        Parameters
        ----------
        namespace: str
            The namespace of the database
        database: str
            The database name

        """
        return self._send("use", namespace, database)[1]

    def register(self, params: dict[str, Any]) -> str:
        """
        Signup to the SurrealDB server.

        Parameters
        ----------
        params: dict[str, Any]
            The params of the request

        Returns
        -------
        str
            The token of the user
        """
        clean_params = self._clean_dict_params(params)
        return self._send("signup", clean_params)[1]

    def login(self, params: Union[dict[str, Any], LoginParams]) -> None:
        """
        Login to the SurrealDB server.

        Parameters
        ----------
        params: Union[dict[str, Any], LoginParams]
            The credentials of the login request
        """
        if isinstance(params, LoginParams):
            params = params.to_dict()
        clean_params = self._clean_dict_params(params)
        return self._send("signin", clean_params)[1]

    def invalidate(self) -> None:
        """
        Invalidate the current session.
        """
        return self._send("invalidate")[1]

    def authenticate(self, token: str):
        """Authenticate the current session."""
        return self._send("authenticate", token)[1]

    def killProcess(self, id: str) -> None:
        """
        Kill the current process.
        """
        return self._send("kill", id)[1]

    def let(self, key: str, value: Any) -> None:
        """Set a let variable."""

        return self._send("let", key, value)[1]

    def query(
        self,
        sql: str,
        params: Union[dict[str, Any], list[Any], tuple[Any], set[Any]] = None,
    ) -> list[dict[str, Any]]:
        """Query the SurrealDB server.

        Parameters
        ----------
        sql: str
            The sql query to execute on the SurrealDB server
        params: Any
            The params of the query (optional)

        Returns
        -------
        list[dict[str, Any]]
            The result of the query as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
        """
        if params is None:
            id, query_result = self._send("query", sql, params)
        elif isinstance(params, (list, tuple, set)):
            print(sql.format(*params))
            id, query_result = self._send("query", sql.format(*params))
        elif isinstance(params, dict):
            id, query_result = self._send("query", sql.format(**params))
        else:
            raise TypeError("params must be a list, tuple, set or dict")
        return SurrealResponse(id=id, results=[r["result"] for r in query_result])

    def find(self, tid: str) -> SurrealResponse:
        """Find documents by their ids or table name.
        Parameters
        ----------
        tid: str
            The id or table name of the document(s)

        Returns
        -------
        SurrealResponse
            The result of the query as a SurrealResponse object containing the id of the query and the results as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
        """
        returnData = self._send("select", tid)
        return SurrealResponse(id=returnData[0], results=returnData[1])

    def find_one(self, tid: str) -> Optional[SurrealResponse]:
        """Find a document by its id or table name.
        Parameters
        ----------
        tid: str
            The id or table name of the document

        Returns
        -------
        Optional[SurrealResponse]
            The result of the query as a SurrealResponse object containing the results as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        response = self.find(tid)
        if len(response.results) == 0:
            return None
        return response.results[0]

    def create(
        self, tid: str, data: Union[Any, dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Create a document.
        Parameters
        ----------
        tid: str
            The id or table name of the document
        data: Union[Any,dict[str,Any]]
            The data of the document

        Returns
        -------
        list[dict[str,Any]]
            The document(s) created as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        return self._send("create", tid, data)[1]

    def update(
        self, tid: str, data: Union[Any, dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Update a document.
        Parameters
        ----------
        tid: str
            The id or table name of the document
        data: Union[Any,dict[str,Any]]
            The data of the document

        Returns
        -------
        list[dict[str,Any]]
            The document(s) updated as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        return self._send("update", tid, data)[id]

    def change(
        self, tid: str, data: Union[Any, dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Change a document.
        Parameters
        ----------
        tid: str
            The id or table name of the document
        data: Union[Any,dict[str,Any]]
            The data of the document

        Returns
        -------
        list[dict[str,Any]]
            The document(s) changed as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        return self._send("change", tid, data)[1]

    def modify(
        self, tid: str, data: Union[Any, dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Modify a document.
        Parameters
        ----------
        tid: str
            The id or table name of the document
        data: Union[Any,dict[str,Any]]
            The data of the document

        Returns
        -------
        list[dict[str,Any]]
            The document(s) modified as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        return self._send("modify", tid, data)[1]

    def delete(self, tid: str) -> list[dict[str, Any]]:
        """Delete document(s) by given tid.
        Parameters
        ----------
        tid: str
            The id or table name of the document

        Returns
        -------
        list[dict[str,Any]]
            The document(s) deleted as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        return self._send("delete", tid)[1]

    @unthread
    def __enter__(self):
        """Enter the context manager.

        Returns
        -------
        SurrealClient
            The SurrealClient instance
        """
        self.connect()
        return self

    def __exit__(
        self, exc_type: Optional[Any], exc_val: Optional[Any], exc_tb: Optional[Any]
    ):
        """
        Exit the context manager.

        Parameters
        ----------
        exc_type: Any
            The exception type (optional) that was raised in the context manager (if any)
        exc_val: Any
            The exception value (optional) that was raised in the context manager (if any)
        exc_tb: Any
            The exception traceback (optional) that was raised in the context manager (if any)
        """
        self.disconnect()


class SurrealClientThread(SurrealClient):
    """
    A class used to represent SurrealClient
    ...

    note: This class is not thread safe

    Parameters
    ----------
    url: str
        The url of the SurrealDB server
    eventManager: Optional[event.EventManager]
        The event manager of the client (optional) if you want to use custom event manager (default is None)

    Attributes
    ----------
    url: str
        The url of the websocket server
    ws: WebSocket
        The websocket connection
    namespace: Optional[str]
        The namespace of the database
    database: Optional[str]
        The database name
    eventManager: Optional[event.EventManager]
        The event manager of the client (optional) if you want to use custom event manager (default is None)
    Methods
    -------
    connect() -> None:
        Connect to the SurrealDB server
    disconnect() -> None:
        Disconnect from the SurrealDB server
    ping() -> str:
        Ping the SurrealDB server
    use(namespace: str, database: str) -> None:
        Use a database
    info() -> dict[str, Any]:
        Get current SurrealDB server's authentication info
    register(params: dict[str, Any]) -> str
        Signup to the SurrealDB server
    login(params: dict[str, Any]) -> None
        Login to the SurrealDB server
    invalidate() -> None
        Invalidate the current session
    authenticate(token: str) -> None
        Authenticate the current session
    killProcess(id: str) -> None
        Kill the given id process
    let(key: str, value: Any) -> None
        Set a let variable to the SurrealDB server
    query(sql: str, params: dict[str, Any]) -> list[dict[str, Any]]
        Query the SurrealDB server with the given query and params (optional) that returns a list of dict
    find(tid: str) -> list[dict[str, Any]]
        Find the given tid (table or record id) in the SurrealDB server and returns a list of dict
    find_one(tid: str) -> dict[str, Any]
        Find the given tid (table or record id) in the SurrealDB server and returns a dict
    create(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Create a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the created record(s)
    update(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Update a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the updated record(s)
    change(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Change a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the changed record(s)
    modify(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Modify a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the modified record(s)
    delete(tid: str) -> list[dict[str, Any]]
        Delete a record in the SurrealDB server with the given tid (table id), returns a list of dict of the deleted record(s)
    on(event: Union[str, Callable]) -> Callable
        Decorator to register an event handler for the given event name or function name (if event is a function) to the EventManager and returns a function that can be used to unregister the event handler from the EventManager
    """

    def __init__(self, url: str, *, eventManager: Optional[event.EventManager] = None):
        """Initialize the SurrealClientThread instance.

        Parameters
        ----------
        url: str
            The url of the SurrealDB server

        """
        super().__init__(url)
        self._receive_thread: threading.Thread = threading.Thread(
            target=self._receive_responses, daemon=True
        )
        self._lock = threading.Lock()
        self._responses: dict[str, Any] = {}
        self._event_manager: event.EventManager = eventManager or event.EventManager()

        self.on = self._event_manager.on

    def count(self) -> int:
        """
        Count the number of responses.

        Returns
        -------
        int
            The number of responses
        """
        with self._lock:
            return len(self._responses)

    def _count(self) -> str:
        """
        This is a private function that is used to count the number of requests. It is not recommended to use this function.

        Returns
        -------
        str
            The number of requests
        """
        with self._lock:
            self._counter += 1
            return str(self._counter)

    def _receive_responses(self):
        """
        This is a private function that is used to receive responses from the SurrealDB server. It is not recommended to use this function.
        """
        while self.ws.connected:
            # Receive response from server and add it to the responses dictionary
            response = self._ws.recv()
            if response is None or response == "":
                # If the response is empty, then the connection has been closed
                continue
            # Parse the response as a dictionary
            response = json_loads(response)
            # Add the response to the responses dictionary
            self._responses[response["id"]] = response
            self._event_manager.emit(event.Events.RECEIVED, response)

    def _send(self, method: str, *params: Any) -> Any:
        """
        Sends a request to the websocket server

        Parameters
        ----------
        method: str
            The method of the request
        *params: Any
            The parameters of the request

        Raises
        ------
        WebSocketError
            if surrealDB server returns an error(s)

        Returns
        -------
        Any
            The response from the SurrealDB server
        """
        request = SurrealRequest(id=self._count(), method=method, params=params)
        self._ws.send(json_dumps(request))
        # Wait for the response to be received
        while request.id not in self._responses:
            time.sleep(0.01)
        # Get the response from the responses dictionary and remove it from the dictionary
        response = self._responses.pop(request.id, None)
        if response is None:
            # If the response is None, then the connection has been closed
            err = SurrealError("Response is None")
            # Emit the error event
            self._event_manager.emit(
                event.Events.ERROR,
                event.Event(
                    event=event.Events.ERROR, response=SurrealResponse("-1", (err,))
                ),
            )
            raise err
        if response.get("error") is not None:
            # If the response has an error, then raise the error and return None
            err = WebSocketError(response["error"])
            # Emit the error event
            self._event_manager.emit(
                event.Events.ERROR,
                event.Event(event=event.Events.ERROR, response=response, id=request.id),
            )
            raise err
        return response["id"], response["result"]

    def connect(self):
        """
        Connect to the SurrealDB server.

        Raises
        ------
        WebSocketError
            If the connection is already established
        """
        # check if thread is already running, if so, raise an error
        if self._receive_thread.is_alive():
            raise WebSocketError("Connection is already established")
        self._ws = create_connection(self.url)
        self._receive_thread.start()
        self._event_manager.emit(
            event.Events.CONNECTED,
            event.Event(
                event.Events.CONNECTED, response=SurrealResponse("-1", ("Connected",))
            ),
        )

    def login(self, params: Union[dict[str, Any], LoginParams]) -> None:
        """
        Login to the SurrealDB server.

        Parameters
        ----------
        params: Union[dict[str, Any], LoginParams]
            The credentials of the login request
        """
        self._event_manager.emit(
            event.Events.LOGIN,
            event.Event(
                event.Events.LOGIN, response=SurrealResponse("-1", ("Logging in...",))
            ),
        )
        if isinstance(params, LoginParams):
            params = params.to_dict()
        clean_params = self._clean_dict_params(params)
        result = (self._send("signin", clean_params)[1],)
        self._event_manager.emit(
            event.Events.LOGGED_IN,
            event.Event(event.Events.LOGGED_IN, response=SurrealResponse("-1", result)),
        )
        return result

    def use(self, namespace: str, database: str) -> None:
        """
        Use a database.

        Parameters
        ----------
        namespace: str
            The namespace of the database
        database: str
            The database name

        """
        result = self._send("use", namespace, database)
        self._event_manager.emit(
            event.Events.USE,
            event.Event(
                event.Events.USE,
                response=SurrealResponse(
                    result[0], {"namespace": namespace, "database": database}
                ),
                id=result[0],
            ),
        )
        return result[1]

    def disconnect(self):
        """
        Disconnect from the SurrealDB server.

        Raises
        ------
        WebSocketError
            If the connection is already closed
        """
        self._ws.close()
        self._receive_thread.join()
        self._event_manager.emit(
            event.Events.DISCONNECTED,
            event.Event(
                event.Events.DISCONNECTED,
                response=SurrealResponse("-1", ("Disconnected",)),
            ),
        )

    def __enter__(self):
        """Enter the context manager.

        Returns
        -------
        SurrealClient
            The SurrealClient instance
        """
        self.connect()
        return self

Sub-modules

surrealpy.ws.event
surrealpy.ws.models

Classes

class SurrealClient (url)

A class used to represent SurrealClient …

note: This class is not thread safe

Attributes

url : str
The url of the websocket server
ws : WebSocket
The websocket connection
namespace : Optional[str]
The namespace of the database
database : Optional[str]
The database name

Methods

connect() -> None Connect to the SurrealDB server disconnect() -> None Disconnect from the SurrealDB server ping() -> str Ping the SurrealDB server use(namespace: str, database: str) -> None Use a database info() -> dict[str, Any] Get current SurrealDB server's authentication info register(params: dict[str, Any]) -> str Signup to the SurrealDB server login(params: dict[str, Any]) -> None Login to the SurrealDB server invalidate() -> None Invalidate the current session authenticate(token: str) -> None Authenticate the current session killProcess(id: str) -> None Kill the given id process let(key: str, value: Any) -> None Set a let variable to the SurrealDB server query(sql: str, params: dict[str, Any]) -> list[dict[str, Any]] Query the SurrealDB server with the given query and params (optional) that returns a list of dict find(tid: str) -> list[dict[str, Any]] Find the given tid (table or record id) in the SurrealDB server and returns a list of dict find_one(tid: str) -> dict[str, Any] Find the given tid (table or record id) in the SurrealDB server and returns a dict create(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]] Create a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the created record(s) update(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]] Update a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the updated record(s) change(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]] Change a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the changed record(s) modify(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]] Modify a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the modified record(s) delete(tid: str) -> list[dict[str, Any]] Delete a record in the SurrealDB server with the given tid (table id), returns a list of dict of the deleted record(s)

Initialize the SurrealClient instance.

Parameters

url : str
The url of the websocket server
Expand source code
class SurrealClient:
    """
    A class used to represent SurrealClient
    ...

    note: This class is not thread safe

    Attributes
    ----------
    url: str
        The url of the websocket server
    ws: WebSocket
        The websocket connection
    namespace: Optional[str]
        The namespace of the database
    database: Optional[str]
        The database name

    Methods
    -------
    connect() -> None
        Connect to the SurrealDB server
    disconnect() -> None
        Disconnect from the SurrealDB server
    ping() -> str
        Ping the SurrealDB server
    use(namespace: str, database: str) -> None
        Use a database
    info() -> dict[str, Any]
        Get current SurrealDB server's authentication info
    register(params: dict[str, Any]) -> str
        Signup to the SurrealDB server
    login(params: dict[str, Any]) -> None
        Login to the SurrealDB server
    invalidate() -> None
        Invalidate the current session
    authenticate(token: str) -> None
        Authenticate the current session
    killProcess(id: str) -> None
        Kill the given id process
    let(key: str, value: Any) -> None
        Set a let variable to the SurrealDB server
    query(sql: str, params: dict[str, Any]) -> list[dict[str, Any]]
        Query the SurrealDB server with the given query and params (optional) that returns a list of dict
    find(tid: str) -> list[dict[str, Any]]
        Find the given tid (table or record id) in the SurrealDB server and returns a list of dict
    find_one(tid: str) -> dict[str, Any]
        Find the given tid (table or record id) in the SurrealDB server and returns a dict
    create(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Create a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the created record(s)
    update(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Update a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the updated record(s)
    change(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Change a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the changed record(s)
    modify(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Modify a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the modified record(s)
    delete(tid: str) -> list[dict[str, Any]]
        Delete a record in the SurrealDB server with the given tid (table id), returns a list of dict of the deleted record(s)
    """

    def __init__(self, url):
        """
        Initialize the SurrealClient instance.

        Parameters
        ----------
        url : str
            The url of the websocket server
        """
        if url.startswith("http://"):
            url = url.replace("http://", "ws://")
            url = "ws://" + url
        elif url.startswith("https://"):
            url = url.replace("https://", "wss://")
            url = "wss://" + url
        self.__url: str = url
        self._ws: WebSocket
        self._counter: int = 0
        self._namespace: Optional[str] = None
        self._database: Optional[str] = None
        self._let_variables: list[str] = set()
        atexit.register(self._atexit)

    def _atexit(self):
        """
        This is a private function that is used to disconnect the websocket connection when the program exits. It is not recommended to use this function.
        """
        self.disconnect()

    def _count(self) -> str:
        """
        This is a private function that is used to count the number of requests. It is not recommended to use this function.

        Returns
        -------
        str
            The number of requests
        """
        self._counter += 1
        return str(self._counter)

    def _set_let(self, key: str):
        """
        This is a private function that is used to set the let variables. It is not recommended to use this function. May use in the future.

        Parameters
        ----------
        key: str
            The key of the let variable
        """
        self._let_variables.add(key)

    @property
    def ws(self) -> WebSocket:
        """The websocket connection.

        Returns
        -------
        WebSocket
            The websocket connection"""
        return self._ws

    @property
    def namespace(self):
        """The namespace of the database.

        Returns
        -------
        Optional[str]
            The namespace of the database
        """
        return self._namespace

    @property
    def database(self):
        """The database name.

        Returns
        -------
        Optional[str]
            The database name
        """
        return self._database

    @property
    def url(self):
        """The url of the websocket server.

        Returns
        -------
        str
            The url of the websocket server
        """
        return self.__url

    @unthread
    def connect(self) -> None:
        """
        Connect to the SurrealDB server.

        Raises
        ------
        WebSocketError
            If the connection is already established
        """
        if hasattr(self, "ws"):
            raise WebSocketError("Already connected")
        self._ws = create_connection(self.url)

    @unthread
    def disconnect(self) -> None:
        """
        Disconnects from the websocket server.

        Raises
        ------
        WebSocketError
            If the connection is not established

        """
        if not hasattr(self, "ws"):
            raise WebSocketError("Not connected")
        elif self.ws.connected:
            self.ws.close()
        else:
            raise WebSocketError("Not connected")

    def _raw_send(self, request: SurrealRequest) -> dict[str, Any]:
        """
        This is a private function that is used to send the request to the SurrealDB server. It is not recommended to use this function. May be deprecated in the future.

        Parameters
        ----------
        request: SurrealRequest
            The request to send to the SurrealDB server

        Raises
        ------
        WebSocketException
            If the connection is not established

        Returns
        -------
        dict[str, Any]
            The response from the SurrealDB server
        """
        self.ws.send(json_dumps(request))
        return json_loads(self.ws.recv())

    @unthread
    def _send(self, method: str, *params: Any) -> Any:
        """
        Sends a request to the websocket server

        Parameters
        ----------
        method: str
            The method of the request
        *params: Any
            The parameters of the request

        Raises
        ------
        WebSocketError
            if surrealDB server returns an error(s)

        Returns
        -------
        Any
            The response from the SurrealDB server
        """
        request = SurrealRequest(id=self._count(), method=method, params=params)
        self.ws.send(json_dumps(request))
        data = self.ws.recv()
        returnData = json_loads(data)
        if returnData.get("error") is not None:
            raise WebSocketError(returnData["error"])

        return returnData["id"], returnData["result"]

    def _clean_dict_params(self, params: dict[str, Any]) -> dict[str, Any]:
        """
        Clean the params of the request. It is not recommended to use this function. May cause unexpected behavior.

        Parameters
        ----------
        params: dict[str, Any]
            The params to clean

        Returns
        -------
        dict[str, Any]
            The cleaned params
        """
        return {k: v for k, v in params.items() if v is not None}

    def ping(self) -> bool:
        """
        Ping the SurrealDB server.

        Returns
        -------
        bool
            True if the SurrealDB server is alive else False
        """
        return self._send("ping")[1]

    def use(self, namespace: str, database: str) -> None:
        """
        Use a database.

        Parameters
        ----------
        namespace: str
            The namespace of the database
        database: str
            The database name

        """
        return self._send("use", namespace, database)[1]

    def register(self, params: dict[str, Any]) -> str:
        """
        Signup to the SurrealDB server.

        Parameters
        ----------
        params: dict[str, Any]
            The params of the request

        Returns
        -------
        str
            The token of the user
        """
        clean_params = self._clean_dict_params(params)
        return self._send("signup", clean_params)[1]

    def login(self, params: Union[dict[str, Any], LoginParams]) -> None:
        """
        Login to the SurrealDB server.

        Parameters
        ----------
        params: Union[dict[str, Any], LoginParams]
            The credentials of the login request
        """
        if isinstance(params, LoginParams):
            params = params.to_dict()
        clean_params = self._clean_dict_params(params)
        return self._send("signin", clean_params)[1]

    def invalidate(self) -> None:
        """
        Invalidate the current session.
        """
        return self._send("invalidate")[1]

    def authenticate(self, token: str):
        """Authenticate the current session."""
        return self._send("authenticate", token)[1]

    def killProcess(self, id: str) -> None:
        """
        Kill the current process.
        """
        return self._send("kill", id)[1]

    def let(self, key: str, value: Any) -> None:
        """Set a let variable."""

        return self._send("let", key, value)[1]

    def query(
        self,
        sql: str,
        params: Union[dict[str, Any], list[Any], tuple[Any], set[Any]] = None,
    ) -> list[dict[str, Any]]:
        """Query the SurrealDB server.

        Parameters
        ----------
        sql: str
            The sql query to execute on the SurrealDB server
        params: Any
            The params of the query (optional)

        Returns
        -------
        list[dict[str, Any]]
            The result of the query as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
        """
        if params is None:
            id, query_result = self._send("query", sql, params)
        elif isinstance(params, (list, tuple, set)):
            print(sql.format(*params))
            id, query_result = self._send("query", sql.format(*params))
        elif isinstance(params, dict):
            id, query_result = self._send("query", sql.format(**params))
        else:
            raise TypeError("params must be a list, tuple, set or dict")
        return SurrealResponse(id=id, results=[r["result"] for r in query_result])

    def find(self, tid: str) -> SurrealResponse:
        """Find documents by their ids or table name.
        Parameters
        ----------
        tid: str
            The id or table name of the document(s)

        Returns
        -------
        SurrealResponse
            The result of the query as a SurrealResponse object containing the id of the query and the results as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
        """
        returnData = self._send("select", tid)
        return SurrealResponse(id=returnData[0], results=returnData[1])

    def find_one(self, tid: str) -> Optional[SurrealResponse]:
        """Find a document by its id or table name.
        Parameters
        ----------
        tid: str
            The id or table name of the document

        Returns
        -------
        Optional[SurrealResponse]
            The result of the query as a SurrealResponse object containing the results as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        response = self.find(tid)
        if len(response.results) == 0:
            return None
        return response.results[0]

    def create(
        self, tid: str, data: Union[Any, dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Create a document.
        Parameters
        ----------
        tid: str
            The id or table name of the document
        data: Union[Any,dict[str,Any]]
            The data of the document

        Returns
        -------
        list[dict[str,Any]]
            The document(s) created as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        return self._send("create", tid, data)[1]

    def update(
        self, tid: str, data: Union[Any, dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Update a document.
        Parameters
        ----------
        tid: str
            The id or table name of the document
        data: Union[Any,dict[str,Any]]
            The data of the document

        Returns
        -------
        list[dict[str,Any]]
            The document(s) updated as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        return self._send("update", tid, data)[id]

    def change(
        self, tid: str, data: Union[Any, dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Change a document.
        Parameters
        ----------
        tid: str
            The id or table name of the document
        data: Union[Any,dict[str,Any]]
            The data of the document

        Returns
        -------
        list[dict[str,Any]]
            The document(s) changed as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        return self._send("change", tid, data)[1]

    def modify(
        self, tid: str, data: Union[Any, dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Modify a document.
        Parameters
        ----------
        tid: str
            The id or table name of the document
        data: Union[Any,dict[str,Any]]
            The data of the document

        Returns
        -------
        list[dict[str,Any]]
            The document(s) modified as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        return self._send("modify", tid, data)[1]

    def delete(self, tid: str) -> list[dict[str, Any]]:
        """Delete document(s) by given tid.
        Parameters
        ----------
        tid: str
            The id or table name of the document

        Returns
        -------
        list[dict[str,Any]]
            The document(s) deleted as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

        """
        return self._send("delete", tid)[1]

    @unthread
    def __enter__(self):
        """Enter the context manager.

        Returns
        -------
        SurrealClient
            The SurrealClient instance
        """
        self.connect()
        return self

    def __exit__(
        self, exc_type: Optional[Any], exc_val: Optional[Any], exc_tb: Optional[Any]
    ):
        """
        Exit the context manager.

        Parameters
        ----------
        exc_type: Any
            The exception type (optional) that was raised in the context manager (if any)
        exc_val: Any
            The exception value (optional) that was raised in the context manager (if any)
        exc_tb: Any
            The exception traceback (optional) that was raised in the context manager (if any)
        """
        self.disconnect()

Subclasses

Instance variables

var database

The database name.

Returns

Optional[str]
The database name
Expand source code
@property
def database(self):
    """The database name.

    Returns
    -------
    Optional[str]
        The database name
    """
    return self._database
var namespace

The namespace of the database.

Returns

Optional[str]
The namespace of the database
Expand source code
@property
def namespace(self):
    """The namespace of the database.

    Returns
    -------
    Optional[str]
        The namespace of the database
    """
    return self._namespace
var url

The url of the websocket server.

Returns

str
The url of the websocket server
Expand source code
@property
def url(self):
    """The url of the websocket server.

    Returns
    -------
    str
        The url of the websocket server
    """
    return self.__url
var ws : websocket._core.WebSocket

The websocket connection.

Returns

WebSocket
The websocket connection
Expand source code
@property
def ws(self) -> WebSocket:
    """The websocket connection.

    Returns
    -------
    WebSocket
        The websocket connection"""
    return self._ws

Methods

def authenticate(self, token: str)

Authenticate the current session.

Expand source code
def authenticate(self, token: str):
    """Authenticate the current session."""
    return self._send("authenticate", token)[1]
def change(self, tid: str, data: Union[Any, dict[str, Any]]) ‑> list[dict[str, typing.Any]]

Change a document. Parameters


tid : str
The id or table name of the document
data : Union[Any,dict[str,Any]]
The data of the document

Returns

list[dict[str,Any]]
The document(s) changed as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
Expand source code
def change(
    self, tid: str, data: Union[Any, dict[str, Any]]
) -> list[dict[str, Any]]:
    """Change a document.
    Parameters
    ----------
    tid: str
        The id or table name of the document
    data: Union[Any,dict[str,Any]]
        The data of the document

    Returns
    -------
    list[dict[str,Any]]
        The document(s) changed as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

    """
    return self._send("change", tid, data)[1]
def connect(*args: Any, **kwargs: dict[str, typing.Any]) ‑> Any

Wrapper function.

Parameters

*args : Any
The arguments to pass to the function.
**kwargs : dict[str,Any]
The keyword arguments to pass to the function.

Returns

Any
The return value of the function.

Raises

RuntimeError
If the function is not run in the main thread.
Expand source code
def wrapper(*args: Any, **kwargs: dict[str, Any]) -> Any:
    """
    Wrapper function.

    Parameters
    ----------
    *args : Any
        The arguments to pass to the function.
    **kwargs : dict[str,Any]
        The keyword arguments to pass to the function.

    Returns
    -------
    Any
        The return value of the function.

    Raises
    ------
    RuntimeError
        If the function is not run in the main thread.
    """
    assert (
        threading.current_thread() is threading.main_thread()
    ), "This function is not thread safe"

    return func(*args, **kwargs)
def create(self, tid: str, data: Union[Any, dict[str, Any]]) ‑> list[dict[str, typing.Any]]

Create a document. Parameters


tid : str
The id or table name of the document
data : Union[Any,dict[str,Any]]
The data of the document

Returns

list[dict[str,Any]]
The document(s) created as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
Expand source code
def create(
    self, tid: str, data: Union[Any, dict[str, Any]]
) -> list[dict[str, Any]]:
    """Create a document.
    Parameters
    ----------
    tid: str
        The id or table name of the document
    data: Union[Any,dict[str,Any]]
        The data of the document

    Returns
    -------
    list[dict[str,Any]]
        The document(s) created as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

    """
    return self._send("create", tid, data)[1]
def delete(self, tid: str) ‑> list[dict[str, typing.Any]]

Delete document(s) by given tid. Parameters


tid : str
The id or table name of the document

Returns

list[dict[str,Any]]
The document(s) deleted as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
Expand source code
def delete(self, tid: str) -> list[dict[str, Any]]:
    """Delete document(s) by given tid.
    Parameters
    ----------
    tid: str
        The id or table name of the document

    Returns
    -------
    list[dict[str,Any]]
        The document(s) deleted as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

    """
    return self._send("delete", tid)[1]
def disconnect(*args: Any, **kwargs: dict[str, typing.Any]) ‑> Any

Wrapper function.

Parameters

*args : Any
The arguments to pass to the function.
**kwargs : dict[str,Any]
The keyword arguments to pass to the function.

Returns

Any
The return value of the function.

Raises

RuntimeError
If the function is not run in the main thread.
Expand source code
def wrapper(*args: Any, **kwargs: dict[str, Any]) -> Any:
    """
    Wrapper function.

    Parameters
    ----------
    *args : Any
        The arguments to pass to the function.
    **kwargs : dict[str,Any]
        The keyword arguments to pass to the function.

    Returns
    -------
    Any
        The return value of the function.

    Raises
    ------
    RuntimeError
        If the function is not run in the main thread.
    """
    assert (
        threading.current_thread() is threading.main_thread()
    ), "This function is not thread safe"

    return func(*args, **kwargs)
def find(self, tid: str) ‑> surrealpy.ws.models.SurrealResponse

Find documents by their ids or table name. Parameters


tid : str
The id or table name of the document(s)

Returns

SurrealResponse
The result of the query as a SurrealResponse object containing the id of the query and the results as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
Expand source code
def find(self, tid: str) -> SurrealResponse:
    """Find documents by their ids or table name.
    Parameters
    ----------
    tid: str
        The id or table name of the document(s)

    Returns
    -------
    SurrealResponse
        The result of the query as a SurrealResponse object containing the id of the query and the results as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
    """
    returnData = self._send("select", tid)
    return SurrealResponse(id=returnData[0], results=returnData[1])
def find_one(self, tid: str) ‑> Optional[surrealpy.ws.models.SurrealResponse]

Find a document by its id or table name. Parameters


tid : str
The id or table name of the document

Returns

Optional[SurrealResponse]
The result of the query as a SurrealResponse object containing the results as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
Expand source code
def find_one(self, tid: str) -> Optional[SurrealResponse]:
    """Find a document by its id or table name.
    Parameters
    ----------
    tid: str
        The id or table name of the document

    Returns
    -------
    Optional[SurrealResponse]
        The result of the query as a SurrealResponse object containing the results as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

    """
    response = self.find(tid)
    if len(response.results) == 0:
        return None
    return response.results[0]
def invalidate(self) ‑> None

Invalidate the current session.

Expand source code
def invalidate(self) -> None:
    """
    Invalidate the current session.
    """
    return self._send("invalidate")[1]
def killProcess(self, id: str) ‑> None

Kill the current process.

Expand source code
def killProcess(self, id: str) -> None:
    """
    Kill the current process.
    """
    return self._send("kill", id)[1]
def let(self, key: str, value: Any) ‑> None

Set a let variable.

Expand source code
def let(self, key: str, value: Any) -> None:
    """Set a let variable."""

    return self._send("let", key, value)[1]
def login(self, params: Union[dict[str, Any], surrealpy.ws.models.LoginParams]) ‑> None

Login to the SurrealDB server.

Parameters

params : Union[dict[str, Any], LoginParams]
The credentials of the login request
Expand source code
def login(self, params: Union[dict[str, Any], LoginParams]) -> None:
    """
    Login to the SurrealDB server.

    Parameters
    ----------
    params: Union[dict[str, Any], LoginParams]
        The credentials of the login request
    """
    if isinstance(params, LoginParams):
        params = params.to_dict()
    clean_params = self._clean_dict_params(params)
    return self._send("signin", clean_params)[1]
def modify(self, tid: str, data: Union[Any, dict[str, Any]]) ‑> list[dict[str, typing.Any]]

Modify a document. Parameters


tid : str
The id or table name of the document
data : Union[Any,dict[str,Any]]
The data of the document

Returns

list[dict[str,Any]]
The document(s) modified as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
Expand source code
def modify(
    self, tid: str, data: Union[Any, dict[str, Any]]
) -> list[dict[str, Any]]:
    """Modify a document.
    Parameters
    ----------
    tid: str
        The id or table name of the document
    data: Union[Any,dict[str,Any]]
        The data of the document

    Returns
    -------
    list[dict[str,Any]]
        The document(s) modified as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

    """
    return self._send("modify", tid, data)[1]
def ping(self) ‑> bool

Ping the SurrealDB server.

Returns

bool
True if the SurrealDB server is alive else False
Expand source code
def ping(self) -> bool:
    """
    Ping the SurrealDB server.

    Returns
    -------
    bool
        True if the SurrealDB server is alive else False
    """
    return self._send("ping")[1]
def query(self, sql: str, params: Union[dict[str, Any], list[Any], tuple[Any], set[Any]] = None) ‑> list[dict[str, typing.Any]]

Query the SurrealDB server.

Parameters

sql : str
The sql query to execute on the SurrealDB server
params : Any
The params of the query (optional)

Returns

list[dict[str, Any]]
The result of the query as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
Expand source code
def query(
    self,
    sql: str,
    params: Union[dict[str, Any], list[Any], tuple[Any], set[Any]] = None,
) -> list[dict[str, Any]]:
    """Query the SurrealDB server.

    Parameters
    ----------
    sql: str
        The sql query to execute on the SurrealDB server
    params: Any
        The params of the query (optional)

    Returns
    -------
    list[dict[str, Any]]
        The result of the query as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
    """
    if params is None:
        id, query_result = self._send("query", sql, params)
    elif isinstance(params, (list, tuple, set)):
        print(sql.format(*params))
        id, query_result = self._send("query", sql.format(*params))
    elif isinstance(params, dict):
        id, query_result = self._send("query", sql.format(**params))
    else:
        raise TypeError("params must be a list, tuple, set or dict")
    return SurrealResponse(id=id, results=[r["result"] for r in query_result])
def register(self, params: dict[str, typing.Any]) ‑> str

Signup to the SurrealDB server.

Parameters

params : dict[str, Any]
The params of the request

Returns

str
The token of the user
Expand source code
def register(self, params: dict[str, Any]) -> str:
    """
    Signup to the SurrealDB server.

    Parameters
    ----------
    params: dict[str, Any]
        The params of the request

    Returns
    -------
    str
        The token of the user
    """
    clean_params = self._clean_dict_params(params)
    return self._send("signup", clean_params)[1]
def update(self, tid: str, data: Union[Any, dict[str, Any]]) ‑> list[dict[str, typing.Any]]

Update a document. Parameters


tid : str
The id or table name of the document
data : Union[Any,dict[str,Any]]
The data of the document

Returns

list[dict[str,Any]]
The document(s) updated as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)
Expand source code
def update(
    self, tid: str, data: Union[Any, dict[str, Any]]
) -> list[dict[str, Any]]:
    """Update a document.
    Parameters
    ----------
    tid: str
        The id or table name of the document
    data: Union[Any,dict[str,Any]]
        The data of the document

    Returns
    -------
    list[dict[str,Any]]
        The document(s) updated as a list of dictionaries (rows) with the column names as keys and the values as values of the dictionary (row)

    """
    return self._send("update", tid, data)[id]
def use(self, namespace: str, database: str) ‑> None

Use a database.

Parameters

namespace : str
The namespace of the database
database : str
The database name
Expand source code
def use(self, namespace: str, database: str) -> None:
    """
    Use a database.

    Parameters
    ----------
    namespace: str
        The namespace of the database
    database: str
        The database name

    """
    return self._send("use", namespace, database)[1]
class SurrealClientThread (url: str, *, eventManager: Optional[EventManager] = None)

A class used to represent SurrealClient …

note: This class is not thread safe

Parameters

url : str
The url of the SurrealDB server
eventManager : Optional[EventManager]
The event manager of the client (optional) if you want to use custom event manager (default is None)

Attributes

url : str
The url of the websocket server
ws : WebSocket
The websocket connection
namespace : Optional[str]
The namespace of the database
database : Optional[str]
The database name
eventManager : Optional[EventManager]
The event manager of the client (optional) if you want to use custom event manager (default is None)

Methods

connect() -> None: Connect to the SurrealDB server disconnect() -> None: Disconnect from the SurrealDB server ping() -> str: Ping the SurrealDB server use(namespace: str, database: str) -> None: Use a database info() -> dict[str, Any]: Get current SurrealDB server's authentication info register(params: dict[str, Any]) -> str Signup to the SurrealDB server login(params: dict[str, Any]) -> None Login to the SurrealDB server invalidate() -> None Invalidate the current session authenticate(token: str) -> None Authenticate the current session killProcess(id: str) -> None Kill the given id process let(key: str, value: Any) -> None Set a let variable to the SurrealDB server query(sql: str, params: dict[str, Any]) -> list[dict[str, Any]] Query the SurrealDB server with the given query and params (optional) that returns a list of dict find(tid: str) -> list[dict[str, Any]] Find the given tid (table or record id) in the SurrealDB server and returns a list of dict find_one(tid: str) -> dict[str, Any] Find the given tid (table or record id) in the SurrealDB server and returns a dict create(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]] Create a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the created record(s) update(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]] Update a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the updated record(s) change(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]] Change a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the changed record(s) modify(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]] Modify a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the modified record(s) delete(tid: str) -> list[dict[str, Any]] Delete a record in the SurrealDB server with the given tid (table id), returns a list of dict of the deleted record(s) on(event: Union[str, Callable]) -> Callable Decorator to register an event handler for the given event name or function name (if event is a function) to the EventManager and returns a function that can be used to unregister the event handler from the EventManager

Initialize the SurrealClientThread instance.

Parameters

url : str
The url of the SurrealDB server
Expand source code
class SurrealClientThread(SurrealClient):
    """
    A class used to represent SurrealClient
    ...

    note: This class is not thread safe

    Parameters
    ----------
    url: str
        The url of the SurrealDB server
    eventManager: Optional[event.EventManager]
        The event manager of the client (optional) if you want to use custom event manager (default is None)

    Attributes
    ----------
    url: str
        The url of the websocket server
    ws: WebSocket
        The websocket connection
    namespace: Optional[str]
        The namespace of the database
    database: Optional[str]
        The database name
    eventManager: Optional[event.EventManager]
        The event manager of the client (optional) if you want to use custom event manager (default is None)
    Methods
    -------
    connect() -> None:
        Connect to the SurrealDB server
    disconnect() -> None:
        Disconnect from the SurrealDB server
    ping() -> str:
        Ping the SurrealDB server
    use(namespace: str, database: str) -> None:
        Use a database
    info() -> dict[str, Any]:
        Get current SurrealDB server's authentication info
    register(params: dict[str, Any]) -> str
        Signup to the SurrealDB server
    login(params: dict[str, Any]) -> None
        Login to the SurrealDB server
    invalidate() -> None
        Invalidate the current session
    authenticate(token: str) -> None
        Authenticate the current session
    killProcess(id: str) -> None
        Kill the given id process
    let(key: str, value: Any) -> None
        Set a let variable to the SurrealDB server
    query(sql: str, params: dict[str, Any]) -> list[dict[str, Any]]
        Query the SurrealDB server with the given query and params (optional) that returns a list of dict
    find(tid: str) -> list[dict[str, Any]]
        Find the given tid (table or record id) in the SurrealDB server and returns a list of dict
    find_one(tid: str) -> dict[str, Any]
        Find the given tid (table or record id) in the SurrealDB server and returns a dict
    create(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Create a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the created record(s)
    update(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Update a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the updated record(s)
    change(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Change a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the changed record(s)
    modify(tid: str, data: Union[Any, dict[str, Any]]) -> list[dict[str, Any]]
        Modify a record in the SurrealDB server with the given tid (table id) and data, returns a list of dict of the modified record(s)
    delete(tid: str) -> list[dict[str, Any]]
        Delete a record in the SurrealDB server with the given tid (table id), returns a list of dict of the deleted record(s)
    on(event: Union[str, Callable]) -> Callable
        Decorator to register an event handler for the given event name or function name (if event is a function) to the EventManager and returns a function that can be used to unregister the event handler from the EventManager
    """

    def __init__(self, url: str, *, eventManager: Optional[event.EventManager] = None):
        """Initialize the SurrealClientThread instance.

        Parameters
        ----------
        url: str
            The url of the SurrealDB server

        """
        super().__init__(url)
        self._receive_thread: threading.Thread = threading.Thread(
            target=self._receive_responses, daemon=True
        )
        self._lock = threading.Lock()
        self._responses: dict[str, Any] = {}
        self._event_manager: event.EventManager = eventManager or event.EventManager()

        self.on = self._event_manager.on

    def count(self) -> int:
        """
        Count the number of responses.

        Returns
        -------
        int
            The number of responses
        """
        with self._lock:
            return len(self._responses)

    def _count(self) -> str:
        """
        This is a private function that is used to count the number of requests. It is not recommended to use this function.

        Returns
        -------
        str
            The number of requests
        """
        with self._lock:
            self._counter += 1
            return str(self._counter)

    def _receive_responses(self):
        """
        This is a private function that is used to receive responses from the SurrealDB server. It is not recommended to use this function.
        """
        while self.ws.connected:
            # Receive response from server and add it to the responses dictionary
            response = self._ws.recv()
            if response is None or response == "":
                # If the response is empty, then the connection has been closed
                continue
            # Parse the response as a dictionary
            response = json_loads(response)
            # Add the response to the responses dictionary
            self._responses[response["id"]] = response
            self._event_manager.emit(event.Events.RECEIVED, response)

    def _send(self, method: str, *params: Any) -> Any:
        """
        Sends a request to the websocket server

        Parameters
        ----------
        method: str
            The method of the request
        *params: Any
            The parameters of the request

        Raises
        ------
        WebSocketError
            if surrealDB server returns an error(s)

        Returns
        -------
        Any
            The response from the SurrealDB server
        """
        request = SurrealRequest(id=self._count(), method=method, params=params)
        self._ws.send(json_dumps(request))
        # Wait for the response to be received
        while request.id not in self._responses:
            time.sleep(0.01)
        # Get the response from the responses dictionary and remove it from the dictionary
        response = self._responses.pop(request.id, None)
        if response is None:
            # If the response is None, then the connection has been closed
            err = SurrealError("Response is None")
            # Emit the error event
            self._event_manager.emit(
                event.Events.ERROR,
                event.Event(
                    event=event.Events.ERROR, response=SurrealResponse("-1", (err,))
                ),
            )
            raise err
        if response.get("error") is not None:
            # If the response has an error, then raise the error and return None
            err = WebSocketError(response["error"])
            # Emit the error event
            self._event_manager.emit(
                event.Events.ERROR,
                event.Event(event=event.Events.ERROR, response=response, id=request.id),
            )
            raise err
        return response["id"], response["result"]

    def connect(self):
        """
        Connect to the SurrealDB server.

        Raises
        ------
        WebSocketError
            If the connection is already established
        """
        # check if thread is already running, if so, raise an error
        if self._receive_thread.is_alive():
            raise WebSocketError("Connection is already established")
        self._ws = create_connection(self.url)
        self._receive_thread.start()
        self._event_manager.emit(
            event.Events.CONNECTED,
            event.Event(
                event.Events.CONNECTED, response=SurrealResponse("-1", ("Connected",))
            ),
        )

    def login(self, params: Union[dict[str, Any], LoginParams]) -> None:
        """
        Login to the SurrealDB server.

        Parameters
        ----------
        params: Union[dict[str, Any], LoginParams]
            The credentials of the login request
        """
        self._event_manager.emit(
            event.Events.LOGIN,
            event.Event(
                event.Events.LOGIN, response=SurrealResponse("-1", ("Logging in...",))
            ),
        )
        if isinstance(params, LoginParams):
            params = params.to_dict()
        clean_params = self._clean_dict_params(params)
        result = (self._send("signin", clean_params)[1],)
        self._event_manager.emit(
            event.Events.LOGGED_IN,
            event.Event(event.Events.LOGGED_IN, response=SurrealResponse("-1", result)),
        )
        return result

    def use(self, namespace: str, database: str) -> None:
        """
        Use a database.

        Parameters
        ----------
        namespace: str
            The namespace of the database
        database: str
            The database name

        """
        result = self._send("use", namespace, database)
        self._event_manager.emit(
            event.Events.USE,
            event.Event(
                event.Events.USE,
                response=SurrealResponse(
                    result[0], {"namespace": namespace, "database": database}
                ),
                id=result[0],
            ),
        )
        return result[1]

    def disconnect(self):
        """
        Disconnect from the SurrealDB server.

        Raises
        ------
        WebSocketError
            If the connection is already closed
        """
        self._ws.close()
        self._receive_thread.join()
        self._event_manager.emit(
            event.Events.DISCONNECTED,
            event.Event(
                event.Events.DISCONNECTED,
                response=SurrealResponse("-1", ("Disconnected",)),
            ),
        )

    def __enter__(self):
        """Enter the context manager.

        Returns
        -------
        SurrealClient
            The SurrealClient instance
        """
        self.connect()
        return self

Ancestors

Methods

def connect(self)

Connect to the SurrealDB server.

Raises

WebSocketError
If the connection is already established
Expand source code
def connect(self):
    """
    Connect to the SurrealDB server.

    Raises
    ------
    WebSocketError
        If the connection is already established
    """
    # check if thread is already running, if so, raise an error
    if self._receive_thread.is_alive():
        raise WebSocketError("Connection is already established")
    self._ws = create_connection(self.url)
    self._receive_thread.start()
    self._event_manager.emit(
        event.Events.CONNECTED,
        event.Event(
            event.Events.CONNECTED, response=SurrealResponse("-1", ("Connected",))
        ),
    )
def count(self) ‑> int

Count the number of responses.

Returns

int
The number of responses
Expand source code
def count(self) -> int:
    """
    Count the number of responses.

    Returns
    -------
    int
        The number of responses
    """
    with self._lock:
        return len(self._responses)
def disconnect(self)

Disconnect from the SurrealDB server.

Raises

WebSocketError
If the connection is already closed
Expand source code
def disconnect(self):
    """
    Disconnect from the SurrealDB server.

    Raises
    ------
    WebSocketError
        If the connection is already closed
    """
    self._ws.close()
    self._receive_thread.join()
    self._event_manager.emit(
        event.Events.DISCONNECTED,
        event.Event(
            event.Events.DISCONNECTED,
            response=SurrealResponse("-1", ("Disconnected",)),
        ),
    )

Inherited members