Module lbsntransform.input.mappings.db_query
Module for db input connection sql mapping
Expand source code
# -*- coding: utf-8 -*-
"""
Module for db input connection sql mapping
"""
import enum
from typing import Union, Optional, List, Tuple
import lbsnstructure as lbsn
"""Schema convention from lbsn db spec"""
# One row per lbsn object type:
# (protobuf message name, db schema, db table, key column).
LBSN_SCHEMA = [
    (message_class().DESCRIPTOR.name, db_schema, db_table, db_key)
    for message_class, db_schema, db_table, db_key in (
        (lbsn.Origin, "social", "origin", "origin_id"),
        (lbsn.Country, "spatial", "country", "country_guid"),
        (lbsn.City, "spatial", "city", "city_guid"),
        (lbsn.Place, "spatial", "place", "place_guid"),
        (lbsn.UserGroup, "social", "user_groups", "usergroup_guid"),
        (lbsn.User, "social", "user", "user_guid"),
        (lbsn.Post, "topical", "post", "post_guid"),
        (lbsn.PostReaction, "topical", "post_reaction", "reaction_guid"),
        (lbsn.Event, "temporal", "event", "event_guid"),
    )
]
def optional_schema_override(
    LBSN_SCHEMA: List[Tuple[str, str, str, str]],
    schema_table_overrides: List[Tuple[str, str]],
) -> List[Tuple[str, str, str, str]]:
    """Override schema and table name for selected lbsn objects.

    Args:
        LBSN_SCHEMA: Entries of the form
            ``(lbsn_type, schema_name, table_name, key_col)``.
        schema_table_overrides: Pairs of
            ``(lbsn_object_ref, "schema_override.table_override")``.
            ``lbsn_object_ref`` is matched case-insensitively against
            ``lbsn_type``. ``None`` or an empty list means "no overrides".
            If the same object is listed more than once, the first
            entry wins.

    Returns:
        A new list with the same order and length as ``LBSN_SCHEMA``;
        schema and table name are replaced for every matched entry,
        all other entries are copied unchanged.

    Raises:
        ValueError: If an override value cannot be split into exactly
            one schema and one table name on ``"."``.
    """
    # Parse and validate the overrides once, instead of once per
    # schema entry.
    overrides = {}
    for lbsn_object_ref, schema_table_override in schema_table_overrides or []:
        try:
            schema_override, table_override = schema_table_override.split(".")
        except ValueError as e:
            raise ValueError(
                f"Cannot split schema and table from override "
                f"({schema_table_override}). Make sure "
                f"override_lbsn_query_schema entries are formatted "
                f"correctly, e.g. schema_override.table_override"
            ) from e
        # setdefault keeps the first override given for an object
        overrides.setdefault(
            lbsn_object_ref.lower(), (schema_override, table_override)
        )

    schema_with_overrides = []
    for lbsn_type, schema_name, table_name, key_col in LBSN_SCHEMA:
        # fall back to the original schema/table if no override matches
        schema_name, table_name = overrides.get(
            lbsn_type.lower(), (schema_name, table_name)
        )
        schema_with_overrides.append(
            (lbsn_type, schema_name, table_name, key_col)
        )
    return schema_with_overrides
class InputSQL(enum.Enum):
    """SQL templates for reading input records from a database.

    Each member's value is a ``str.format`` template with named
    placeholders (``{schema_name}``, ``{table_name}``, ``{key_col}``,
    ``{number_of_records_to_fetch}`` and either ``{start_id}`` or
    ``{optional_where}``). Use :meth:`get_sql` to fill them in.

    In the default example, records are stored in table "input" in
    schema public. There are 3 columns, in_id, insert_time and data.
    Column data is cast to json type.
    """

    # SQL for default JSON records stored in DB.
    # Always filters on the key column, so a start_id is required.
    DEFAULT = """
        SELECT in_id, insert_time, data::json
        FROM {schema_name}."{table_name}"
        WHERE {key_col} > {start_id}
        ORDER BY {key_col} ASC
        LIMIT {number_of_records_to_fetch};
        """
    # SQL for LBSN records stored in DB.
    # The WHERE clause is only added when a start_id is given.
    LBSN = """
        SELECT * FROM {schema_name}."{table_name}"
        {optional_where}
        ORDER BY {key_col} ASC
        LIMIT {number_of_records_to_fetch};
        """
    DB_CUSTOM = """Define your own DB Mapping SQL here"""

    def get_sql(
        self,
        schema_name: str = "public",
        table_name: str = "input",
        start_id: Optional[Union[int, str]] = None,
        number_of_records_to_fetch: int = 10000,
        key_col="in_id",
    ):
        """Get SQL formatted string.

        Args:
            schema_name: Schema to query.
            table_name: Table to query.
            start_id: Only records whose ``key_col`` is greater than
                this value are selected. Optional for templates that
                use ``{optional_where}``; required for templates that
                contain ``{start_id}`` directly (``DEFAULT``).
            number_of_records_to_fetch: Value of the ``LIMIT`` clause.
            key_col: Column used for filtering and ordering.

        Returns:
            The member's template with all placeholders substituted.

        Raises:
            ValueError: If the template requires ``{start_id}`` but
                ``start_id`` is None.
        """
        # self.value refers to current ENUM,
        # which is always string
        template = self.value
        optional_where = ""
        if start_id is None:
            if "{start_id}" in template:
                # without a value the query would read "> None"
                raise ValueError(
                    f"InputSQL.{self.name} requires a start_id "
                    f"to filter on {key_col}"
                )
        elif self.name == "LBSN":
            # quoted string required; double embedded single quotes so
            # the value cannot terminate the SQL literal early
            quoted_start_id = str(start_id).replace("'", "''")
            optional_where = f"WHERE {key_col} > '{quoted_start_id}'"
        else:
            optional_where = f"WHERE {key_col} > {start_id}"
        # NOTE(review): identifiers and values are interpolated into the
        # SQL text; callers must only pass trusted values.
        return template.format(
            schema_name=schema_name,
            table_name=table_name,
            start_id=start_id,
            optional_where=optional_where,
            number_of_records_to_fetch=number_of_records_to_fetch,
            key_col=key_col,
        )
Functions
def optional_schema_override(LBSN_SCHEMA: List[Tuple[str, str, str, str]], schema_table_overrides: List[Tuple[str, str]]) ‑> List[Tuple[str, str, str, str]]
-
Override schema and table name for selected lbsn objects.
Expand source code
def optional_schema_override(
    LBSN_SCHEMA: List[Tuple[str, str, str, str]],
    schema_table_overrides: List[Tuple[str, str]],
) -> List[Tuple[str, str, str, str]]:
    """Override schema and table name for selected lbsn objects.

    Args:
        LBSN_SCHEMA: Entries of the form
            ``(lbsn_type, schema_name, table_name, key_col)``.
        schema_table_overrides: Pairs of
            ``(lbsn_object_ref, "schema_override.table_override")``.
            ``lbsn_object_ref`` is matched case-insensitively against
            ``lbsn_type``. ``None`` or an empty list means "no overrides".
            If the same object is listed more than once, the first
            entry wins.

    Returns:
        A new list with the same order and length as ``LBSN_SCHEMA``;
        schema and table name are replaced for every matched entry,
        all other entries are copied unchanged.

    Raises:
        ValueError: If an override value cannot be split into exactly
            one schema and one table name on ``"."``.
    """
    # Parse and validate the overrides once, instead of once per
    # schema entry.
    overrides = {}
    for lbsn_object_ref, schema_table_override in schema_table_overrides or []:
        try:
            schema_override, table_override = schema_table_override.split(".")
        except ValueError as e:
            raise ValueError(
                f"Cannot split schema and table from override "
                f"({schema_table_override}). Make sure "
                f"override_lbsn_query_schema entries are formatted "
                f"correctly, e.g. schema_override.table_override"
            ) from e
        # setdefault keeps the first override given for an object
        overrides.setdefault(
            lbsn_object_ref.lower(), (schema_override, table_override)
        )

    schema_with_overrides = []
    for lbsn_type, schema_name, table_name, key_col in LBSN_SCHEMA:
        # fall back to the original schema/table if no override matches
        schema_name, table_name = overrides.get(
            lbsn_type.lower(), (schema_name, table_name)
        )
        schema_with_overrides.append(
            (lbsn_type, schema_name, table_name, key_col)
        )
    return schema_with_overrides
Classes
class InputSQL (*args, **kwds)
-
SQL for default JSON records stored in DB
In this example, records are stored in table "input" in schema public. There are 3 columns, in_id, insert_time and data. Table data is cast to json type.
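The named {placeholder} fields in each template (schema_name, table_name, key_col, start_id and number_of_records_to_fetch) are substituted when the query is built: key_col and start_id select the records that follow a given key, and number_of_records_to_fetch is the limit of records to get during the query (default 10000)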
Expand source code
class InputSQL(enum.Enum):
    """SQL templates for reading input records from a database.

    Each member's value is a ``str.format`` template with named
    placeholders (``{schema_name}``, ``{table_name}``, ``{key_col}``,
    ``{number_of_records_to_fetch}`` and either ``{start_id}`` or
    ``{optional_where}``). Use :meth:`get_sql` to fill them in.

    In the default example, records are stored in table "input" in
    schema public. There are 3 columns, in_id, insert_time and data.
    Column data is cast to json type.
    """

    # SQL for default JSON records stored in DB.
    # Always filters on the key column, so a start_id is required.
    DEFAULT = """
        SELECT in_id, insert_time, data::json
        FROM {schema_name}."{table_name}"
        WHERE {key_col} > {start_id}
        ORDER BY {key_col} ASC
        LIMIT {number_of_records_to_fetch};
        """
    # SQL for LBSN records stored in DB.
    # The WHERE clause is only added when a start_id is given.
    LBSN = """
        SELECT * FROM {schema_name}."{table_name}"
        {optional_where}
        ORDER BY {key_col} ASC
        LIMIT {number_of_records_to_fetch};
        """
    DB_CUSTOM = """Define your own DB Mapping SQL here"""

    def get_sql(
        self,
        schema_name: str = "public",
        table_name: str = "input",
        start_id: Optional[Union[int, str]] = None,
        number_of_records_to_fetch: int = 10000,
        key_col="in_id",
    ):
        """Get SQL formatted string.

        Args:
            schema_name: Schema to query.
            table_name: Table to query.
            start_id: Only records whose ``key_col`` is greater than
                this value are selected. Optional for templates that
                use ``{optional_where}``; required for templates that
                contain ``{start_id}`` directly (``DEFAULT``).
            number_of_records_to_fetch: Value of the ``LIMIT`` clause.
            key_col: Column used for filtering and ordering.

        Returns:
            The member's template with all placeholders substituted.

        Raises:
            ValueError: If the template requires ``{start_id}`` but
                ``start_id`` is None.
        """
        # self.value refers to current ENUM,
        # which is always string
        template = self.value
        optional_where = ""
        if start_id is None:
            if "{start_id}" in template:
                # without a value the query would read "> None"
                raise ValueError(
                    f"InputSQL.{self.name} requires a start_id "
                    f"to filter on {key_col}"
                )
        elif self.name == "LBSN":
            # quoted string required; double embedded single quotes so
            # the value cannot terminate the SQL literal early
            quoted_start_id = str(start_id).replace("'", "''")
            optional_where = f"WHERE {key_col} > '{quoted_start_id}'"
        else:
            optional_where = f"WHERE {key_col} > {start_id}"
        # NOTE(review): identifiers and values are interpolated into the
        # SQL text; callers must only pass trusted values.
        return template.format(
            schema_name=schema_name,
            table_name=table_name,
            start_id=start_id,
            optional_where=optional_where,
            number_of_records_to_fetch=number_of_records_to_fetch,
            key_col=key_col,
        )
Ancestors
- enum.Enum
Class variables
var DB_CUSTOM
var DEFAULT
-
SQL for LBSN records stored in DB
var LBSN
Methods
def get_sql(self, schema_name: str = 'public', table_name: str = 'input', start_id: Union[int, str, ForwardRef(None)] = None, number_of_records_to_fetch: int = 10000, key_col='in_id')
-
Get SQL formatted string
Expand source code
def get_sql(
    self,
    schema_name: str = "public",
    table_name: str = "input",
    start_id: Optional[Union[int, str]] = None,
    number_of_records_to_fetch: int = 10000,
    key_col="in_id",
):
    """Get SQL formatted string.

    Args:
        schema_name: Schema to query.
        table_name: Table to query.
        start_id: Only records whose ``key_col`` is greater than this
            value are selected. Optional for templates that use
            ``{optional_where}``; required for templates that contain
            ``{start_id}`` directly.
        number_of_records_to_fetch: Value of the ``LIMIT`` clause.
        key_col: Column used for filtering and ordering.

    Returns:
        The member's template with all placeholders substituted.

    Raises:
        ValueError: If the template requires ``{start_id}`` but
            ``start_id`` is None.
    """
    # self.value refers to current ENUM,
    # which is always string
    template = self.value
    optional_where = ""
    if start_id is None:
        if "{start_id}" in template:
            # without a value the query would read "> None"
            raise ValueError(
                f"InputSQL.{self.name} requires a start_id "
                f"to filter on {key_col}"
            )
    elif self.name == "LBSN":
        # quoted string required; double embedded single quotes so
        # the value cannot terminate the SQL literal early
        quoted_start_id = str(start_id).replace("'", "''")
        optional_where = f"WHERE {key_col} > '{quoted_start_id}'"
    else:
        optional_where = f"WHERE {key_col} > {start_id}"
    # start_id is passed as well, so templates that reference
    # {start_id} directly no longer fail with KeyError
    return template.format(
        schema_name=schema_name,
        table_name=table_name,
        start_id=start_id,
        optional_where=optional_where,
        number_of_records_to_fetch=number_of_records_to_fetch,
        key_col=key_col,
    )