Module lbsntransform.lbsntransform_

LBSNTransform: Convert raw social media data to the common LBSN interchange format (ProtoBuf) and transfer it to local CSV or an LBSN Postgres database

# -*- coding: utf-8 -*-

"""
LBSNTransform: Convert Raw Social Media data
         to common LBSN interchange format (ProtoBuf) and
         transfer to local CSV or LBSN Postgres Database
"""

from __future__ import absolute_import

__author__ = "Alexander Dunkel"
__license__ = "GNU GPLv3"

import io
import logging
import sys
from pathlib import Path

from lbsntransform.tools.helper_functions import HelperFunctions as HF
from lbsntransform.output.shared_structure import LBSNRecordDicts
from lbsntransform.output.submit_data import LBSNTransfer
from lbsntransform.input.load_data import LoadData


class LBSNTransform:
    """Import, convert and export RAW Location Based Social Media data,
    such as Twitter and Flickr, based on a common data structure concept
    in Google's ProtoBuf format (see package lbsnstructure).

    Input can be:
        - local CSV or JSON (stacked/regular/line separated)
        - Postgres DB connection
    Output can be:
        - (local CSV)*
        - (local file with ProtoBuf encoded records)*
        - (local SQL file ready for "Import from" in Postgres LBSN db)*
        - Postgres DB connection (with existing LBSN raw DB Structure)
        - Postgres DB connection (with existing LBSN hll DB Structure)

        \* currently not supported

    Parameters
    ----------

    importer : class
        Mapper class for the input source. Each input source has its
        own import mapper defined in a class. Feel free to add or
        modify classes based on your needs. Pre-provided origin IDs:
            2    - Flickr
            21   - Flickr YFCC100M dataset
            3    - Twitter
    """

    def __init__(
        self,
        importer,
        logging_level=None,
        is_local_input: bool = False,
        transfer_count: int = 50000,
        csv_output: bool = True,
        csv_suppress_linebreaks: bool = True,
        dbuser_output=None,
        dbserveraddress_output=None,
        dbname_output=None,
        dbpassword_output=None,
        dbserverport_output=None,
        dbuser_input=None,
        dbserveraddress_input=None,
        dbname_input=None,
        dbpassword_input=None,
        dbserverport_input=None,
        dbformat_output=None,
        dbuser_hllworker=None,
        dbserveraddress_hllworker=None,
        dbname_hllworker=None,
        dbpassword_hllworker=None,
        dbserverport_hllworker=None,
        include_lbsn_bases=None,
        dry_run=None,
        hmac_key=None,
        commit_volume=None,
    ):
        """Init settings for LBSNTransform"""

        # init logger level
        if logging_level is None:
            logging_level = logging.INFO
        # Replace undecodable characters on stdout (console/Windows encoding issues)
        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), sys.stdout.encoding, "replace"
        )
        sys.stdout.flush()
        self.log = HF.set_logger()
        self.dry_run = dry_run
        # init global settings

        self.transfer_count = transfer_count
        self.importer = importer
        # get origin name and id from importer
        # e.g. yfcc100m dataset has origin id 21,
        # but is specified as general Flickr origin (2) in importer
        self.origin_id = self.importer.ORIGIN_ID
        self.origin_name = self.importer.ORIGIN_NAME
        # establish output connection
        self.dbuser_output = dbuser_output
        conn_output, cursor_output = LoadData.initialize_connection(
            dbuser_output,
            dbserveraddress_output,
            dbname_output,
            dbpassword_output,
            dbserverport_output,
        )
        if dbformat_output == "hll":
            __, cursor_hllworker = LoadData.initialize_connection(
                dbuser_hllworker,
                dbserveraddress_hllworker,
                dbname_hllworker,
                dbpassword_hllworker,
                dbserverport_hllworker,
                readonly=True,
            )
        else:
            cursor_hllworker = None

        # store global for closing connection later
        self.cursor_output = cursor_output
        self.output = LBSNTransfer(
            db_cursor=cursor_output,
            db_connection=conn_output,
            store_csv=csv_output,
            commit_volume=commit_volume,
            SUPPRESS_LINEBREAKS=csv_suppress_linebreaks,
            dbformat_output=dbformat_output,
            hllworker_cursor=cursor_hllworker,
            include_lbsn_bases=include_lbsn_bases,
            dry_run=self.dry_run,
        )
        # load from local json/csv or from PostgresDB
        self.cursor_input = None
        self.is_local_input = is_local_input
        if not self.is_local_input:
            __, cursor_input = LoadData.initialize_connection(
                dbuser_input,
                dbserveraddress_input,
                dbname_input,
                dbpassword_input,
                dbserverport_input,
                readonly=True,
                dict_cursor=True,
            )
            self.cursor_input = cursor_input

        # initialize stats
        self.processed_total = 0
        self.initial_loop = True
        self.how_long = None
        # field mapping structure
        # this is where all the converted data will be stored
        # note that one input record may contain many lbsn records
        self.lbsn_records = LBSNRecordDicts()
        # prepare hmac
        if self.output.dbformat_output == "hll":
            self.output.prepare_hmac(hmac_key)

    def add_processed_records(self, lbsn_record):
        """Adds one or multiple LBSN Records (ProtoBuf)
        to collection (dicts of LBSNRecords)

        Will automatically call self.store_lbsn_records()
        """
        self.lbsn_records.add_records_to_dict(lbsn_record)
        self.processed_total += 1
        # On the first loop,
        # or after 50,000 (default, see transfer_count) processed records,
        # store results
        if self.initial_loop:
            if self.output.dbformat_output == "lbsn":
                self.output.store_origin(self.origin_id, self.origin_name)
                self.store_lbsn_records()
            self.initial_loop = False
        if self.lbsn_records.count_glob >= self.transfer_count:
            print("\n", end="")
            self.store_lbsn_records()

    def store_lbsn_records(self):
        """Stores processed LBSN Records to chosen output format"""
        self.output.store_lbsn_record_dicts(self.lbsn_records)
        self.output.commit_changes()
        self.lbsn_records.clear()

    def finalize_output(self):
        """finalize all transactions (csv merge etc.)"""
        self.store_lbsn_records()
        self.output.finalize()
        # Close connections to DBs
        if not self.is_local_input:
            self.cursor_input.close()
        if self.dbuser_output:
            self.cursor_output.close()

    @staticmethod
    def close_log():
        """Closes log and writes to archive file"""
        logging.shutdown()
        # append the working log to a dated file for archive purposes
        today = HF.get_str_formatted_today()
        archive_file = Path(f"{today}.log")
        with open(archive_file, "a+") as outfile:
            with open("log.log") as infile:
                outfile.write("\n")
                for line in infile:
                    outfile.write(line)

Classes

class LBSNTransform (importer, logging_level=None, is_local_input: bool = False, transfer_count: int = 50000, csv_output: bool = True, csv_suppress_linebreaks: bool = True, dbuser_output=None, dbserveraddress_output=None, dbname_output=None, dbpassword_output=None, dbserverport_output=None, dbuser_input=None, dbserveraddress_input=None, dbname_input=None, dbpassword_input=None, dbserverport_input=None, dbformat_output=None, dbuser_hllworker=None, dbserveraddress_hllworker=None, dbname_hllworker=None, dbpassword_hllworker=None, dbserverport_hllworker=None, include_lbsn_bases=None, dry_run=None, hmac_key=None, commit_volume=None)

Import, convert and export RAW Location Based Social Media data, such as data from Twitter and Flickr, based on a common data structure concept in Google's ProtoBuf format (see package lbsnstructure).

Input can be:
- local CSV or JSON (stacked/regular/line separated)
- Postgres DB connection

Output can be:
- (local CSV)*
- (local file with ProtoBuf encoded records)*
- (local SQL file ready for "Import from" in Postgres LBSN db)*
- Postgres DB connection (with existing LBSN raw DB structure)
- Postgres DB connection (with existing LBSN hll DB structure)

* currently not supported

Parameters

importer : class
Mapper class for the input source. Each input source has its own import mapper defined in a class. Feel free to add or modify classes based on your needs. Pre-provided origin IDs: 2 - Flickr, 21 - Flickr YFCC100M dataset, 3 - Twitter.

Init settings for LBSNTransform
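
For orientation, a minimal constructor sketch for a local-input, CSV-output run. The mapper import path and class name (FieldMappingTwitter) are illustrative assumptions and may differ from the current package layout:

from lbsntransform import LBSNTransform
# illustrative mapper import; the actual module path may differ
from lbsntransform.input.mappings.field_mapping_twitter import FieldMappingTwitter

lt = LBSNTransform(
    importer=FieldMappingTwitter,  # mapper class providing ORIGIN_ID/ORIGIN_NAME
    is_local_input=True,           # read local JSON/CSV instead of a Postgres DB
    csv_output=True,               # write converted records to local CSV
    transfer_count=50000,          # flush to output every 50,000 records
)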


Static methods

def close_log()

"Closes log and writes to archive file

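close_log() shuts down the logging system and appends the contents of the working log file (log.log) to a date-stamped file; the exact file name depends on HF.get_str_formatted_today(). A one-line usage sketch:

LBSNTransform.close_log()  # appends log.log to <today>.log and shuts down logging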

Methods

def add_processed_records(self, lbsn_record)

Adds one or multiple LBSN Records (ProtoBuf) to collection (dicts of LBSNRecords)

Will automatically call self.store_lbsn_records()

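A sketch of the ingest loop, continuing the constructor example above; parse_json_record stands in for whatever parse method the chosen mapper exposes (an assumed name, not a confirmed API):

import json

raw_lines = open("tweets.jsonl", encoding="utf-8")  # hypothetical line-separated JSON input
mapper = FieldMappingTwitter()                      # instantiate the mapper from the sketch above

for line in raw_lines:
    lbsn_records = mapper.parse_json_record(json.loads(line))  # hypothetical parse call
    if not lbsn_records:
        continue
    # one input record may yield many LBSN records; results are flushed
    # to the output automatically once transfer_count is reached
    lt.add_processed_records(lbsn_records)
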
def finalize_output(self)

Finalize all transactions (CSV merge etc.)

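In practice this is the teardown call at the end of a run; one natural ordering, following the sketches above:

lt.finalize_output()       # flush remaining records, finalize output, close DB cursors
LBSNTransform.close_log()  # then archive the run's log file
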
def store_lbsn_records(self)

Stores processed LBSN Records to chosen output format

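Flushing normally happens inside add_processed_records(), but the method can also be called directly, e.g. to force an early commit between input files:

# force an early flush: stores the collected records, commits the
# output transaction, and clears the in-memory record dicts
lt.store_lbsn_records()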