Module lbsntransform.__main__
lbsntransform package script to load, format and store data from and to common lbsn structure
Import options: Postgres database, or local CSV / JSON / stacked-JSON files. Output options: Postgres database, or local ProtoBuf and CSV files (prepared for Postgres /Copy).
Expand source code
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
lbsntransform package script to load, format and store data
from and to common lbsn structure
Import options:
- Postgres Database or
- local CSV/json/stacked json import
Output options:
- Postgres Database or
- local ProtoBuf and CSV Import (prepared for Postgres /Copy)
"""
__author__ = "Alexander Dunkel"
__license__ = "GNU GPLv3"
# version: see version.py
import sys
from typing import Optional

from lbsntransform.config.config import BaseConfig
from lbsntransform.input.load_data import LoadData
from lbsntransform.lbsntransform_ import LBSNTransform
from lbsntransform.output.shared_structure import TimeMonitor
from lbsntransform.tools.helper_functions import HelperFunctions as HF
def main(config: Optional[BaseConfig] = None):
    """Main function for cli-mode to process data
    from postgres db or local file input
    to postgres db or local file output.

    Args:
        config: Optional pre-built configuration. If None, a default
            BaseConfig is created. In both cases cli args are parsed
            and override config values.
    """
    # Load Default Config, will be overwritten if args are given
    if config is None:
        config = BaseConfig()
    # Parse args
    config.parse_args()
    # initialize mapping class depending on lbsn origin
    # e.g. 1 = Instagram, 2 = Flickr, 2.1 = Flickr YFCC100m, 3 = Twitter
    importer = HF.load_importer_mapping_module(config.origin, config.mappings_path)
    lbsntransform = _init_transform(config, importer)
    input_data = _init_input(config, importer, lbsntransform.cursor_input)
    # Manually add entries that need submission prior to parsing data
    # add_bundestag_group_example(import_mapper)
    # init time monitoring
    how_long = TimeMonitor()
    _process_records(config, lbsntransform, input_data)
    _finalize_and_report(config, lbsntransform, input_data, how_long)


def _init_transform(config: BaseConfig, importer) -> LBSNTransform:
    """Initialize the lbsntransform output handler from config values."""
    return LBSNTransform(
        importer=importer,
        logging_level=config.logging_level,
        is_local_input=config.is_local_input,
        transfer_count=config.transfer_count,
        csv_output=config.csv_output,
        csv_suppress_linebreaks=config.csv_suppress_linebreaks,
        dbuser_output=config.dbuser_output,
        dbserveraddress_output=config.dbserveraddress_output,
        dbname_output=config.dbname_output,
        dbpassword_output=config.dbpassword_output,
        dbserverport_output=config.dbserverport_output,
        dbformat_output=config.dbformat_output,
        dbuser_input=config.dbuser_input,
        dbserveraddress_input=config.dbserveraddress_input,
        dbname_input=config.dbname_input,
        dbpassword_input=config.dbpassword_input,
        dbserverport_input=config.dbserverport_input,
        dbuser_hllworker=config.dbuser_hllworker,
        dbserveraddress_hllworker=config.dbserveraddress_hllworker,
        dbname_hllworker=config.dbname_hllworker,
        dbpassword_hllworker=config.dbpassword_hllworker,
        dbserverport_hllworker=config.dbserverport_hllworker,
        include_lbsn_bases=config.include_lbsn_bases,
        dry_run=config.dry_run,
        hmac_key=config.hmac_key,
        commit_volume=config.commit_volume,
    )


def _init_input(config: BaseConfig, importer, cursor_input) -> LoadData:
    """Initialize the input reader from config values.

    Args:
        cursor_input: Input db cursor from the LBSNTransform instance
            (may be None for local file input — confirm in LoadData).
    """
    return LoadData(
        importer=importer,
        is_local_input=config.is_local_input,
        startwith_db_rownumber=config.startwith_db_rownumber,
        skip_until_file=config.skip_until_file,
        cursor_input=cursor_input,
        input_path=config.input_path,
        recursive_load=config.recursive_load,
        local_file_type=config.local_file_type,
        endwith_db_rownumber=config.endwith_db_rownumber,
        is_stacked_json=config.is_stacked_json,
        is_line_separated_json=config.is_line_separated_json,
        csv_delim=config.csv_delim,
        use_csv_dictreader=config.use_csv_dictreader,
        input_lbsn_type=config.input_lbsn_type,
        dbformat_input=config.dbformat_input,
        geocode_locations=config.geocode_locations,
        ignore_input_source_list=config.ignore_input_source_list,
        disable_reactionpost_ref=config.disable_reactionpost_ref,
        map_relations=config.map_relations,
        transfer_reactions=config.transfer_reactions,
        ignore_non_geotagged=config.ignore_non_geotagged,
        min_geoaccuracy=config.min_geoaccuracy,
        source_web=config.source_web,
        skip_until_record=config.skip_until_record,
        zip_records=config.zip_records,
        include_lbsn_objects=config.include_lbsn_objects,
        override_lbsn_query_schema=config.override_lbsn_query_schema,
    )


def _process_records(
        config: BaseConfig, lbsntransform: LBSNTransform,
        input_data: LoadData):
    """Stream input records into the transform pipeline.

    Reports progress on stdout every 1000 processed records and stops
    early once config.transferlimit (if set) is reached.
    """
    with input_data as records:
        for record in records:
            lbsntransform.add_processed_records(record)
            # report progress
            if lbsntransform.processed_total % 1000 == 0:
                stats_str = HF.report_stats(
                    input_data.count_glob,
                    input_data.continue_number,
                    lbsntransform.lbsn_records,
                )
                print(stats_str, end="\r")
                sys.stdout.flush()
            if (
                config.transferlimit
                and lbsntransform.processed_total >= config.transferlimit
            ):
                break


def _finalize_and_report(
        config: BaseConfig, lbsntransform: LBSNTransform,
        input_data: LoadData, how_long: TimeMonitor):
    """Submit remaining records, log the final summary and close the log."""
    # finalize output (close db connection, submit remaining)
    lbsntransform.log.info(
        f"\nTransferring remaining "
        f"{lbsntransform.lbsn_records.count_glob} to db.. "
        f"{HF.null_notice(input_data.import_mapper.null_island)})"
    )
    lbsntransform.finalize_output()
    # final report; prefix with a dry-run marker when no data was written
    dry_run_note = "(Dry Run)\n" if config.dry_run else ""
    lbsntransform.log.info(
        f"\n\n{dry_run_note}"
        f"Processed {input_data.count_glob} input records "
        f"(Input {input_data.start_number} to "
        f"{input_data.continue_number}). "
        f"\n\nIdentified {lbsntransform.processed_total} LBSN records, "
        f"with {lbsntransform.lbsn_records.count_glob_total} "
        f"distinct LBSN records overall. "
        f"{HF.get_skipped_report(input_data.import_mapper)}. "
        f"Merged {lbsntransform.lbsn_records.count_dup_merge_total} "
        f"duplicate records."
    )
    lbsntransform.log.info(f"\n{HF.get_count_stats(lbsntransform.lbsn_records)}")
    lbsntransform.log.info(f"Done. {how_long.stop_time()}")
    lbsntransform.close_log()
# Script entry point: run the cli pipeline with arguments from the command line.
if __name__ == "__main__":
    main()
Functions
def main(config: BaseConfig = None)
-
Main function for cli-mode to process data from postgres db or local file input to postgres db or local file output
Expand source code
def main(config: BaseConfig = None): """Main function for cli-mode to process data from postgres db or local file input to postgres db or local file output """ # Load Default Config, will be overwritten if args are given if config is None: config = BaseConfig() # Parse args config.parse_args() # initialize mapping class # depending on lbsn origin # e.g. 1 = Instagram, # 2 = Flickr, 2.1 = Flickr YFCC100m, # 3 = Twitter) importer = HF.load_importer_mapping_module(config.origin, config.mappings_path) # initialize lbsntransform lbsntransform = LBSNTransform( importer=importer, logging_level=config.logging_level, is_local_input=config.is_local_input, transfer_count=config.transfer_count, csv_output=config.csv_output, csv_suppress_linebreaks=config.csv_suppress_linebreaks, dbuser_output=config.dbuser_output, dbserveraddress_output=config.dbserveraddress_output, dbname_output=config.dbname_output, dbpassword_output=config.dbpassword_output, dbserverport_output=config.dbserverport_output, dbformat_output=config.dbformat_output, dbuser_input=config.dbuser_input, dbserveraddress_input=config.dbserveraddress_input, dbname_input=config.dbname_input, dbpassword_input=config.dbpassword_input, dbserverport_input=config.dbserverport_input, dbuser_hllworker=config.dbuser_hllworker, dbserveraddress_hllworker=config.dbserveraddress_hllworker, dbname_hllworker=config.dbname_hllworker, dbpassword_hllworker=config.dbpassword_hllworker, dbserverport_hllworker=config.dbserverport_hllworker, include_lbsn_bases=config.include_lbsn_bases, dry_run=config.dry_run, hmac_key=config.hmac_key, commit_volume=config.commit_volume, ) # initialize input reader input_data = LoadData( importer=importer, is_local_input=config.is_local_input, startwith_db_rownumber=config.startwith_db_rownumber, skip_until_file=config.skip_until_file, cursor_input=lbsntransform.cursor_input, input_path=config.input_path, recursive_load=config.recursive_load, local_file_type=config.local_file_type, 
endwith_db_rownumber=config.endwith_db_rownumber, is_stacked_json=config.is_stacked_json, is_line_separated_json=config.is_line_separated_json, csv_delim=config.csv_delim, use_csv_dictreader=config.use_csv_dictreader, input_lbsn_type=config.input_lbsn_type, dbformat_input=config.dbformat_input, geocode_locations=config.geocode_locations, ignore_input_source_list=config.ignore_input_source_list, disable_reactionpost_ref=config.disable_reactionpost_ref, map_relations=config.map_relations, transfer_reactions=config.transfer_reactions, ignore_non_geotagged=config.ignore_non_geotagged, min_geoaccuracy=config.min_geoaccuracy, source_web=config.source_web, skip_until_record=config.skip_until_record, zip_records=config.zip_records, include_lbsn_objects=config.include_lbsn_objects, override_lbsn_query_schema=config.override_lbsn_query_schema, ) # Manually add entries that need submission prior to parsing data # add_bundestag_group_example(import_mapper) # init time monitoring how_long = TimeMonitor() # read and process unfiltered input records from csv # start settings with input_data as records: for record in records: lbsntransform.add_processed_records(record) # report progress if lbsntransform.processed_total % 1000 == 0: stats_str = HF.report_stats( input_data.count_glob, input_data.continue_number, lbsntransform.lbsn_records, ) print(stats_str, end="\r") sys.stdout.flush() if ( config.transferlimit and lbsntransform.processed_total >= config.transferlimit ): break # finalize output (close db connection, submit remaining) lbsntransform.log.info( f"\nTransferring remaining " f"{lbsntransform.lbsn_records.count_glob} to db.. " f"{HF.null_notice(input_data.import_mapper.null_island)})" ) lbsntransform.finalize_output() # final report lbsntransform.log.info( f"\n\n{''.join([f'(Dry Run){chr(10)}' if config.dry_run else ''])}" f"Processed {input_data.count_glob} input records " f"(Input {input_data.start_number} to " f"{input_data.continue_number}). 
" f"\n\nIdentified {lbsntransform.processed_total} LBSN records, " f"with {lbsntransform.lbsn_records.count_glob_total} " f"distinct LBSN records overall. " f"{HF.get_skipped_report(input_data.import_mapper)}. " f"Merged {lbsntransform.lbsn_records.count_dup_merge_total} " f"duplicate records." ) lbsntransform.log.info(f"\n{HF.get_count_stats(lbsntransform.lbsn_records)}") lbsntransform.log.info(f"Done. {how_long.stop_time()}") lbsntransform.close_log()