Module lbsntransform.input.load_data
Module for loading data from different sources (CSV, DB, JSON, etc.).
# -*- coding: utf-8 -*-
"""
Module for loading data from different sources (CSV, DB, JSON, etc.).
"""
import codecs
import csv
import os
import sys
import logging
import traceback
from contextlib import closing
from itertools import zip_longest
from typing import Tuple, List, Union, Iterator, Optional, IO
import psycopg2
import ntpath
import requests
import lbsnstructure as lbsn
from lbsntransform.tools.db_connection import DBConnection
from lbsntransform.output.shared_structure import GeocodeLocations
from lbsntransform.tools.helper_functions import HelperFunctions as HF
from lbsntransform.input.mappings.db_query import (
InputSQL,
LBSN_SCHEMA,
optional_schema_override,
)
# type alias
LBSNObjects = Union[
lbsn.CompositeKey,
lbsn.Language,
lbsn.RelationshipKey,
lbsn.City,
lbsn.Country,
lbsn.Origin,
lbsn.Place,
lbsn.Post,
lbsn.PostReaction,
lbsn.Relationship,
lbsn.User,
lbsn.UserGroup,
]
class LoadData:
"""
Class for loading data from different sources (CSV, DB, JSON, etc.).
"""
def __init__(
self,
importer=None,
is_local_input=None,
startwith_db_rownumber=None,
skip_until_file=None,
cursor_input=None,
input_path=None,
recursive_load=None,
local_file_type=None,
endwith_db_rownumber=None,
is_stacked_json=None,
is_line_separated_json=None,
csv_delim=None,
input_lbsn_type=None,
dbformat_input=None,
geocode_locations=None,
ignore_input_source_list=None,
disable_reactionpost_ref=None,
map_relations=None,
transfer_reactions=None,
ignore_non_geotagged=None,
min_geoaccuracy=None,
source_web=None,
zip_records=None,
skip_until_record=None,
include_lbsn_objects=None,
override_lbsn_query_schema=None,
use_csv_dictreader=None,
):
self.is_local_input = is_local_input
self.start_number = 1
self.continue_number = None
self.skip_until_record = None
if not self.is_local_input:
# Start Value, Modify to continue from last processing
self.continue_number = startwith_db_rownumber
self.start_number = startwith_db_rownumber
else:
self.continue_number = 0
self.skip_until_record = skip_until_record
self.source_web = source_web
if zip_records is None:
zip_records = False
self.zip_records = zip_records
self.cursor_input = cursor_input
if self.is_local_input and not self.source_web:
self.filelist = LoadData._read_local_files(
input_path=input_path,
recursive_load=recursive_load,
local_file_type=local_file_type,
skip_until_file=skip_until_file,
)
elif self.is_local_input and self.source_web:
self.filelist = input_path
self.finished = False
self.dbformat_input = dbformat_input
self.db_row_number = 0
self.endwith_db_rownumber = endwith_db_rownumber
self.is_stacked_json = is_stacked_json
self.is_line_separated_json = is_line_separated_json
self.local_file_type = local_file_type
self.csv_delim = csv_delim
self.use_csv_dictreader = use_csv_dictreader
self.file_format = local_file_type
self.input_lbsn_type = input_lbsn_type
self.start_id = None
if include_lbsn_objects is None:
include_lbsn_objects = ["post"]
self.include_lbsn_objects = include_lbsn_objects
self.count_glob = 0
self.current_source = None
# self.transferlimit = cfg.transferlimit
# Optional Geocoding
self.geocode_dict = None
if geocode_locations:
self.geocode_dict = LoadData.load_geocodes(geocode_locations)
# Optional ignore input sources
self.ignore_sources_set = None
if ignore_input_source_list:
self.ignore_sources_set = LoadData.load_ignore_sources(
ignore_input_source_list
)
self.lbsn_schema = LBSN_SCHEMA
if override_lbsn_query_schema:
self.lbsn_schema = optional_schema_override(
LBSN_SCHEMA, override_lbsn_query_schema
)
# these kwargs will be given to the dynamic mappings module
# only if defined in the module;
# these variables are made available via cli to the user;
# adding new options requires updating cli config
kwargs = {
"disable_reaction_post_referencing": disable_reactionpost_ref,
"geocodes": self.geocode_dict,
"map_full_relations": map_relations,
"map_reactions": transfer_reactions,
"ignore_non_geotagged": ignore_non_geotagged,
"ignore_sources_set": self.ignore_sources_set,
"min_geoaccuracy": min_geoaccuracy,
}
# initialize field mapping structure
self.import_mapper = importer(**kwargs)
self.finished = False
def __enter__(self) -> Iterator[LBSNObjects]:
"""Main pipeline for reading input data
Combines multiple generators into a single pipeline,
which is returned for processing in a with-statement
"""
if self.cursor_input or self.source_web:
return self.convert_records(self._process_input())
else:
return self.convert_records(self._process_input(self._open_input_files()))
def __exit__(self, exception_type, exception_value, tb_value):
"""Contextmanager exit: nothing to do here if no exception is raised"""
if any(v is not None for v in [exception_type, exception_value, tb_value]):
# only if any of these variables is not None
# catch exception and output additional information
logging.getLogger("__main__").warning(
f"\nError while reading records: "
f"{exception_type}\n{exception_value}\n"
f"{traceback.print_tb(tb_value)}\n"
)
logging.getLogger("__main__").warning(
f"Current source: \n {self.current_source}\n"
)
stats_str = HF.report_stats(self.count_glob, self.continue_number)
logging.getLogger("__main__").warning(stats_str)
return False
def _open_input_files(self) -> Iterator[IO[str]]:
"""Loops input filelist and returns opened file handles"""
# process localfiles
for file_name in self.filelist:
self.continue_number += 1
self.current_source = file_name
HF.log_main_debug(f"Current file: {ntpath.basename(file_name)}")
yield open(file_name, "r", encoding="utf-8", errors="replace")
def _process_input(
self, file_handles: Optional[Iterator[IO[str]]] = None
) -> Iterator[Optional[Tuple[List[str], Optional[str]]]]:
"""File parse for CSV or JSON from open file handle
Output Generator of type Tuple (1, 2):
1) a list of post that can be parsed
2) Optional information regarding type of records in list
"""
if self.source_web:
if self.csv_delim is None:
self.csv_delim = ","
kwargs = {
"delimiter": self.csv_delim,
"quotechar": '"',
"quoting": csv.QUOTE_NONE,
}
if len(self.filelist) == 1:
# single web file query
url = self.filelist[0]
with closing(requests.get(url, stream=True)) as file_handle:
record_reader = csv.reader(
codecs.iterdecode(
file_handle.iter_lines(), "utf-8"
), # pylint: disable=maybe-no-member
**kwargs,
)
if self.use_csv_dictreader:
record_reader = csv.DictReader(
codecs.iterdecode(
file_handle.iter_lines(), "utf-8"
), # pylint: disable=maybe-no-member
**kwargs,
)
for record in record_reader:
# for record in zip_longest(r1, r2)
yield record, None
else:
# multiple web file query
if self.zip_records and len(self.filelist) == 2:
# zip 2 web csv sources in parallel, e.g.
# zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-
url1 = self.filelist[0]
url2 = self.filelist[1]
with closing(requests.get(url1, stream=True)) as fhandle1, closing(
requests.get(url2, stream=True)
) as fhandle2:
if self.use_csv_dictreader:
logging.getLogger("__main__").warning(
"--use_csv_dictreader not supported with flag --zip_records."
)
reader1 = csv.reader(
codecs.iterdecode(fhandle1.iter_lines(), "utf-8"), **kwargs
)
reader2 = csv.reader(
codecs.iterdecode(fhandle2.iter_lines(), "utf-8"), **kwargs
)
for zipped_record in zip_longest(reader1, reader2):
# catch any type-error None record
# if zipped_record[0] is None:
# yield zipped_record[1]
# if zipped_record[1] is None:
# yield zipped_record[0]
# combine the two lists
try:
yield zipped_record[0] + zipped_record[1], None
except TypeError:
sys.exit(
f"Stream appears to have broken. "
f"Check connection and continue at "
f"{self.count_glob}"
)
return
else:
raise ValueError(
"Iteration of multiple web sources "
"is currently not supported."
)
elif self.is_local_input:
if file_handles is None:
raise ValueError("Input is empty")
if self.zip_records:
if len(self.filelist) == 2:
filehandle1 = next(file_handles)
filehandle2 = next(file_handles)
for zipped_record in zip_longest(
self.fetch_record_from_file(filehandle1),
self.fetch_record_from_file(filehandle2),
):
yield zipped_record[0] + zipped_record[1], None
else:
raise ValueError("Zipping only supported for 2 files.")
else:
# local file loop
for file_handle in file_handles:
if self.file_format != "json":
# csv or txt
for record in self.fetch_record_from_file(file_handle):
if record:
yield record, None
else:
continue
else:
# json
for record in self.fetch_json_data_from_file(file_handle):
if record:
yield record, None
else:
continue
else:
# db query
if self.dbformat_input == "lbsn":
for lbsn_type, schema_name, table_name, key_col in self.lbsn_schema:
if lbsn_type.lower() not in self.include_lbsn_objects:
continue
while self.cursor_input:
records = self.fetch_json_data_from_lbsn(
cursor=self.cursor_input,
start_id=self.continue_number,
schema_name=schema_name,
table_name=table_name,
key_col=key_col,
)
if records is None:
break
for record in records:
yield record, lbsn_type
# reset start cursor
# note: this will disable --startwith_db_rownumber for
# any further lbsnobjects
self.continue_number = None
elif self.dbformat_input == "json":
while self.cursor_input:
records = self.fetch_json_data_from_lbsn(
cursor=self.cursor_input, start_id=self.continue_number
)
for record in records:
yield record, self.input_lbsn_type
def convert_records(
self, records: Iterator[Optional[Tuple[List[str], Optional[str]]]]
) -> Iterator[List[LBSNObjects]]:
"""Loops input json or csv records,
converts to ProtoBuf structure and adds to records_dict
Returns statistic-counts, modifies (adds results to) import_mapper
"""
for record in records:
self.count_glob += 1
# skip records based on count
if self.skip_until_record and self.skip_until_record > self.count_glob:
print(f"Skipping record {self.count_glob}", end="\r")
continue
record_type = None
if self.is_local_input or self.dbformat_input == "lbsn":
single_record = record[0]
record_type = record[1]
else:
# e.g. dbformat_input == "json"
self.db_row_number = record[0]
single_record = record[2]
if LoadData.skip_empty_or_other(single_record):
# skip empty or malformed records
continue
# pass arguments by position,
# record_type may not always be available/used by the mapping
args = [single_record, record_type]
if self.local_file_type == "json" or not self.is_local_input:
# note: db-records always returned as json-dict
lbsn_records = self.import_mapper.parse_json_record(*args)
elif self.local_file_type in ("txt", "csv"):
lbsn_records = self.import_mapper.parse_csv_record(*args)
else:
sys.exit(f"Format {self.local_file_type} not supported.")
# return record as pipe
if lbsn_records is None:
continue
for lbsn_record in lbsn_records:
yield lbsn_record
@staticmethod
def skip_empty_or_other(single_record):
"""Detect Rate Limiting Notice or empty records
so they can be skipped.
"""
skip = False
if not single_record or (
isinstance(single_record, dict) and single_record.get("limit")
):
skip = True
return skip
def fetch_json_data_from_lbsn(
self,
cursor,
start_id=None,
get_max=None,
number_of_records_to_fetch=10000,
schema_name=None,
table_name=None,
key_col=None,
) -> Optional[List[List[str]]]:
"""Fetches records from Postgres DB
Keyword arguments:
cursor -- db-cursor
start_id -- Offset for querying
get_max -- optional limit for retrieving records
number_of_records_to_fetch -- how many records should get fetched
"""
# if transferlimit is below number_of_records_to_fetch, e.g. 10000,
# retrieve only necessary volume of records
if get_max:
number_of_records_to_fetch = min(number_of_records_to_fetch, get_max)
query_sql = InputSQL.LBSN.get_sql(
schema_name=schema_name,
table_name=table_name,
start_id=start_id,
number_of_records_to_fetch=number_of_records_to_fetch,
key_col=key_col,
)
cursor.execute(query_sql)
records = cursor.fetchall()
if cursor.rowcount == 0:
return None
# update last returned db_row_number
if key_col is None:
self.continue_number = records[-1][0]
if not self.start_number:
# first returned db_row_number
self.start_number = records[0][0]
else:
self.continue_number = records[-1].get(key_col)
if not self.start_number:
# first returned db_row_number
self.start_number = records[0].get(key_col)
return records
def fetch_record_from_file(self, file_handle):
"""Fetches CSV or JSON data (including stacked json) from file"""
if self.file_format in ["txt", "csv"]:
record_reader = self.fetch_csv_data_from_file(file_handle)
else:
sys.exit(f"Format {self.file_format} not supported.")
# return record pipeline
for record in record_reader:
yield record
def fetch_json_data_from_file(self, file_handle):
"""Read json entries from file.
Typical form is [{json1},{json2}]. If is_stacked_json is True,
stacked jsons of the form {json1}{json2} are processed.
If is_line_separated_json is True, jsons separated by line endings are processed:
{json1}
{json2}
...
"""
# records = []
# Stacked JSON is a simple file with many concatenated jsons, e.g.
# {json1}{json2} etc.
if self.is_stacked_json:
# note: this requires loading file completely first
# not streaming optimized yet
for record in HF.json_read_wrapper(HF.decode_stacked(file_handle.read())):
yield record
elif self.is_line_separated_json:
# jsons separated by line endings
for line in file_handle:
record = HF.json_load_wrapper(line, single=True)
yield record
else:
# normal json nesting, e.g. {{record1},{record2}}
records = HF.json_load_wrapper(file_handle)
if records:
if isinstance(records, list):
for record in records:
yield record
else:
record = records
yield record
yield None
# streaming version:
# start_pos = 0
# while True:
# try:
# record = json.load(file_handle)
# yield record
# return
# except json.JSONDecodeError as e:
# file_handle.seek(start_pos)
# json_str = file_handle.read(e.pos)
# record = json.loads(json_str)
# start_pos += e.pos
# yield record
def fetch_csv_data_from_file(self, file_handle):
"""Read csv entries from file (either *.txt or *.csv).
The actual CSV formatting is not settable in config yet.
There are many specifics, e.g. QUOTE_NONE is used here
because media saved from Flickr does not contain any quotes ("").
"""
if self.csv_delim is None:
self.csv_delim = ","
kwargs = {
"delimiter": self.csv_delim,
"quotechar": '"',
"quoting": csv.QUOTE_NONE,
}
if self.use_csv_dictreader:
record_reader = csv.DictReader(f=file_handle, **kwargs)
return record_reader
record_reader = csv.reader(file_handle, **kwargs)
return record_reader
@staticmethod
def initialize_connection(
dbuser_output,
dbserveraddress_output,
dbname_output,
dbpassword_output,
dbserverport_output,
readonly: bool = True,
dict_cursor: Optional[bool] = None,
):
"""Establishes connection to DB (Postgres)"""
if dbuser_output:
connection = DBConnection(
serveraddress=dbserveraddress_output,
dbname=dbname_output,
user=dbuser_output,
password=dbpassword_output,
port=dbserverport_output,
readonly=readonly,
)
conn, cursor = connection.connect(dict_cursor)
else:
conn = None
cursor = None
return conn, cursor
@staticmethod
def _read_local_files(
input_path=None, recursive_load=None, local_file_type=None, skip_until_file=None
) -> List[str]:
"""Read Local Files according to config parameters and
returns list of file-paths
"""
if recursive_load:
excludefolderlist = [
"01_DataSetHistory",
"02_UserData",
"03_ClippedData",
"04_MapVis",
]
excludestartswithfile = ["log", "settings", "GridCoordinates"]
loc_filelist = LoadData.scan_rec(
input_path,
file_format=local_file_type,
excludefolderlist=excludefolderlist,
excludestartswithfile=excludestartswithfile,
)
else:
loc_filelist_gen = input_path.glob(f"*.{local_file_type}")
loc_filelist = []
for file_path in loc_filelist_gen:
loc_filelist.append(file_path)
if skip_until_file:
file_index = LoadData._item_index_list(loc_filelist, skip_until_file)
logging.getLogger("__main__").info(
f"\nSkipped {len(loc_filelist) - len(loc_filelist[file_index:])}"
f" input files. \n"
)
loc_filelist = loc_filelist[file_index:]
input_count = len(loc_filelist)
if input_count == 0:
sys.exit("No location files found.")
else:
return loc_filelist
@staticmethod
def _item_index_list(item_list, value, split_filename=True):
index = 0
for element in item_list:
if split_filename:
element = os.path.split(element)[1]
if element == value:
return index
index += 1
return -1
@staticmethod
def load_geocodes(geo_config):
"""Loads coordinates-string tuples for geocoding
text locations (Optional)
"""
locationsgeocode_dict = GeocodeLocations()
locationsgeocode_dict.load_geocodelist(geo_config)
geocode_dict = locationsgeocode_dict.geocode_dict
return geocode_dict
@staticmethod
def load_ignore_sources(list_source=None):
"""Loads list of source types to be ignored"""
ignore_source_list = set()
with open(list_source, newline="", encoding="utf8") as file_handle:
for ignore_source in file_handle:
ignore_source_list.add(ignore_source.strip())
return ignore_source_list
@staticmethod
def scan_rec(
root,
subdirlimit=2,
file_format="csv",
excludefolderlist=None,
excludestartswithfile=None,
):
"""Recursively scan subdir for datafiles"""
rval = []
if excludefolderlist is None:
excludefolderlist = []
if excludestartswithfile is None:
excludestartswithfile = []
def do_scan(start_dir, output, depth=0):
for file_handle in os.listdir(start_dir):
file_handle_path = os.path.join(start_dir, file_handle)
if os.path.isdir(file_handle_path):
if depth < subdirlimit:
efound = False
# check for excludefolders:
for entry in excludefolderlist:
if entry in file_handle_path:
efound = True
break
if efound is False:
do_scan(file_handle_path, output, depth + 1)
else:
if file_handle_path.endswith(file_format):
efound = False
for entry in excludestartswithfile:
if ntpath.basename(file_handle_path).startswith(entry):
efound = True
break
if efound is False:
output.append(file_handle_path)
do_scan(root, rval, 0)
return rval
Classes
class LoadData (importer=None, is_local_input=None, startwith_db_rownumber=None, skip_until_file=None, cursor_input=None, input_path=None, recursive_load=None, local_file_type=None, endwith_db_rownumber=None, is_stacked_json=None, is_line_separated_json=None, csv_delim=None, input_lbsn_type=None, dbformat_input=None, geocode_locations=None, ignore_input_source_list=None, disable_reactionpost_ref=None, map_relations=None, transfer_reactions=None, ignore_non_geotagged=None, min_geoaccuracy=None, source_web=None, zip_records=None, skip_until_record=None, include_lbsn_objects=None, override_lbsn_query_schema=None, use_csv_dictreader=None)
-
Class for loading data from different sources (CSV, DB, JSON, etc.).
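A minimal usage sketch: LoadData is meant to be used as a context manager that yields converted lbsn records. The stub mapping class and the input folder below are placeholders; a real run passes one of lbsntransform's field mapping classes and a folder that actually contains input files.
from pathlib import Path
from lbsntransform.input.load_data import LoadData

class StubMapping:
    """Placeholder for a field mapping class (illustration only)."""
    def __init__(self, **kwargs):
        pass
    def parse_json_record(self, record, record_type=None):
        return []  # a real mapping returns a list of lbsn objects

load_data = LoadData(
    importer=StubMapping,
    is_local_input=True,
    input_path=Path("./input"),  # placeholder folder containing *.json files
    local_file_type="json",
)
with load_data as records:
    for lbsn_record in records:
        print(lbsn_record)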
Static methods
def initialize_connection(dbuser_output, dbserveraddress_output, dbname_output, dbpassword_output, dbserverport_output, readonly: bool = True, dict_cursor: Optional[bool] = None)
-
Establishes connection to DB (Postgres)
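Example call (all connection values below are placeholders):
conn, cursor = LoadData.initialize_connection(
    dbuser_output="postgres",
    dbserveraddress_output="127.0.0.1",
    dbname_output="rawdb",
    dbpassword_output="secret",
    dbserverport_output=5432,
    readonly=True,
)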
def load_geocodes(geo_config)
-
Loads coordinates-string tuples for geocoding text locations (Optional)
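Example call; the file path is a placeholder, and the expected file format is whatever GeocodeLocations.load_geocodelist() accepts:
geocode_dict = LoadData.load_geocodes("geocode_locations.csv")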
def load_ignore_sources(list_source=None)
-
Loads list of source types to be ignored
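Example call; the input is a plain text file with one source label per line (file name and labels are placeholders):
# ignore_sources.txt:
#   foursquare
#   twitter
ignore_set = LoadData.load_ignore_sources("ignore_sources.txt")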
def scan_rec(root, subdirlimit=2, file_format='csv', excludefolderlist=None, excludestartswithfile=None)
-
Recursively scan subdir for datafiles
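Example call (folder and filter values are placeholders):
csv_files = LoadData.scan_rec(
    "./data",
    subdirlimit=2,
    file_format="csv",
    excludefolderlist=["02_UserData"],
    excludestartswithfile=["log"],
)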
def skip_empty_or_other(single_record)
-
Detect Rate Limiting Notice or empty records so they can be skipped.
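Example values (the dict contents are placeholders):
print(LoadData.skip_empty_or_other(None))                  # True:  empty record
print(LoadData.skip_empty_or_other({"limit": "notice"}))   # True:  rate-limit notice
print(LoadData.skip_empty_or_other({"post_guid": "123"}))  # False: regular record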
Methods
def convert_records(self, records: Iterator[Optional[Tuple[List[str], Optional[str]]]]) ‑> Iterator[List[Union[lbsnstructure.social_pb2.CompositeKey, lbsnstructure.social_pb2.Language, lbsnstructure.interlinkage_pb2.RelationshipKey, lbsnstructure.spatial_pb2.City, lbsnstructure.spatial_pb2.Country, lbsnstructure.social_pb2.Origin, lbsnstructure.spatial_pb2.Place, lbsnstructure.topical_pb2.Post, lbsnstructure.topical_pb2.PostReaction, lbsnstructure.interlinkage_pb2.Relationship, lbsnstructure.social_pb2.User, lbsnstructure.social_pb2.UserGroup]]]
-
Loops over input json or csv records, converts them to ProtoBuf (lbsn) structures using the import_mapper and yields the converted records
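A sketch of feeding records by hand, continuing the LoadData instance from the class example above; the tuple shape (record, record_type) matches what _process_input() yields, and the record contents are placeholders:
rows = iter([({"post_guid": "123"}, None)])  # (record, record_type) tuples
for lbsn_record in load_data.convert_records(rows):
    print(lbsn_record)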
def fetch_csv_data_from_file(self, file_handle)
-
Read csv entries from file (either *.txt or *.csv).
The actual CSV formatting is not settable in config yet. There are many specifics, e.g.
QUOTE_NONE is used here because media saved from Flickr
does not contain any quotes ("").
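Example call on an open file handle, continuing the LoadData instance from the class example above (the file name is a placeholder):
with open("posts.csv", encoding="utf-8", errors="replace") as fh:
    reader = load_data.fetch_csv_data_from_file(fh)
    first_row = next(reader, None)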
def fetch_json_data_from_file(self, file_handle)
-
Read json entries from file.
Typical form is [{json1},{json2}], if is_stacked_json is True: will process stacked jsons in the form of {json1}{json2}
If is_line_separated_json is true: {json1} {json2} …
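Example call, continuing the LoadData instance from the class example above; note that the default (non-stacked, non-line-separated) branch may yield a trailing None (the file name is a placeholder):
with open("posts.json", encoding="utf-8", errors="replace") as fh:
    for record in load_data.fetch_json_data_from_file(fh):
        if record is None:
            continue
        print(record)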
def fetch_json_data_from_lbsn(self, cursor, start_id=None, get_max=None, number_of_records_to_fetch=10000, schema_name=None, table_name=None, key_col=None) ‑> Optional[List[List[str]]]
-
Fetches records from Postgres DB
Keyword arguments:
cursor -- db-cursor
start_id -- offset for querying
get_max -- optional limit for retrieving records
number_of_records_to_fetch -- how many records should get fetched
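A sketch of a direct call, assuming a cursor obtained via initialize_connection() and a LoadData instance set up for db input (start id and batch size are placeholders):
records = load_data.fetch_json_data_from_lbsn(
    cursor=cursor,  # psycopg2 cursor from initialize_connection()
    start_id=0,
    number_of_records_to_fetch=1000,
)
if records:
    for record in records:
        print(record)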
def fetch_record_from_file(self, file_handle)
-
Fetches CSV records (*.txt or *.csv) from an open file handle
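Example call, assuming a LoadData instance initialized with local_file_type="csv" (the file name is a placeholder):
with open("posts.csv", encoding="utf-8", errors="replace") as fh:
    for row in load_data.fetch_record_from_file(fh):
        print(row)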