Module lbsntransform.output.submit_data
Module for storing common Proto LBSN Structure to PG DB.
Source code
# -*- coding: utf-8 -*-
"""
Module for storing common Proto LBSN Structure to PG DB.
"""
# pylint: disable=no-member
import traceback
import logging
import sys
from typing import Any, Dict, List, Tuple, Union, Optional
import psycopg2
import lbsnstructure as lbsn
from lbsntransform.tools.helper_functions import HelperFunctions as HF
from lbsntransform.output.csv.store_csv import LBSNcsv
from lbsntransform.output.hll import hll_bases as hll
from lbsntransform.output.hll.base import social, spatial, temporal, topical
from lbsntransform.output.hll.hll_functions import HLLFunctions as HLF
from lbsntransform.output.hll.shared_structure_proto_hlldb import ProtoHLLMapping
from lbsntransform.output.hll.sql_hll import HLLSql
from lbsntransform.output.lbsn.shared_structure_proto_lbsndb import ProtoLBSNMapping
from lbsntransform.output.lbsn.sql_lbsn import LBSNSql
class LBSNTransfer:
"""Transfer converted lbsn records
to various output formats (CSV, raw-db, hll-db)
"""
def __init__(
self,
db_cursor=None,
db_connection=None,
commit_volume=None,
disable_reaction_post_ref=0,
store_csv=None,
SUPPRESS_LINEBREAKS=True,
dbformat_output="lbsn",
hllworker_cursor=None,
include_lbsn_bases=None,
dry_run: Optional[bool] = None,
):
self.db_cursor = db_cursor
self.db_connection = db_connection
self.dry_run = dry_run
if not self.db_cursor:
print("CSV Output Mode.")
self.count_entries_commit = 0
self.count_entries_store = 0
self.count_affected = 0
if store_csv:
dbformat_output = "lbsn"
if commit_volume is None:
# due to more output bases
# increase commit volume on hll output
if dbformat_output == "lbsn":
commit_volume = 10000
else:
commit_volume = 100000
self.commit_volume = commit_volume
self.store_volume = 500000
self.count_glob = 0
self.null_island_count = 0
self.disable_reaction_post_ref = disable_reaction_post_ref
self.log = logging.getLogger("__main__")
self.batched_lbsn_records = {
lbsn.Origin.DESCRIPTOR.name: list(),
lbsn.Country.DESCRIPTOR.name: list(),
lbsn.City.DESCRIPTOR.name: list(),
lbsn.Place.DESCRIPTOR.name: list(),
lbsn.User.DESCRIPTOR.name: list(),
lbsn.UserGroup.DESCRIPTOR.name: list(),
lbsn.Post.DESCRIPTOR.name: list(),
lbsn.PostReaction.DESCRIPTOR.name: list(),
lbsn.Relationship.DESCRIPTOR.name: list(),
}
        # dynamically register base classes from base module
hll.register_classes()
# this is the global dict of measures that are currently supported,
# bases not registered here will not be measured
self.batched_hll_records = {
spatial.LatLngBase.NAME: dict(),
spatial.PlaceBase.NAME: dict(),
temporal.DateBase.NAME: dict(),
temporal.MonthBase.NAME: dict(),
temporal.YearBase.NAME: dict(),
temporal.MonthLatLngBase.NAME: dict(),
temporal.MonthHashtagBase.NAME: dict(),
temporal.MonthHashtagLatLngBase.NAME: dict(),
topical.TermBase.NAME: dict(),
topical.HashtagBase.NAME: dict(),
topical.EmojiBase.NAME: dict(),
topical.TermLatLngBase.NAME: dict(),
topical.HashtagLatLngBase.NAME: dict(),
topical.EmojiLatLngBase.NAME: dict(),
social.CommunityBase.NAME: dict(),
}
self.count_round = 0
if dbformat_output == "lbsn":
batch_db_volume = 100
else:
            # increase batch size for hll records, which are
            # less complex but far more numerous
batch_db_volume = 20000
# Records are batched and submitted in
# one insert with x number of records
self.batch_db_volume = batch_db_volume
self.store_csv = store_csv
self.headers_written = set()
# self.CSVsuppressLinebreaks = CSVsuppressLinebreaks
self.dbformat_output = dbformat_output
if self.dbformat_output == "lbsn":
self.db_mapping = ProtoLBSNMapping()
else:
if include_lbsn_bases is None:
include_lbsn_bases = []
self.db_mapping = ProtoHLLMapping(include_lbsn_bases=include_lbsn_bases)
self.hllworker_cursor = hllworker_cursor
if self.store_csv:
self.csv_output = LBSNcsv(SUPPRESS_LINEBREAKS)
def commit_changes(self):
"""Commit Changes to DB"""
if self.db_cursor:
            self.db_connection.commit()
self.count_entries_commit = 0
def store_changes(self):
"""Write changes to CSV"""
if self.store_csv:
            raise NotImplementedError("CSV Output currently not supported")
# self.csv_output.clean_csv_batches(
# self.batched_lbsn_records, self.dry_run)
# self.count_entries_store = 0
def store_origin(self, origin_id, name):
"""Store origin of input source sql"""
if self.dry_run:
return
if self.store_csv:
origin = lbsn.Origin()
origin.origin_id = origin_id
self.csv_output.store_append_batch_to_csv(
[origin], 0, lbsn.Origin.DESCRIPTOR.name
)
return
insert_sql = f"""
INSERT INTO social."origin" (
origin_id, name)
VALUES ({origin_id},'{name}')
ON CONFLICT (origin_id)
DO NOTHING
"""
self.db_cursor.execute(insert_sql)
def store_lbsn_record_dicts(self, lbsn_record_dicts):
"""Main loop for storing lbsn records to CSV or DB
Arguments:
field_mapping {field mapping class} -- Import Field mapping class
with attached data
order is important here, as PostGres will reject any
records where Foreign Keys are violated
therefore, records are processed starting from lowest
granularity. Order is stored in all_dicts()
"""
self.count_round += 1
# self.headersWritten.clear()
r_cnt = 0
self.count_affected = 0
g_cnt = lbsn_record_dicts.get_current_count()
# clear line
sys.stdout.write("\033[K")
for record, type_name in lbsn_record_dicts.get_all_records():
r_cnt += 1
print(
f"Converting {r_cnt} of {g_cnt} " f"lbsn records ({type_name})..",
end="\r",
)
self.prepare_lbsn_record(record, type_name)
self.count_glob += 1 # self.dbCursor.rowcount
self.count_entries_commit += 1 # self.dbCursor.rowcount
self.count_entries_store += 1
if self.db_cursor and (
self.count_glob == 100 or self.count_entries_commit > self.commit_volume
):
self.commit_changes()
if self.store_csv and (self.count_entries_store > self.store_volume):
self.store_changes()
# submit remaining rest
self.submit_all_batches()
# self.count_affected += x # monitoring
print(
f"\nRound {self.count_round:03d}: "
f"Updated/Inserted {self.count_glob} records."
)
def prepare_lbsn_record(self, record, record_type):
"""Prepare batched records for submit to either LBSN or HLL db"""
# clean duplicates in repeated Fields and Sort List
self.sort_clean_proto_repeated_field(record)
# store cleaned ProtoBuf records
# LBSN or HLL output
if self.dbformat_output == "lbsn":
self.batched_lbsn_records[record_type].append(record)
else:
# extract hll bases and metric from records
hll_base_metrics = self.db_mapping.extract_hll_base_metrics(
record, record_type
)
if hll_base_metrics is None:
# no base metrics extracted
return
# update hll dicts
self.db_mapping.update_hll_dicts(self.batched_hll_records, hll_base_metrics)
# check batched records (and submit)
self.check_batchvolume_submit()
def check_batchvolume_submit(self):
"""If any dict contains more values than self.batch_db_volume,
submit/store all
"""
if self.dbformat_output == "lbsn":
batch_lists = self.batched_lbsn_records.values()
else:
batch_lists = self.batched_hll_records.values()
for batch_list in batch_lists:
if len(batch_list) >= self.batch_db_volume:
self.submit_all_batches()
def submit_all_batches(self):
"""Hook to submit either lbsn or hll records"""
if self.dbformat_output == "lbsn":
self.submit_batches(self.batched_lbsn_records)
else:
self.submit_batches(self.batched_hll_records)
def submit_batches(
self,
batched_records: Union[
Dict[str, List[str]], Dict[Tuple[str, str], Dict[str, Any]]
],
):
"""Prepare values for each batch, format sql and submit to db"""
for record_type, batch_item in batched_records.items():
if batch_item:
# if self.storeCSV and not record_type in self.headersWritten:
# self.writeCSVHeader(record_type)
# self.headersWritten.add(record_type)
func_select = self.get_prepare_records_func(self.dbformat_output)
prepared_records = func_select(batch_item)
self.submit_records(record_type, prepared_records)
batch_item.clear()
    def get_prepare_records_func(self, dbformat_output: str):
        """Selector function to get prepared records (hll/lbsn)"""
        if dbformat_output == "lbsn":
return self.get_prepared_lbsn_records
# hll
return self.get_prepared_hll_records
def get_prepared_lbsn_records(self, batch_item: List[Any]):
"""Turns proprietary lbsn classes into prepared sql value tuples
For hll output, this includes calculation of
shards from individual items using the hll_worker
"""
prepared_records = []
for record_item in batch_item:
try:
prepared_record = self.db_mapping.func_prepare_selector(record_item)
except Exception:
print(f"Could not process record:\n" f"{record_item}")
track = traceback.format_exc()
print(f"Traceback: {track}")
sys.exit(1)
if prepared_record:
prepared_records.append(prepared_record)
return prepared_records
def prepare_hmac(self, hmac_key: str = None):
"""Update the session hmac key that is used to apply cryptographic hashing
        Defaults to an empty string if no --hmac_key is provided and
        crypt.salt is not set in the worker db.
"""
if hmac_key is None:
            logging.getLogger("__main__").warning(
"Use of empty hmac_key: For production use, please "
"make sure that either `crypt.salt` is set on your worker db, "
"or provide the --hmac_key."
)
self.hllworker_cursor.execute(
"""
SELECT set_config(
'crypt.salt',
COALESCE(current_setting('crypt.salt', 't'), ''),
false);
"""
)
else:
self.hllworker_cursor.execute("SET crypt.salt = %s", hmac_key)
hmac_func_sql = HLLSql.get_hmac_hash_sql()
self.hllworker_cursor.execute(hmac_func_sql)
def get_prepared_hll_records(self, batch_item: Dict[str, Any]):
"""Turns propietary hll classes into prepared sql value tuples
This includes calculation of shards from individual items
using the hll_worker
"""
hll_items = [] # (base_key, metric_key, item)
hll_base_records = [] # (base_key, attr1, attr2)
# the following iteration will
# loop keys in case of dict
# and values in case of list
for index, record_item in enumerate(batch_item.values()):
# get base record and value
base = record_item.get_prepared_record()
if base.record:
hll_base_records.append(base.record)
base_metric_item_tuples = HLF.concat_base_metric_item(
index, base.metrics
)
# format tuple-values as sql-escaped strings
value_str = [
self.prepare_sqlescaped_values(record)
for record in base_metric_item_tuples
]
# add to global list of items to be upserted
hll_items.extend(value_str)
# format sql for shard generation
# get sql escaped values list
values_str = HF.concat_values_str(hll_items)
# clear line
sys.stdout.write("\033[K")
print(f"Calculating hll shards for {len(values_str)} values..", end="\r")
# calculate hll shards from raw values
hll_shards = HLF.calculate_item_shards(self.hllworker_cursor, values_str)
prepared_records = HLF.concat_base_shards(hll_base_records, hll_shards)
return prepared_records
def submit_records(
self,
record_type: Union[Tuple[str, str], str],
prepared_records: List[Tuple[Any]],
):
"""Submit/save prepared records to db or csv"""
if self.store_csv:
if self.dry_run:
return
self.csv_output.store_append_batch_to_csv(
self.batched_lbsn_records[record_type], self.count_round, record_type
)
if self.db_cursor:
# get sql escaped values list
sql_escaped_values_list = [
self.prepare_sqlescaped_values(record) for record in prepared_records
]
# concat to single sql str
values_str = HF.concat_values_str(sql_escaped_values_list)
insert_sql = self.insert_sql_selector(values_str, record_type)
# clear line
sys.stdout.write("\033[K")
print(f"Submitting {len(prepared_records)}..", end="\r")
self.submit_batch(insert_sql)
def insert_sql_selector(self, values_str: str, record_type):
"""Select function to prepare SQL insert.
Attributes:
record_type type of record
values_str values to be inserted
"""
if self.dbformat_output == "lbsn":
sql_selector = LBSNSql.type_sql_mapper()
prepare_function = sql_selector.get(record_type)
else:
# hll
prepare_function = HLLSql.hll_insertsql
return prepare_function(values_str, record_type)
def submit_lbsn_relationships(self):
"""submit relationships of different types
record[1] is the PostgresQL formatted list of values,
record[0] is the type of relationship that determines
the table selection
"""
select_friends = [
relationship[1]
for relationship in self.batched_lbsn_records[
lbsn.Relationship().DESCRIPTOR.name
]
if relationship[0] == "isfriend"
]
if select_friends:
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
select_friends, self.count_round, "_user_friends_user"
)
if self.db_cursor:
args_isfriend = ",".join(select_friends)
insert_sql = f"""
INSERT INTO interlinkage."_user_friends_user" (
{self.typeNamesHeaderDict["_user_friends_user"]})
VALUES {args_isfriend}
ON CONFLICT (origin_id, user_guid, friend_guid)
DO NOTHING
"""
self.submit_batch(insert_sql)
select_connected = [
relationship[1]
for relationship in self.batched_lbsn_records[
lbsn.Relationship().DESCRIPTOR.name
]
if relationship[0] == "isconnected"
]
if select_connected:
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
select_connected, self.count_round, "_user_connectsto_user"
)
if self.db_cursor:
args_isconnected = ",".join(select_connected)
insert_sql = f"""
INSERT INTO interlinkage."_user_connectsto_user" (
{self.typeNamesHeaderDict["_user_connectsto_user"]})
VALUES {args_isconnected}
ON CONFLICT (origin_id, user_guid,
connectedto_user_guid)
DO NOTHING
"""
self.submit_batch(insert_sql)
select_usergroupmember = [
relationship[1]
for relationship in self.batched_lbsn_records[
lbsn.Relationship().DESCRIPTOR.name
]
if relationship[0] == "ingroup"
]
if select_usergroupmember:
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
select_usergroupmember, self.count_round, "_user_memberof_group"
)
if self.db_cursor:
args_isingroup = ",".join(select_usergroupmember)
insert_sql = f"""
INSERT INTO interlinkage."_user_memberof_group" (
{self.typeNamesHeaderDict["_user_memberof_group"]})
VALUES {args_isingroup}
ON CONFLICT (origin_id, user_guid, group_guid)
DO NOTHING
"""
self.submit_batch(insert_sql)
select_usergroupmember = [
relationship[1]
for relationship in self.batched_lbsn_records[
lbsn.Relationship().DESCRIPTOR.name
]
if relationship[0] == "followsgroup"
]
if select_usergroupmember:
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
select_usergroupmember, self.count_round, "_user_follows_group"
)
if self.db_cursor:
args_isingroup = ",".join(select_usergroupmember)
insert_sql = f"""
INSERT INTO interlinkage."_user_follows_group" (
{self.typeNamesHeaderDict["_user_follows_group"]})
VALUES {args_isingroup}
ON CONFLICT (origin_id, user_guid, group_guid)
DO NOTHING
"""
self.submit_batch(insert_sql)
select_usermentions = [
relationship[1]
for relationship in self.batched_lbsn_records[
lbsn.Relationship().DESCRIPTOR.name
]
if relationship[0] == "mentions_user"
]
if select_usermentions:
if self.store_csv:
self.csv_output.store_append_batch_to_csv(
select_usermentions, self.count_round, "_user_mentions_user"
)
if self.db_cursor:
args_isingroup = ",".join(select_usermentions)
insert_sql = f"""
INSERT INTO interlinkage."_user_mentions_user" (
{self.typeNamesHeaderDict["_user_mentions_user"]})
VALUES {args_isingroup}
ON CONFLICT (origin_id, user_guid, mentioneduser_guid)
DO NOTHING
"""
self.submit_batch(insert_sql)
def submit_batch(self, insert_sql):
"""Submit Batch to PG DB.
Needs testing: is using Savepoint for each insert slower
than rolling back entire commit?
        for performance, see
        https://stackoverflow.com/questions/12206600/how-to-speed-up-insertion-performance-in-postgresql
        or this:
        https://stackoverflow.com/questions/8134602/psycopg2-insert-multiple-rows-with-one-query
"""
tsuccessful = False
if self.dry_run:
return
self.db_cursor.execute("SAVEPOINT submit_recordBatch")
while not tsuccessful:
try:
self.db_cursor.execute(insert_sql)
except psycopg2.IntegrityError as einteg:
if (
"(post_language)" in einteg.diag.message_detail
or "(user_language)" in einteg.diag.message_detail
):
# If language does not exist, we'll trust Twitter
# and add this to our language list
missingLanguage = einteg.diag.message_detail.partition(
"language)=("
)[2].partition(") is not present")[0]
print(
f'TransactionIntegrityError, inserting language "'
f'{missingLanguage}" first.. '
)
# self.db_cursor.rollback()
self.db_cursor.execute("ROLLBACK TO SAVEPOINT submit_recordBatch")
insert_language_sql = """
INSERT INTO social."language"
(language_short, language_name, language_name_de)
VALUES (%s,NULL,NULL);
"""
# submit sql to db
self.db_cursor.execute(insert_language_sql, (missingLanguage,))
# commit changes so they're available when
# try is executed again
self.commit_changes()
# recreate SAVEPOINT after language insert
self.db_cursor.execute("SAVEPOINT submit_recordBatch")
else:
sys.exit(f"{einteg}")
except psycopg2.DataError as edata:
sys.exit(f"{edata}\nINSERT SQL WAS: {insert_sql}")
except ValueError as evalue:
self.log.warning(f"{evalue}")
input("Press Enter to continue... (entry will be skipped)")
self.log.warning(f"{insert_sql}")
input("args:... ")
# self.log.warning(f'{args_str}')
self.db_cursor.execute("ROLLBACK TO SAVEPOINT submit_recordBatch")
tsuccessful = True
except psycopg2.ProgrammingError as epsyco:
                with open("hll_exc.txt", "w") as exc_file:
                    exc_file.write(f"{epsyco}\nINSERT SQL WAS: {insert_sql}")
sys.exit(f"{epsyco}\nINSERT SQL WAS: {insert_sql}")
except psycopg2.errors.DiskFull:
input("Disk space full. Clean files and continue..")
else:
# executed if the try clause does not raise an exception
self.db_cursor.execute("RELEASE SAVEPOINT submit_recordBatch")
tsuccessful = True
def prepare_sqlescaped_values(self, *args):
"""dynamically construct sql value injection
e.g. record_sql = '''(%s,%s,%s,%s,%s,%s,%s)'''
"""
record_sql = f"""{','.join('%s' for x in range(0, len(args)))}"""
# inject values
prepared_sql_record = self.db_cursor.mogrify(record_sql, tuple(args))
# mogrify returns a byte object,
# we decode it so it can be used as a string again
prepared_sql_record = prepared_sql_record.decode()
return prepared_sql_record
@classmethod
def sort_clean_proto_repeated_field(cls, record):
"""Remove duplicate values in repeated field, sort alphabetically
ProtocolBuffers has no unique list field type. This function will
remove duplicates from lists, which is needed for unique compare.
"""
for descriptor in record.DESCRIPTOR.fields:
if descriptor.label == descriptor.LABEL_REPEATED:
x_attr = getattr(record, descriptor.name)
if x_attr and not len(x_attr) == 1:
try:
x_attr_cleaned = set(x_attr)
except TypeError:
# needed to catch
# TypeError for unhashable type: 'CompositeKey'
# (lazy-initialized)
continue
x_attr_sorted = sorted(x_attr_cleaned)
# Complete clear of repeated field
for _ in range(0, len(x_attr)):
x_attr.pop()
# add sorted list
x_attr.extend(x_attr_sorted)
def finalize(self):
"""Final procedure calls:
- clean and merge csv batches
"""
if self.store_csv:
            raise NotImplementedError("CSV Output currently not supported")
# self.csv_output.clean_csv_batches(
# self.batched_lbsn_records, self.dry_run)
Classes
class LBSNTransfer (db_cursor=None, db_connection=None, commit_volume=None, disable_reaction_post_ref=0, store_csv=None, SUPPRESS_LINEBREAKS=True, dbformat_output='lbsn', hllworker_cursor=None, include_lbsn_bases=None, dry_run: Optional[bool] = None)
-
Transfer converted lbsn records to various output formats (CSV, raw-db, hll-db)
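A minimal usage sketch (connection parameters and the record container are placeholders for what the surrounding lbsntransform pipeline provides, not part of this module):

import psycopg2
from lbsntransform.output.submit_data import LBSNTransfer

# hypothetical connection to a raw lbsn output db
connection = psycopg2.connect("dbname=rawdb user=postgres")
cursor = connection.cursor()

transfer = LBSNTransfer(
    db_cursor=cursor,
    db_connection=connection,
    dbformat_output="lbsn",  # raw lbsn output; "hll" switches to hll output
)
transfer.store_origin(origin_id=3, name="Twitter")  # illustrative origin

# lbsn_record_dicts would be the record container produced by the
# input/mapping stage (it provides get_current_count() and get_all_records()):
# transfer.store_lbsn_record_dicts(lbsn_record_dicts)

transfer.submit_all_batches()
transfer.commit_changes()
transfer.finalize()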
Static methods
def sort_clean_proto_repeated_field(record)
-
Remove duplicate values in repeated field, sort alphabetically
ProtocolBuffers has no unique list field type. This function will remove duplicates from lists, which is needed for unique compare.
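A small illustration of the effect on a repeated field; the hashtags field name is an assumption about the lbsnstructure Post message, and any repeated scalar field behaves the same way:

import lbsnstructure as lbsn
from lbsntransform.output.submit_data import LBSNTransfer

post = lbsn.Post()
post.hashtags.extend(["sunset", "beach", "sunset"])  # assumed repeated field

LBSNTransfer.sort_clean_proto_repeated_field(post)
print(list(post.hashtags))  # ['beach', 'sunset'] - deduplicated and sorted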
Methods
def check_batchvolume_submit(self)
-
If any dict contains more values than self.batch_db_volume, submit/store all
def commit_changes(self)
-
Commit Changes to DB
def finalize(self)
-
Final procedure calls:
- clean and merge csv batches
def get_prepare_records_func(self, dbformat_output: str)
-
Selector function to get prepared records (hll/lbsn)
def get_prepared_hll_records(self, batch_item: Dict[str, Any])
-
Turns proprietary hll classes into prepared sql value tuples
This includes calculation of shards from individual items using the hll_worker
def get_prepared_lbsn_records(self, batch_item: List[Any])
-
Turns proprietary lbsn classes into prepared sql value tuples
For hll output, this includes calculation of shards from individual items using the hll_worker
def insert_sql_selector(self, values_str: str, record_type)
-
Select function to prepare SQL insert.
Attributes
record_type: type of record
values_str: values to be inserted
def prepare_hmac(self, hmac_key: str = None)
-
Update the session hmac key that is used to apply cryptographic hashing
Defaults to an empty string if no --hmac_key is provided and crypt.salt is not set in the worker db.
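A sketch of how the key would be set on the hll worker connection (the connection string is a placeholder; the output-db cursor and connection are omitted for brevity):

import psycopg2
from lbsntransform.output.submit_data import LBSNTransfer

hll_worker = psycopg2.connect("dbname=hllworkerdb user=postgres")  # placeholder DSN
transfer = LBSNTransfer(
    dbformat_output="hll", hllworker_cursor=hll_worker.cursor())

# explicit key; passing None falls back to crypt.salt (or an empty string)
transfer.prepare_hmac(hmac_key="my-secret-salt")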
def prepare_lbsn_record(self, record, record_type)
-
Prepare batched records for submit to either LBSN or HLL db
def prepare_sqlescaped_values(self, *args)
-
dynamically construct sql value injection
e.g. record_sql = '''(%s,%s,%s,%s,%s,%s,%s)'''
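The escaping itself is delegated to psycopg2's cursor.mogrify(); a short sketch with a placeholder connection:

import psycopg2
from lbsntransform.output.submit_data import LBSNTransfer

connection = psycopg2.connect("dbname=rawdb user=postgres")  # placeholder DSN
transfer = LBSNTransfer(
    db_cursor=connection.cursor(), db_connection=connection)

print(transfer.prepare_sqlescaped_values(1, "O'Brien", None))
# 1,'O''Brien',NULL  - quoting and NULL handling done by mogrify()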
def store_changes(self)
-
Write changes to CSV
def store_lbsn_record_dicts(self, lbsn_record_dicts)
-
Main loop for storing lbsn records to CSV or DB
Arguments
lbsn_record_dicts {record dicts class} -- container with converted lbsn records to be stored
Order is important here, as Postgres will reject any records where foreign keys are violated; therefore, records are processed starting from the lowest granularity. Order is stored in all_dicts().
def store_origin(self, origin_id, name)
-
Store origin of input source sql
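store_origin() interpolates origin_id and name directly into the SQL string (see the module source above). A parameterized variant, shown here only as a sketch and not part of the module, lets the driver handle quoting:

def store_origin_parameterized(db_cursor, origin_id: int, name: str) -> None:
    """Same insert, but with driver-side parameter binding."""
    insert_sql = """
        INSERT INTO social."origin" (origin_id, name)
        VALUES (%s, %s)
        ON CONFLICT (origin_id)
        DO NOTHING
        """
    db_cursor.execute(insert_sql, (origin_id, name))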
def submit_all_batches(self)
-
Hook to submit either lbsn or hll records
def submit_batch(self, insert_sql)
-
Submit Batch to PG DB.
Needs testing: is using Savepoint for each insert slower than rolling back entire commit? For performance, see https://stackoverflow.com/questions/12206600/how-to-speed-up-insertion-performance-in-postgresql or https://stackoverflow.com/questions/8134602/psycopg2-insert-multiple-rows-with-one-query
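The core of the savepoint approach discussed above, reduced to a standalone sketch (error handling is illustrative; the full method additionally retries after inserting missing languages):

import psycopg2

def insert_with_savepoint(cursor, insert_sql: str) -> None:
    """Protect a single batch insert with a savepoint so a failure
    does not abort the surrounding transaction."""
    cursor.execute("SAVEPOINT batch_insert")
    try:
        cursor.execute(insert_sql)
    except psycopg2.IntegrityError:
        # roll back only this statement; the outer transaction stays usable
        cursor.execute("ROLLBACK TO SAVEPOINT batch_insert")
        raise
    else:
        cursor.execute("RELEASE SAVEPOINT batch_insert")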
def submit_batches(self, batched_records: Union[Dict[str, List[str]], Dict[Tuple[str, str], Dict[str, Any]]])
-
Prepare values for each batch, format sql and submit to db
def submit_lbsn_relationships(self)
-
Submit relationships of different types
record[1] is the PostgreSQL formatted list of values, record[0] is the type of relationship that determines the table selection
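The batched entries this method consumes are two-element records, roughly as follows (values are illustrative):

# record[0] selects the target table, record[1] is the pre-escaped value tuple
relationship_record = (
    "isfriend",                 # -> interlinkage."_user_friends_user"
    "(3,'abc123','def456')",    # (origin_id, user_guid, friend_guid)
)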
def submit_records(self, record_type: Union[Tuple[str, str], str], prepared_records: List[Tuple[Any]])
-
Submit/save prepared records to db or csv