Module lbsntransform.output.hll.hll_functions
Collection of functions used in hll transformation
Expand source code
# -*- coding: utf-8 -*-
"""
Collection of functions used in hll transformation
"""
import datetime as dt
from typing import Dict, Generator, List, Optional, Set, Tuple, Union
import psycopg2
import lbsnstructure as lbsn
from lbsntransform.tools.helper_functions import HelperFunctions as HF
class HLLFunctions:
"""Collection of functions used in hll transformation"""
@staticmethod
def merge_hll_tuples(
base_tuples: List[Tuple[Union[str, int], ...]],
hll_tuples: List[Tuple[Union[str, int], ...]],
) -> Generator[Tuple[Union[int, str], ...], None, None]:
"""Merges two lists of tuples,
whereby only the third part of hll_tuples (the hll shard)
is appended. The first part of base_tuples is used to collect hll
shards per base"""
# assign start values
hll_baseidx_before = hll_tuples[0]
merged_shard_batch = (hll_baseidx_before[2],)
ix_before = hll_baseidx_before[0]
# start from second element
for hll_tup in hll_tuples[1:]:
hll_shard = hll_tup[2]
ix = hll_tup[0]
if ix == ix_before:
# if base is unchanged (and not the last hll_tuple),
# concat hll shards per base into one tuple
merged_shard_batch += (hll_shard,)
ix_before = ix
# continue with next shard
continue
# else: if base changed,
# append hll_shards to base record
merged_base_hll_tuple = base_tuples[ix_before] + merged_shard_batch
merged_shard_batch = (hll_shard,)
ix_before = ix
# return merged record + hll tuple
# and continue iteration
yield merged_base_hll_tuple
# final return of last item
yield base_tuples[ix] + merged_shard_batch
@staticmethod
def concat_base_shards(
records: List[Tuple[Union[str, int], ...]],
hll_shards: List[Tuple[Union[str, int], ...]],
) -> List[Tuple[Union[int, str], ...]]:
"""Concat records with list of hll_shards
For ease of calculation, hll_shards are reduced to
a onedimensional list. Here, shards are assigned back to
records using record base keys
"""
merged_records = [
merged_tuple
for merged_tuple in HLLFunctions.merge_hll_tuples(records, hll_shards)
]
# if this ever evaluates to false
# something went fundamentally wrong:
# abort processing
assert len(records) == len(merged_records)
return merged_records
@staticmethod
def make_shard_sql(values_str: str) -> str:
"""SQL to calculate HLL Shards from sets of items
Example values_str:
('2011-01-01'::date, 'user_hll', '2:4399297324604'),
"""
insert_sql = f"""
SELECT
base_id,
metric_id,
hll_add_agg(
hll_hash_text(
extensions.crypt_hash(
s.item_value,
current_setting('crypt.salt'))))
FROM (
VALUES {values_str}
) s(base_id, metric_id, item_value)
GROUP BY base_id, metric_id
ORDER BY base_id, metric_id
"""
return insert_sql
@staticmethod
def calculate_item_shards(
hllworker_cursor, values_str: str
) -> List[Tuple[int, str, str]]:
"""Calculates shards from batched hll_items
using hll_worker connection
"""
tsuccessful = False
shard_sql = HLLFunctions.make_shard_sql(values_str)
while not tsuccessful:
try:
hllworker_cursor.execute(shard_sql)
except psycopg2.OperationalError:
input(
"Server closed the connection unexpectedly. Check "
"if your postgres server has enough memory available. "
"Will repeat execution once after input.."
)
hllworker_cursor.execute(shard_sql)
tsuccessful = True
else:
tsuccessful = True
hll_shards = hllworker_cursor.fetchall()
return hll_shards
@staticmethod
def concat_base_metric_item(
base_key: int, hll_items: Dict[str, set]
) -> List[Tuple[int, int, str]]:
"""Concat hll item to (base_key, metric_key, item) tuple
Note that only ids (0, 1, 2..), not the true keys are used
for base_key and metric_key,
following the separation of concerns principle
"""
tuple_list = []
for metric_ix, item_set in enumerate(hll_items.values()):
if not item_set:
tuple_list.append((base_key, metric_ix, None))
else:
for item in item_set:
tuple_list.append((base_key, metric_ix, item))
return tuple_list
@staticmethod
def hll_concat_origin_guid(record) -> str:
"""Concat origin and pkey (guid) to string"""
origin_guid = HLLFunctions.hll_concat(
[record.pkey.origin.origin_id, record.pkey.id]
)
return origin_guid
@staticmethod
def hll_concat_user(record: lbsn.Post) -> str:
"""Concat origin and user pkey (guid) to string"""
user_hll = HLLFunctions.hll_concat(
[record.pkey.origin.origin_id, record.user_pkey.id]
)
return user_hll
@staticmethod
def hll_concat_date(record: lbsn.Post) -> str:
"""Merge create and publish date and concat to string"""
date_merged = HLLFunctions.merge_dates_post(record)
if date_merged is None:
return
date_formatted = date_merged.strftime("%Y-%m-%d")
return date_formatted
@staticmethod
def hll_concat_userday(record: lbsn.Post) -> str:
"""Merge date and user pkey (guid) for measuring user days"""
date_merged = HLLFunctions.hll_concat_date(record)
user_merged = HLLFunctions.hll_concat_user(record)
pud_hll = HLLFunctions.hll_concat([user_merged, date_merged])
return pud_hll
@staticmethod
def hll_concat_yearmonth(record: lbsn.Post) -> str:
"""Concat year and month of post date to string"""
date_merged = HLLFunctions.merge_dates_post(record)
date_formatted = date_merged.strftime("%Y-%m")
return date_formatted
@staticmethod
def hll_concat_latlng(record: lbsn.Post) -> str:
"""Concat post lat lng coordinates to string"""
if record.post_geoaccuracy == "latlng":
coordinates_geom = HF.null_check(record.post_latlng)
coordinates = HF.get_coordinates_from_ewkt(coordinates_geom)
return f"{coordinates.lat}:{coordinates.lng}"
return "0:0"
@staticmethod
def hll_concat_place(record: lbsn.Post) -> str:
"""Concat origin and post place guid to string"""
if record.post_geoaccuracy == "place":
return HLLFunctions.hll_concat(
[record.pkey.origin.origin_id, record.place_pkey.id]
)
return None
@staticmethod
def hll_concat_upt_hll(record: lbsn.Post) -> List[str]:
"""Concat all post terms (body, title, hashtags) and return list"""
body_terms = HF.select_terms(record.post_body)
title_terms = HF.select_terms(record.post_title)
tag_terms = {item.lower() for item in record.hashtags if len(item) > 2}
all_post_terms = set.union(body_terms, title_terms, tag_terms)
user_hll = HLLFunctions.hll_concat_user(record)
upt_hll = HLLFunctions.hll_concat_user_terms(user_hll, all_post_terms)
return upt_hll
@staticmethod
def hll_concat_user_terms(user: str, terms: List[str]) -> Set[str]:
"""Concat user and list of terms to set of unique user:terms"""
concat_user_terms = set()
for term in terms:
concat_user_terms.add(":".join([user, term]))
return concat_user_terms
@staticmethod
def hll_concat(record_attr: List[Union[str, int]]) -> str:
"""Concat record attributes using colon"""
return ":".join(map(str, record_attr))
@staticmethod
def merge_dates_post(record: lbsn.Post = None) -> Optional[dt.datetime]:
"""Merge post_publish and post_created attributes"""
post_create_date = HF.null_check_datetime(record.post_create_date)
post_publish_date = HF.null_check_datetime(record.post_publish_date)
if post_create_date is None:
return post_publish_date
return post_create_date
Classes
class HLLFunctions
-
Collection of functions used in hll transformation
Expand source code
class HLLFunctions: """Collection of functions used in hll transformation""" @staticmethod def merge_hll_tuples( base_tuples: List[Tuple[Union[str, int], ...]], hll_tuples: List[Tuple[Union[str, int], ...]], ) -> Generator[Tuple[Union[int, str], ...], None, None]: """Merges two lists of tuples, whereby only the third part of hll_tuples (the hll shard) is appended. The first part of base_tuples is used to collect hll shards per base""" # assign start values hll_baseidx_before = hll_tuples[0] merged_shard_batch = (hll_baseidx_before[2],) ix_before = hll_baseidx_before[0] # start from second element for hll_tup in hll_tuples[1:]: hll_shard = hll_tup[2] ix = hll_tup[0] if ix == ix_before: # if base is unchanged (and not the last hll_tuple), # concat hll shards per base into one tuple merged_shard_batch += (hll_shard,) ix_before = ix # continue with next shard continue # else: if base changed, # append hll_shards to base record merged_base_hll_tuple = base_tuples[ix_before] + merged_shard_batch merged_shard_batch = (hll_shard,) ix_before = ix # return merged record + hll tuple # and continue iteration yield merged_base_hll_tuple # final return of last item yield base_tuples[ix] + merged_shard_batch @staticmethod def concat_base_shards( records: List[Tuple[Union[str, int], ...]], hll_shards: List[Tuple[Union[str, int], ...]], ) -> List[Tuple[Union[int, str], ...]]: """Concat records with list of hll_shards For ease of calculation, hll_shards are reduced to a onedimensional list. Here, shards are assigned back to records using record base keys """ merged_records = [ merged_tuple for merged_tuple in HLLFunctions.merge_hll_tuples(records, hll_shards) ] # if this ever evaluates to false # something went fundamentally wrong: # abort processing assert len(records) == len(merged_records) return merged_records @staticmethod def make_shard_sql(values_str: str) -> str: """SQL to calculate HLL Shards from sets of items Example values_str: ('2011-01-01'::date, 'user_hll', '2:4399297324604'), """ insert_sql = f""" SELECT base_id, metric_id, hll_add_agg( hll_hash_text( extensions.crypt_hash( s.item_value, current_setting('crypt.salt')))) FROM ( VALUES {values_str} ) s(base_id, metric_id, item_value) GROUP BY base_id, metric_id ORDER BY base_id, metric_id """ return insert_sql @staticmethod def calculate_item_shards( hllworker_cursor, values_str: str ) -> List[Tuple[int, str, str]]: """Calculates shards from batched hll_items using hll_worker connection """ tsuccessful = False shard_sql = HLLFunctions.make_shard_sql(values_str) while not tsuccessful: try: hllworker_cursor.execute(shard_sql) except psycopg2.OperationalError: input( "Server closed the connection unexpectedly. Check " "if your postgres server has enough memory available. " "Will repeat execution once after input.." ) hllworker_cursor.execute(shard_sql) tsuccessful = True else: tsuccessful = True hll_shards = hllworker_cursor.fetchall() return hll_shards @staticmethod def concat_base_metric_item( base_key: int, hll_items: Dict[str, set] ) -> List[Tuple[int, int, str]]: """Concat hll item to (base_key, metric_key, item) tuple Note that only ids (0, 1, 2..), not the true keys are used for base_key and metric_key, following the separation of concerns principle """ tuple_list = [] for metric_ix, item_set in enumerate(hll_items.values()): if not item_set: tuple_list.append((base_key, metric_ix, None)) else: for item in item_set: tuple_list.append((base_key, metric_ix, item)) return tuple_list @staticmethod def hll_concat_origin_guid(record) -> str: """Concat origin and pkey (guid) to string""" origin_guid = HLLFunctions.hll_concat( [record.pkey.origin.origin_id, record.pkey.id] ) return origin_guid @staticmethod def hll_concat_user(record: lbsn.Post) -> str: """Concat origin and user pkey (guid) to string""" user_hll = HLLFunctions.hll_concat( [record.pkey.origin.origin_id, record.user_pkey.id] ) return user_hll @staticmethod def hll_concat_date(record: lbsn.Post) -> str: """Merge create and publish date and concat to string""" date_merged = HLLFunctions.merge_dates_post(record) if date_merged is None: return date_formatted = date_merged.strftime("%Y-%m-%d") return date_formatted @staticmethod def hll_concat_userday(record: lbsn.Post) -> str: """Merge date and user pkey (guid) for measuring user days""" date_merged = HLLFunctions.hll_concat_date(record) user_merged = HLLFunctions.hll_concat_user(record) pud_hll = HLLFunctions.hll_concat([user_merged, date_merged]) return pud_hll @staticmethod def hll_concat_yearmonth(record: lbsn.Post) -> str: """Concat year and month of post date to string""" date_merged = HLLFunctions.merge_dates_post(record) date_formatted = date_merged.strftime("%Y-%m") return date_formatted @staticmethod def hll_concat_latlng(record: lbsn.Post) -> str: """Concat post lat lng coordinates to string""" if record.post_geoaccuracy == "latlng": coordinates_geom = HF.null_check(record.post_latlng) coordinates = HF.get_coordinates_from_ewkt(coordinates_geom) return f"{coordinates.lat}:{coordinates.lng}" return "0:0" @staticmethod def hll_concat_place(record: lbsn.Post) -> str: """Concat origin and post place guid to string""" if record.post_geoaccuracy == "place": return HLLFunctions.hll_concat( [record.pkey.origin.origin_id, record.place_pkey.id] ) return None @staticmethod def hll_concat_upt_hll(record: lbsn.Post) -> List[str]: """Concat all post terms (body, title, hashtags) and return list""" body_terms = HF.select_terms(record.post_body) title_terms = HF.select_terms(record.post_title) tag_terms = {item.lower() for item in record.hashtags if len(item) > 2} all_post_terms = set.union(body_terms, title_terms, tag_terms) user_hll = HLLFunctions.hll_concat_user(record) upt_hll = HLLFunctions.hll_concat_user_terms(user_hll, all_post_terms) return upt_hll @staticmethod def hll_concat_user_terms(user: str, terms: List[str]) -> Set[str]: """Concat user and list of terms to set of unique user:terms""" concat_user_terms = set() for term in terms: concat_user_terms.add(":".join([user, term])) return concat_user_terms @staticmethod def hll_concat(record_attr: List[Union[str, int]]) -> str: """Concat record attributes using colon""" return ":".join(map(str, record_attr)) @staticmethod def merge_dates_post(record: lbsn.Post = None) -> Optional[dt.datetime]: """Merge post_publish and post_created attributes""" post_create_date = HF.null_check_datetime(record.post_create_date) post_publish_date = HF.null_check_datetime(record.post_publish_date) if post_create_date is None: return post_publish_date return post_create_date
Static methods
def calculate_item_shards(hllworker_cursor, values_str: str) ‑> List[Tuple[int, str, str]]
-
Calculates shards from batched hll_items using hll_worker connection
Expand source code
@staticmethod def calculate_item_shards( hllworker_cursor, values_str: str ) -> List[Tuple[int, str, str]]: """Calculates shards from batched hll_items using hll_worker connection """ tsuccessful = False shard_sql = HLLFunctions.make_shard_sql(values_str) while not tsuccessful: try: hllworker_cursor.execute(shard_sql) except psycopg2.OperationalError: input( "Server closed the connection unexpectedly. Check " "if your postgres server has enough memory available. " "Will repeat execution once after input.." ) hllworker_cursor.execute(shard_sql) tsuccessful = True else: tsuccessful = True hll_shards = hllworker_cursor.fetchall() return hll_shards
def concat_base_metric_item(base_key: int, hll_items: Dict[str, set]) ‑> List[Tuple[int, int, str]]
-
Concat hll item to (base_key, metric_key, item) tuple Note that only ids (0, 1, 2..), not the true keys are used for base_key and metric_key, following the separation of concerns principle
Expand source code
@staticmethod def concat_base_metric_item( base_key: int, hll_items: Dict[str, set] ) -> List[Tuple[int, int, str]]: """Concat hll item to (base_key, metric_key, item) tuple Note that only ids (0, 1, 2..), not the true keys are used for base_key and metric_key, following the separation of concerns principle """ tuple_list = [] for metric_ix, item_set in enumerate(hll_items.values()): if not item_set: tuple_list.append((base_key, metric_ix, None)) else: for item in item_set: tuple_list.append((base_key, metric_ix, item)) return tuple_list
def concat_base_shards(records: List[Tuple[Union[str, int], ...]], hll_shards: List[Tuple[Union[str, int], ...]]) ‑> List[Tuple[Union[str, int], ...]]
-
Concat records with list of hll_shards
For ease of calculation, hll_shards are reduced to a onedimensional list. Here, shards are assigned back to records using record base keys
Expand source code
@staticmethod def concat_base_shards( records: List[Tuple[Union[str, int], ...]], hll_shards: List[Tuple[Union[str, int], ...]], ) -> List[Tuple[Union[int, str], ...]]: """Concat records with list of hll_shards For ease of calculation, hll_shards are reduced to a onedimensional list. Here, shards are assigned back to records using record base keys """ merged_records = [ merged_tuple for merged_tuple in HLLFunctions.merge_hll_tuples(records, hll_shards) ] # if this ever evaluates to false # something went fundamentally wrong: # abort processing assert len(records) == len(merged_records) return merged_records
def hll_concat(record_attr: List[Union[str, int]]) ‑> str
-
Concat record attributes using colon
Expand source code
@staticmethod def hll_concat(record_attr: List[Union[str, int]]) -> str: """Concat record attributes using colon""" return ":".join(map(str, record_attr))
def hll_concat_date(record: lbsnstructure.topical_pb2.Post) ‑> str
-
Merge create and publish date and concat to string
Expand source code
@staticmethod def hll_concat_date(record: lbsn.Post) -> str: """Merge create and publish date and concat to string""" date_merged = HLLFunctions.merge_dates_post(record) if date_merged is None: return date_formatted = date_merged.strftime("%Y-%m-%d") return date_formatted
def hll_concat_latlng(record: lbsnstructure.topical_pb2.Post) ‑> str
-
Concat post lat lng coordinates to string
Expand source code
@staticmethod def hll_concat_latlng(record: lbsn.Post) -> str: """Concat post lat lng coordinates to string""" if record.post_geoaccuracy == "latlng": coordinates_geom = HF.null_check(record.post_latlng) coordinates = HF.get_coordinates_from_ewkt(coordinates_geom) return f"{coordinates.lat}:{coordinates.lng}" return "0:0"
def hll_concat_origin_guid(record) ‑> str
-
Concat origin and pkey (guid) to string
Expand source code
@staticmethod def hll_concat_origin_guid(record) -> str: """Concat origin and pkey (guid) to string""" origin_guid = HLLFunctions.hll_concat( [record.pkey.origin.origin_id, record.pkey.id] ) return origin_guid
def hll_concat_place(record: lbsnstructure.topical_pb2.Post) ‑> str
-
Concat origin and post place guid to string
Expand source code
@staticmethod def hll_concat_place(record: lbsn.Post) -> str: """Concat origin and post place guid to string""" if record.post_geoaccuracy == "place": return HLLFunctions.hll_concat( [record.pkey.origin.origin_id, record.place_pkey.id] ) return None
def hll_concat_upt_hll(record: lbsnstructure.topical_pb2.Post) ‑> List[str]
-
Concat all post terms (body, title, hashtags) and return list
Expand source code
@staticmethod def hll_concat_upt_hll(record: lbsn.Post) -> List[str]: """Concat all post terms (body, title, hashtags) and return list""" body_terms = HF.select_terms(record.post_body) title_terms = HF.select_terms(record.post_title) tag_terms = {item.lower() for item in record.hashtags if len(item) > 2} all_post_terms = set.union(body_terms, title_terms, tag_terms) user_hll = HLLFunctions.hll_concat_user(record) upt_hll = HLLFunctions.hll_concat_user_terms(user_hll, all_post_terms) return upt_hll
def hll_concat_user(record: lbsnstructure.topical_pb2.Post) ‑> str
-
Concat origin and user pkey (guid) to string
Expand source code
@staticmethod def hll_concat_user(record: lbsn.Post) -> str: """Concat origin and user pkey (guid) to string""" user_hll = HLLFunctions.hll_concat( [record.pkey.origin.origin_id, record.user_pkey.id] ) return user_hll
def hll_concat_user_terms(user: str, terms: List[str]) ‑> Set[str]
-
Concat user and list of terms to set of unique user:terms
Expand source code
@staticmethod def hll_concat_user_terms(user: str, terms: List[str]) -> Set[str]: """Concat user and list of terms to set of unique user:terms""" concat_user_terms = set() for term in terms: concat_user_terms.add(":".join([user, term])) return concat_user_terms
def hll_concat_userday(record: lbsnstructure.topical_pb2.Post) ‑> str
-
Merge date and user pkey (guid) for measuring user days
Expand source code
@staticmethod def hll_concat_userday(record: lbsn.Post) -> str: """Merge date and user pkey (guid) for measuring user days""" date_merged = HLLFunctions.hll_concat_date(record) user_merged = HLLFunctions.hll_concat_user(record) pud_hll = HLLFunctions.hll_concat([user_merged, date_merged]) return pud_hll
def hll_concat_yearmonth(record: lbsnstructure.topical_pb2.Post) ‑> str
-
Concat year and month of post date to string
Expand source code
@staticmethod def hll_concat_yearmonth(record: lbsn.Post) -> str: """Concat year and month of post date to string""" date_merged = HLLFunctions.merge_dates_post(record) date_formatted = date_merged.strftime("%Y-%m") return date_formatted
def make_shard_sql(values_str: str) ‑> str
-
SQL to calculate HLL Shards from sets of items
Example values_str: ('2011-01-01'::date, 'user_hll', '2:4399297324604'),
Expand source code
@staticmethod def make_shard_sql(values_str: str) -> str: """SQL to calculate HLL Shards from sets of items Example values_str: ('2011-01-01'::date, 'user_hll', '2:4399297324604'), """ insert_sql = f""" SELECT base_id, metric_id, hll_add_agg( hll_hash_text( extensions.crypt_hash( s.item_value, current_setting('crypt.salt')))) FROM ( VALUES {values_str} ) s(base_id, metric_id, item_value) GROUP BY base_id, metric_id ORDER BY base_id, metric_id """ return insert_sql
def merge_dates_post(record: lbsnstructure.topical_pb2.Post = None) ‑> Optional[datetime.datetime]
-
Merge post_publish and post_created attributes
Expand source code
@staticmethod def merge_dates_post(record: lbsn.Post = None) -> Optional[dt.datetime]: """Merge post_publish and post_created attributes""" post_create_date = HF.null_check_datetime(record.post_create_date) post_publish_date = HF.null_check_datetime(record.post_publish_date) if post_create_date is None: return post_publish_date return post_create_date
def merge_hll_tuples(base_tuples: List[Tuple[Union[str, int], ...]], hll_tuples: List[Tuple[Union[str, int], ...]]) ‑> Generator[Tuple[Union[str, int], ...], None, None]
-
Merges two lists of tuples, whereby only the third part of hll_tuples (the hll shard) is appended. The first part of base_tuples is used to collect hll shards per base
Expand source code
@staticmethod def merge_hll_tuples( base_tuples: List[Tuple[Union[str, int], ...]], hll_tuples: List[Tuple[Union[str, int], ...]], ) -> Generator[Tuple[Union[int, str], ...], None, None]: """Merges two lists of tuples, whereby only the third part of hll_tuples (the hll shard) is appended. The first part of base_tuples is used to collect hll shards per base""" # assign start values hll_baseidx_before = hll_tuples[0] merged_shard_batch = (hll_baseidx_before[2],) ix_before = hll_baseidx_before[0] # start from second element for hll_tup in hll_tuples[1:]: hll_shard = hll_tup[2] ix = hll_tup[0] if ix == ix_before: # if base is unchanged (and not the last hll_tuple), # concat hll shards per base into one tuple merged_shard_batch += (hll_shard,) ix_before = ix # continue with next shard continue # else: if base changed, # append hll_shards to base record merged_base_hll_tuple = base_tuples[ix_before] + merged_shard_batch merged_shard_batch = (hll_shard,) ix_before = ix # return merged record + hll tuple # and continue iteration yield merged_base_hll_tuple # final return of last item yield base_tuples[ix] + merged_shard_batch