Source code for cassy.casdriv

# src/fogd_db/casdriv.py
"""Cassandra-Driver CRUD interface."""
from collections import abc, namedtuple

import cassandra.cluster
import cassandra.cqlengine.management as cql_manage
from cassandra.cqlengine import connection


[docs]def connect_session(ips=("127.0.0.1",), port=9042, **kwargs): """Connect and return session using :mod:`fogd_db.casdb`. Parameters ---------- ips: ~collections.abc.Container Container holding contact_points to connect the `cassandra.cluster.Cluster <https://docs.datastax.com/en/developer/python-driver/3.18/api/cassandra/cluster/>`_. Defaults to ("127.0.0.1",), which is the local host. port: ~numbers.Number The server-side port to open connections to. Defaults to 9042. kwargs Additional arguments relegated to `cassandra.cluster.Cluster <https://docs.datastax.com/en/developer/python-driver/3.18/api/cassandra/cluster/>`_ Returns ------- ~typing.NamedTuple Tuple of `cassandra.session.Cluster <https://docs.datastax.com/en/developer/python-driver/3.18/api/cassandra/cluster/#cassandra.cluster.Cluster>`_ and `cassandra.session.Session <https://docs.datastax.com/en/developer/python-driver/3.18/api/cassandra/cluster/#cassandra.cluster.Session>`_ as in ``('cluster'=cluster, 'session'=session)``. """ # create cluster object cluster = cassandra.cluster.Cluster(contact_points=ips, port=port, **kwargs) # and connect non keyspace session session = cluster.connect() # infer cluster name cluster_name = get_cluster_name(cluster) # register connection using the inferred cluster name connection.register_connection(cluster_name, session=session) return namedtuple("CnS", ["cluster", "session"])(cluster, session)
[docs]def create_simple_keyspace( name, replication_factor, durable_writes=True, connections=None ): """Create a keyspace with SimpleStrategy for replica placement. If the keyspace already exists, it will not be modified. Basically a very close wrapper of `cassandra driver's cqlengine functionality <https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/cqlengine/management/>`_ Parameters ---------- name: str name of keyspace to create replication_factor: int keyspace replication factor, used with SimpleStrategy durable_writes: bool, default=True Write log is bypassed if set to False. connections: list List of connection names. """ cql_manage.create_keyspace_simple( name=name, replication_factor=replication_factor, durable_writes=durable_writes, connections=connections, )
[docs]def create_entry(model, data, prior_syncing=False, keyspace=None, con=None): """Create a new entry using a python data class model. Parameters ---------- model One of the cassandra python-driver `data class models <https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/cqlengine/models/>`_. data: dict Keyword value pairings of the data to be added. Most conform to the :paramref:`~create_entry.model` used. prior_syncing: bool, default=False If ``True`` :func:`synchronize_model` is called before creation, to synchronize database table and model. (Required if you call ``model.create`` for the first time, or change any model attributes. keyspace: str, None, default=None String to specify the keyspace the table is created. If ``None``, ``model.__keyspace__`` is used. con: str, None, default=None String to specify the connection name the table is created with. :mod:`casdriv` uses :attr:`cluster name <get_cluster_name>` as default connection name. If ``None``, ``model.__connection__`` is used. Returns ------- model class Instance of the :paramref:`~create_entry.model` created using :paramref:`~create_entry.data`. """ if keyspace: model.__keyspace__ = keyspace if con: model.__connection__ = con if prior_syncing: synchronize_model(model) return model.create(**data)
[docs]def delete_entry(model, primary_keys, values, keyspace=None, con=None): """Delete an existing entry using a python data class model. Parameters ---------- model One of the cassandra python-driver `data class models <https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/cqlengine/models/>`_. primary_keys: str, tuple String or tuple of strings specifying the label/schema of the primary key(s) to be read. values: str, tuple String or tuple of strings specifying the value of the primary key to be queried for. keyspace: str, None, default=None String to specify the keyspace the table is created. If ``None``, ``model.__keyspace__`` is used. con: str, None, default=None String to specify the connection name the table is created with. :mod:`casdriv` uses :attr:`cluster name <get_cluster_name>` as default connection name. If ``None``, ``model.__connection__`` is used. """ entry = read_entry(model, primary_keys, values, keyspace, con) entry.delete()
[docs]def get_all_entries(model, keyspace=None, con=None): """Get all entries of an existing data class model table. Uses the `cassandra python-driver queries <https://docs.datastax.com/en/developer/python-driver/3.25/cqlengine/queryset/#retrieving-objects-with-filters>`_ Parameters ---------- model One of the cassandra python-driver `data class models <https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/cqlengine/models/>`_. keyspace: str, None, default=None String to specify the keyspace the table is created. If ``None``, ``model.__keyspace__`` is used. con: str, None, default=None String to specify the connection name the table is created with. :mod:`casdriv` uses :attr:`cluster name <get_cluster_name>` as default connection name. If ``None``, ``model.__connection__`` is used. Returns ------- list List of table entries created via a cassandra python-driver `data class models <https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/cqlengine/models/>`_. """ if keyspace: model.__keyspace__ = keyspace if con: model.__connection__ = con return list(model.objects().all())
[docs]def get_cluster_name(cluster): """Return clusters registered name. Parameters ---------- cluster: cassandra.cluster.Cluster `cassandra.cluster.Cluster <https://docs.datastax.com/en/developer/python-driver/3.18/api/cassandra/cluster/>`_ object holding the cassandra database. Returns ------- str: cluster.metadata.cluster_name """ return cluster.metadata.cluster_name
[docs]def list_keyspace_tables(cluster, keyspace): """List all tables present in keyspaces. Parameters ---------- cluster: cassandra.cluster.Cluster `cassandra.cluster.Cluster <https://docs.datastax.com/en/developer/python-driver/3.18/api/cassandra/cluster/>`_ object holding the cassandra database. keyspace: str String specifying the keyspace of the cluster of which all existing tables are to be listed Returns ------- list List of strings specifying the tables currently present inside the :paramref:`~list_keyspace_tables.keyspace` of :paramref:`~list_keyspace_tables.cluster`. """ tables = cluster.metadata.keyspaces[keyspace].tables return list(tables.keys())
[docs]def list_cluster_keyspaces(cluster): """List all keyspaces present in cluster. Parameters ---------- cluster: cassandra.cluster.Cluster `cassandra.cluster.Cluster <https://docs.datastax.com/en/developer/python-driver/3.18/api/cassandra/cluster/>`_ object holding the cassandra database. Returns ------- list List of strings specifying the keyspaces currently present inside the :paramref:`~list_cluster_keyspaces.cluster`. """ keyspaces = cluster.metadata.keyspaces return list(keyspaces.keys())
[docs]def list_table_primary_keys(cluster, keyspace, table): """List all primary key labels (schemas?). Parameters ---------- cluster: cassandra.cluster.Cluster `cassandra.cluster.Cluster <https://docs.datastax.com/en/developer/python-driver/3.18/api/cassandra/cluster/>`_ object holding the cassandra database. keyspace: str String specifying the keyspace of the :paramref:`~list_table_primary_keys.cluster` where :paramref:`~list_table_primary_keys.table` is to be found. table: str String specifying the table inside the :paramref:`~list_table_primary_keys.keyspace` of :paramref:`~list_table_primary_keys.cluster` of which the primary key lable(s) is(are) to be listed. Returns ------- list List of strings specifying the found primary key lables """ keys = [ key.name for key in cluster.metadata.keyspaces[keyspace].tables[table].primary_key ] return keys
[docs]def list_table_columns(cluster, keyspace, table): """List all primary key lables (schemas?). Parameters ---------- cluster: cassandra.cluster.Cluster `cassandra.cluster.Cluster <https://docs.datastax.com/en/developer/python-driver/3.18/api/cassandra/cluster/>`_ object holding the cassandra database. keyspace: str String specifying the keyspace of the :paramref:`~list_table_primary_keys.cluster` where :paramref:`~list_table_primary_keys.table` is to be found. table: str String specifying the table inside the :paramref:`~list_table_primary_keys.keyspace` of :paramref:`~list_table_primary_keys.cluster` of which the column labels are to be listed. Returns ------- list List of strings specifying the found column lables """ columns = cluster.metadata.keyspaces[keyspace].tables[table].columns.keys() return columns
[docs]def read_entry(model, primary_keys, values, keyspace=None, con=None): """Read existing entry using a python data class model. Uses the `cassandra python-driver queries <https://docs.datastax.com/en/developer/python-driver/3.25/cqlengine/queryset/#retrieving-objects-with-filters>`_ Parameters ---------- model One of the cassandra python-driver `data class models <https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/cqlengine/models/>`_. primary_keys: str, tuple String or tuple of strings specifying the label/schema of the primary key(s) to be read. values: str, tuple String or tuple of strings specifying the value of the primary key to be queried for. keyspace: str, None, default=None String to specify the keyspace the table is created. If ``None``, ``model.__keyspace__`` is used. con: str, None, default=None String to specify the connection name the table is created with. :mod:`casdriv` uses :attr:`cluster name <get_cluster_name>` as default connection name. If ``None``, ``model.__connection__`` is used. Returns ------- model class Instance of the :paramref:`~read_entry.model` read. Raises ------ ValueError Raised when :paramref:`~read_entry.primary_keys` is neither of type tuple or str. """ # parse keyspace and connection if keyspace: model.__keyspace__ = keyspace if con: model.__connection__ = con # parse key and value pairings if isinstance(primary_keys, str): filter_dict = {primary_keys: values} elif isinstance(primary_keys, abc.Container): filter_dict = dict(zip(primary_keys, values)) else: msg1 = "'primary_keys' argument must be of type str or tuple" msg2 = "not {type(primary_keys)}" raise ValueError(msg1 + msg2) entry = model.get(**filter_dict) return entry
[docs]def synchronize_model(model): """Synchronize the cassandra table with a python data class model. Effectively creating a cassandra table out of a data class model, if not present. Parameters ---------- model One of the cassandra python-driver `data class models <https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/cqlengine/models/>`_. Python data class model to synch. """ cql_manage.sync_table(model)