Module lbsntransform.output.csv.store_csv

Module for storing common Proto LBSN Structure to CSV.

Expand source code
# -*- coding: utf-8 -*-

"""
Module for storing common Proto LBSN Structure to CSV.
"""

# pylint: disable=no-member

import base64
import csv
import os
from contextlib import ExitStack
from glob import glob
from heapq import merge as heapq_merge


from lbsntransform.output.lbsn.shared_structure_proto_lbsndb import \
    ProtoLBSNMapping
from lbsntransform.tools.helper_functions import HelperFunctions as HF


class LBSNcsv():
    """Class to convert and store protobuf records to CSV file(s).

    Because the amount of data might be quite big,
    CSVs are stored incrementally first.
    After no new data arrives, the tool will merge
    all single CSV files and eliminate duplicates.
    This is done using heapq_merge:
        - individual files are first sorted based on primary key
        - lines beginning with the same primry key
        are merged (each field is compared)
    Afterwards, 2 file types are generated:
        a) archive (ProtoBuf)
        b) CSV_copy import (postgres)

    Attributes:
        output_path_file    Where the CSVs will be stored.
                            Note: check for existing files!
        db_mapping          Reference to access functions
                            from ProtoLBSM_db_Mapping class
        store_csv_part      Current Number of CSV file parts
        suppress_linebreaks Usually, linebreaks will not be
                            a problem, linebreaks are nonetheless surpressed.
    """

    def __init__(self, suppress_linebreaks=True):
        self.output_path_file = f'{os.getcwd()}\\02_Output\\'
        self.db_mapping = ProtoLBSNMapping()
        self.store_csv_part = 0
        self.suppress_linebreaks = suppress_linebreaks
        if not os.path.exists(self.output_path_file):
            os.makedirs(self.output_path_file)

    def store_append_batch_to_csv(self, records, round_nr,
                                  type_name, pg_copy_format=False):
        """ Takes proto buf lbsn record dict and appends all records
            to correct CSV (type-specific)
        """
        file_path = f'{self.output_path_file}{type_name}_{round_nr:03d}.csv'
        with open(file_path, 'a', encoding='utf8') as file_handle:
            # csvOutput = csv.writer(f, delimiter=',',
            #                       lineterminator='\n',
            #                       quotechar='"', quoting=csv.QUOTE_MINIMAL)
            for record in records:
                serialized_record_b64 = self.serialize_encode_record(record)
                file_handle.write(
                    f'{record.pkey.id},{serialized_record_b64}\n')

    @classmethod
    def serialize_encode_record(cls, record):
        """ Serializes protobuf record as string and
            encodes in base64 for corrupt-resistant backup/store/transfer
        """
        serialized_record = record.SerializeToString()
        serialized_encoded_record = base64.b64encode(serialized_record)
        serialized_encoded_record_utf = serialized_encoded_record.decode(
            "utf-8")
        return serialized_encoded_record_utf

    # def writeCSVHeader(self, typeName):
    #    # create files and write headers
    #    #for typename, header in self.typeNamesHeaderDict.items():
    #    header = self.typeNamesHeaderDict[typeName]
    #    csvOutput = open(f'{self.output_path_file}'
    #                     f'{typeName}_{self.countRound:03d}.csv',
    #                     'w', encoding='utf8')
    #    csvOutput.write("%s\n" % header)

    def clean_csv_batches(self, batched_records):
        """ Merges all output streams per type at end
            and removes duplicates

            This is necessary because Postgres can't
            import Duplicates with /copy.
            Keep all records in RAM while processing input data is impossible,
            therefore merge happens only once at end.
        """
        x_cnt = 0
        self.store_csv_part += 1
        print('Cleaning and merging output files..')
        for type_name in batched_records:
            x_cnt += 1
            filelist = glob(f'{self.output_path_file}{type_name}_*.csv')
            if filelist:
                self.sort_files(filelist)
                if len(filelist) > 1:
                    print(
                        f'Cleaning & merging output files..{x_cnt}'
                        f'/{len(batched_records)}', end='\r')
                    self.merge_files(filelist, type_name)
                else:
                    # sec = input(f'only one File. {filelist}\n')
                    # no need to merge files if only one round
                    new_filename = filelist[0].replace('_001', '_Proto')
                    if os.path.isfile(new_filename):
                        os.remove(new_filename)
                    if os.path.isfile(filelist[0]):
                        os.rename(filelist[0], new_filename)
                self.remove_merge_duplicate_records_format_csv(type_name)

    @classmethod
    def sort_files(cls, filelist):
        """ Function for sorting files
            (precursor to remove duplicates)
        """
        for file_path in filelist:
            with open(file_path, 'r+', encoding='utf8') as batch_file:
                # skip header
                # header = batchFile.readline()
                lines = batch_file.readlines()
                # sort by first column
                lines.sort(key=lambda a_line: a_line.split()[0])
                # lines.sort()
                batch_file.seek(0)
                # delete original records in file
                batch_file.truncate()
                # write sorted records
                # batchFile.writeline(header)
                for line in lines:
                    batch_file.write(line)

    def merge_files(self, filelist, type_name):
        """ Merges multiple files to one
            using ExitStack
        """
        with ExitStack() as stack:
            files = [stack.enter_context(
                open(fname, encoding='utf8')) for fname in filelist]
            with open(f'{self.output_path_file}{type_name}_Proto.csv',
                      'w', encoding='utf8') as merged_file:
                merged_file.writelines(heapq_merge(*files))
        for file in filelist:
            os.remove(file)

    @classmethod
    def create_proto_by_descriptor_name(cls, desc_name):
        """Create new proto record by name
        """
        new_record = HF.dict_type_switcher(desc_name)
        return new_record

    # def parse_message(self,msgType,stringMessage):
    #        #result_class = reflection.GeneratedProtocolMessageType(msgType,
    #        (stringMessage,),{'DESCRIPTOR': descriptor, '__module__': None})
    #        msgClass=lbsnstructure.Structure_pb2[msgType]
    #        message=msgClass()
    #        message.ParseFromString(stringMessage)
    #        return message

    def remove_merge_duplicate_records_format_csv(self, type_name):
        """ Will merge all single proto batches and
            - remove duplicates
            - output formatted csv

            This Mainloop uses procedures below.
        """
        merged_filename = f'{self.output_path_file}{type_name}_Proto.csv'
        cleaned_merged_filename = f'{self.output_path_file}' \
                                  f'{type_name}_cleaned.csv'
        cleaned_merged_filename_csv = f'{self.output_path_file}' \
                                      f'{type_name}_pgCSV.csv'
        if os.path.isfile(merged_filename):
            merged_file = open(merged_filename, 'r', encoding='utf8')
            cleaned_merged_file = open(
                cleaned_merged_filename, 'w', encoding='utf8')
            cleaned_merged_file_copy = open(
                cleaned_merged_filename_csv, 'w', encoding='utf8')
            self.clean_merged_file(
                merged_file, cleaned_merged_file,
                cleaned_merged_file_copy, type_name)
            os.remove(merged_filename)
            os.rename(cleaned_merged_filename, merged_filename)

    def get_record_id_from_base64_encoded_string(self, line):
        """ Gets record ID from base 64 encoded string
            (unused function)
        """
        record = self.get_record_from_base64_encoded_string(line)
        return record.pkey.id

    def get_record_from_base64_encoded_string(self, line, type_name=None):
        """ Gets ProtoBuf record from base 64 encoded string.
        """
        record = self.create_proto_by_descriptor_name(type_name)
        record.ParseFromString(base64.b64decode(line))  # .strip("\n"))
        return record

    def merge_records(self, duplicate_record_lines, type_name):
        """ Will merge multiple proto buf records to one,
            eliminating duplicates and merging information.
        """
        if len(duplicate_record_lines) > 1:
            # first do a simple compare/unique
            unique_records = set(duplicate_record_lines)
            if len(unique_records) > 1:
                # input(f'Len: {len(unique_records)} : {unique_records}')
                # if more than one unqiue record infos,
                # get first and deep-compare-merge with following
                prev_duprecord = self.get_record_from_base64_encoded_string(
                    duplicate_record_lines[0], type_name)
                for duprecord in duplicate_record_lines[1:]:
                    # merge current record with previous until no more found
                    record = self.get_record_from_base64_encoded_string(
                        duprecord, type_name)
                    # will modify/overwrite prev_duprecord
                    HF.merge_existing_records(
                        prev_duprecord, record)
                merged_record = self.serialize_encode_record(prev_duprecord)
            else:
                # take first element
                merged_record = next(iter(unique_records))
        else:
            merged_record = duplicate_record_lines[0]
        return merged_record

    def format_b64_encoded_record_for_csv(self, record_b64, type_name):
        """ Will convert protobuf base 64 encoded (prepared) records and
            convert to CSV formatted list of lines.
        """
        record = self.get_record_from_base64_encoded_string(
            record_b64, type_name)
        # convert Protobuf to Value list
        prepared_record = self.db_mapping.func_prepare_selector(record)
        formatted_value_list = []
        for value in prepared_record:
            # CSV Writer can't produce CSV that can be directly read
            # by Postgres with /Copy
            # Format some types manually (e.g. arrays, null values)
            if isinstance(value, list):
                value = '{' + ",".join(value) + '}'
            elif self.suppress_linebreaks and isinstance(value, str):
                # replace linebreaks by actual string so we can use
                # heapqMerge to merge line by line
                value = value.replace('\n', '\\n').replace('\r', '\\r')
            formatted_value_list.append(value)
        return formatted_value_list

    def clean_merged_file(self, merged_file, cleaned_merged_file,
                          cleaned_merged_file_copy, type_name):
        """ Will merge files and remove duplicates records.
        """
        dupsremoved = 0
        with merged_file, cleaned_merged_file, cleaned_merged_file_copy:
            header = self.db_mapping.get_header_for_type(type_name)
            cleaned_merged_file_copy.write("%s\n" % header)
            csv_output = csv.writer(cleaned_merged_file_copy, delimiter=',',
                                    lineterminator='\n', quotechar='"',
                                    quoting=csv.QUOTE_MINIMAL)
            # start readlines/compare
            previous_record_id, previous_record_b64 = next(
                merged_file).split(',', 1)
            # strip linebreak from line ending
            previous_record_b64 = previous_record_b64.strip()
            duplicate_record_lines = []
            for line in merged_file:
                record_id, record_b64 = line.split(',', 1)
                # strip linebreak from line ending
                record_b64 = record_b64.strip()
                if record_id == previous_record_id:
                    # if duplicate, add to list to merge later
                    duplicate_record_lines.extend(
                        (previous_record_b64, record_b64))
                    continue
                else:
                    # if different id, do merge [if necessary],
                    # then continue processing
                    if duplicate_record_lines:
                        # add/overwrite new record line
                        # write merged record and continue
                        merged_record_b64 = self.merge_records(
                            duplicate_record_lines, type_name)
                        dupsremoved += len(duplicate_record_lines) - 1
                        duplicate_record_lines = []
                        previous_record_b64 = merged_record_b64
                cleaned_merged_file.write(
                    f'{previous_record_id},{previous_record_b64}\n')
                formatted_value_list = self.format_b64_encoded_record_for_csv(
                    previous_record_b64, type_name)
                csv_output.writerow(formatted_value_list)
                previous_record_id = record_id
                previous_record_b64 = record_b64
            # finally
            if duplicate_record_lines:
                final_merged_record = self.merge_records(
                    duplicate_record_lines, type_name)
            else:
                final_merged_record = previous_record_b64
            cleaned_merged_file.write(
                f'{previous_record_id},{final_merged_record}\n')
            formatted_value_list = self.format_b64_encoded_record_for_csv(
                final_merged_record, type_name)
            csv_output.writerow(formatted_value_list)
        print(
            f'{type_name} Duplicates Merged: {dupsremoved}'
            f'                             ')  # needed for easy overwrite

Classes

class LBSNcsv (suppress_linebreaks=True)

Class to convert and store protobuf records to CSV file(s).

Because the amount of data might be quite big, CSVs are stored incrementally first. After no new data arrives, the tool will merge all single CSV files and eliminate duplicates. This is done using heapq_merge: - individual files are first sorted based on primary key - lines beginning with the same primry key are merged (each field is compared) Afterwards, 2 file types are generated: a) archive (ProtoBuf) b) CSV_copy import (postgres)

Attributes

output_path_file Where the CSVs will be stored. Note: check for existing files! db_mapping Reference to access functions from ProtoLBSM_db_Mapping class store_csv_part Current Number of CSV file parts suppress_linebreaks Usually, linebreaks will not be a problem, linebreaks are nonetheless surpressed.

Expand source code
class LBSNcsv():
    """Class to convert and store protobuf records to CSV file(s).

    Because the amount of data might be quite big,
    CSVs are stored incrementally first.
    After no new data arrives, the tool will merge
    all single CSV files and eliminate duplicates.
    This is done using heapq_merge:
        - individual files are first sorted based on primary key
        - lines beginning with the same primry key
        are merged (each field is compared)
    Afterwards, 2 file types are generated:
        a) archive (ProtoBuf)
        b) CSV_copy import (postgres)

    Attributes:
        output_path_file    Where the CSVs will be stored.
                            Note: check for existing files!
        db_mapping          Reference to access functions
                            from ProtoLBSM_db_Mapping class
        store_csv_part      Current Number of CSV file parts
        suppress_linebreaks Usually, linebreaks will not be
                            a problem, linebreaks are nonetheless surpressed.
    """

    def __init__(self, suppress_linebreaks=True):
        self.output_path_file = f'{os.getcwd()}\\02_Output\\'
        self.db_mapping = ProtoLBSNMapping()
        self.store_csv_part = 0
        self.suppress_linebreaks = suppress_linebreaks
        if not os.path.exists(self.output_path_file):
            os.makedirs(self.output_path_file)

    def store_append_batch_to_csv(self, records, round_nr,
                                  type_name, pg_copy_format=False):
        """ Takes proto buf lbsn record dict and appends all records
            to correct CSV (type-specific)
        """
        file_path = f'{self.output_path_file}{type_name}_{round_nr:03d}.csv'
        with open(file_path, 'a', encoding='utf8') as file_handle:
            # csvOutput = csv.writer(f, delimiter=',',
            #                       lineterminator='\n',
            #                       quotechar='"', quoting=csv.QUOTE_MINIMAL)
            for record in records:
                serialized_record_b64 = self.serialize_encode_record(record)
                file_handle.write(
                    f'{record.pkey.id},{serialized_record_b64}\n')

    @classmethod
    def serialize_encode_record(cls, record):
        """ Serializes protobuf record as string and
            encodes in base64 for corrupt-resistant backup/store/transfer
        """
        serialized_record = record.SerializeToString()
        serialized_encoded_record = base64.b64encode(serialized_record)
        serialized_encoded_record_utf = serialized_encoded_record.decode(
            "utf-8")
        return serialized_encoded_record_utf

    # def writeCSVHeader(self, typeName):
    #    # create files and write headers
    #    #for typename, header in self.typeNamesHeaderDict.items():
    #    header = self.typeNamesHeaderDict[typeName]
    #    csvOutput = open(f'{self.output_path_file}'
    #                     f'{typeName}_{self.countRound:03d}.csv',
    #                     'w', encoding='utf8')
    #    csvOutput.write("%s\n" % header)

    def clean_csv_batches(self, batched_records):
        """ Merges all output streams per type at end
            and removes duplicates

            This is necessary because Postgres can't
            import Duplicates with /copy.
            Keep all records in RAM while processing input data is impossible,
            therefore merge happens only once at end.
        """
        x_cnt = 0
        self.store_csv_part += 1
        print('Cleaning and merging output files..')
        for type_name in batched_records:
            x_cnt += 1
            filelist = glob(f'{self.output_path_file}{type_name}_*.csv')
            if filelist:
                self.sort_files(filelist)
                if len(filelist) > 1:
                    print(
                        f'Cleaning & merging output files..{x_cnt}'
                        f'/{len(batched_records)}', end='\r')
                    self.merge_files(filelist, type_name)
                else:
                    # sec = input(f'only one File. {filelist}\n')
                    # no need to merge files if only one round
                    new_filename = filelist[0].replace('_001', '_Proto')
                    if os.path.isfile(new_filename):
                        os.remove(new_filename)
                    if os.path.isfile(filelist[0]):
                        os.rename(filelist[0], new_filename)
                self.remove_merge_duplicate_records_format_csv(type_name)

    @classmethod
    def sort_files(cls, filelist):
        """ Function for sorting files
            (precursor to remove duplicates)
        """
        for file_path in filelist:
            with open(file_path, 'r+', encoding='utf8') as batch_file:
                # skip header
                # header = batchFile.readline()
                lines = batch_file.readlines()
                # sort by first column
                lines.sort(key=lambda a_line: a_line.split()[0])
                # lines.sort()
                batch_file.seek(0)
                # delete original records in file
                batch_file.truncate()
                # write sorted records
                # batchFile.writeline(header)
                for line in lines:
                    batch_file.write(line)

    def merge_files(self, filelist, type_name):
        """ Merges multiple files to one
            using ExitStack
        """
        with ExitStack() as stack:
            files = [stack.enter_context(
                open(fname, encoding='utf8')) for fname in filelist]
            with open(f'{self.output_path_file}{type_name}_Proto.csv',
                      'w', encoding='utf8') as merged_file:
                merged_file.writelines(heapq_merge(*files))
        for file in filelist:
            os.remove(file)

    @classmethod
    def create_proto_by_descriptor_name(cls, desc_name):
        """Create new proto record by name
        """
        new_record = HF.dict_type_switcher(desc_name)
        return new_record

    # def parse_message(self,msgType,stringMessage):
    #        #result_class = reflection.GeneratedProtocolMessageType(msgType,
    #        (stringMessage,),{'DESCRIPTOR': descriptor, '__module__': None})
    #        msgClass=lbsnstructure.Structure_pb2[msgType]
    #        message=msgClass()
    #        message.ParseFromString(stringMessage)
    #        return message

    def remove_merge_duplicate_records_format_csv(self, type_name):
        """ Will merge all single proto batches and
            - remove duplicates
            - output formatted csv

            This Mainloop uses procedures below.
        """
        merged_filename = f'{self.output_path_file}{type_name}_Proto.csv'
        cleaned_merged_filename = f'{self.output_path_file}' \
                                  f'{type_name}_cleaned.csv'
        cleaned_merged_filename_csv = f'{self.output_path_file}' \
                                      f'{type_name}_pgCSV.csv'
        if os.path.isfile(merged_filename):
            merged_file = open(merged_filename, 'r', encoding='utf8')
            cleaned_merged_file = open(
                cleaned_merged_filename, 'w', encoding='utf8')
            cleaned_merged_file_copy = open(
                cleaned_merged_filename_csv, 'w', encoding='utf8')
            self.clean_merged_file(
                merged_file, cleaned_merged_file,
                cleaned_merged_file_copy, type_name)
            os.remove(merged_filename)
            os.rename(cleaned_merged_filename, merged_filename)

    def get_record_id_from_base64_encoded_string(self, line):
        """ Gets record ID from base 64 encoded string
            (unused function)
        """
        record = self.get_record_from_base64_encoded_string(line)
        return record.pkey.id

    def get_record_from_base64_encoded_string(self, line, type_name=None):
        """ Gets ProtoBuf record from base 64 encoded string.
        """
        record = self.create_proto_by_descriptor_name(type_name)
        record.ParseFromString(base64.b64decode(line))  # .strip("\n"))
        return record

    def merge_records(self, duplicate_record_lines, type_name):
        """ Will merge multiple proto buf records to one,
            eliminating duplicates and merging information.
        """
        if len(duplicate_record_lines) > 1:
            # first do a simple compare/unique
            unique_records = set(duplicate_record_lines)
            if len(unique_records) > 1:
                # input(f'Len: {len(unique_records)} : {unique_records}')
                # if more than one unqiue record infos,
                # get first and deep-compare-merge with following
                prev_duprecord = self.get_record_from_base64_encoded_string(
                    duplicate_record_lines[0], type_name)
                for duprecord in duplicate_record_lines[1:]:
                    # merge current record with previous until no more found
                    record = self.get_record_from_base64_encoded_string(
                        duprecord, type_name)
                    # will modify/overwrite prev_duprecord
                    HF.merge_existing_records(
                        prev_duprecord, record)
                merged_record = self.serialize_encode_record(prev_duprecord)
            else:
                # take first element
                merged_record = next(iter(unique_records))
        else:
            merged_record = duplicate_record_lines[0]
        return merged_record

    def format_b64_encoded_record_for_csv(self, record_b64, type_name):
        """ Will convert protobuf base 64 encoded (prepared) records and
            convert to CSV formatted list of lines.
        """
        record = self.get_record_from_base64_encoded_string(
            record_b64, type_name)
        # convert Protobuf to Value list
        prepared_record = self.db_mapping.func_prepare_selector(record)
        formatted_value_list = []
        for value in prepared_record:
            # CSV Writer can't produce CSV that can be directly read
            # by Postgres with /Copy
            # Format some types manually (e.g. arrays, null values)
            if isinstance(value, list):
                value = '{' + ",".join(value) + '}'
            elif self.suppress_linebreaks and isinstance(value, str):
                # replace linebreaks by actual string so we can use
                # heapqMerge to merge line by line
                value = value.replace('\n', '\\n').replace('\r', '\\r')
            formatted_value_list.append(value)
        return formatted_value_list

    def clean_merged_file(self, merged_file, cleaned_merged_file,
                          cleaned_merged_file_copy, type_name):
        """ Will merge files and remove duplicates records.
        """
        dupsremoved = 0
        with merged_file, cleaned_merged_file, cleaned_merged_file_copy:
            header = self.db_mapping.get_header_for_type(type_name)
            cleaned_merged_file_copy.write("%s\n" % header)
            csv_output = csv.writer(cleaned_merged_file_copy, delimiter=',',
                                    lineterminator='\n', quotechar='"',
                                    quoting=csv.QUOTE_MINIMAL)
            # start readlines/compare
            previous_record_id, previous_record_b64 = next(
                merged_file).split(',', 1)
            # strip linebreak from line ending
            previous_record_b64 = previous_record_b64.strip()
            duplicate_record_lines = []
            for line in merged_file:
                record_id, record_b64 = line.split(',', 1)
                # strip linebreak from line ending
                record_b64 = record_b64.strip()
                if record_id == previous_record_id:
                    # if duplicate, add to list to merge later
                    duplicate_record_lines.extend(
                        (previous_record_b64, record_b64))
                    continue
                else:
                    # if different id, do merge [if necessary],
                    # then continue processing
                    if duplicate_record_lines:
                        # add/overwrite new record line
                        # write merged record and continue
                        merged_record_b64 = self.merge_records(
                            duplicate_record_lines, type_name)
                        dupsremoved += len(duplicate_record_lines) - 1
                        duplicate_record_lines = []
                        previous_record_b64 = merged_record_b64
                cleaned_merged_file.write(
                    f'{previous_record_id},{previous_record_b64}\n')
                formatted_value_list = self.format_b64_encoded_record_for_csv(
                    previous_record_b64, type_name)
                csv_output.writerow(formatted_value_list)
                previous_record_id = record_id
                previous_record_b64 = record_b64
            # finally
            if duplicate_record_lines:
                final_merged_record = self.merge_records(
                    duplicate_record_lines, type_name)
            else:
                final_merged_record = previous_record_b64
            cleaned_merged_file.write(
                f'{previous_record_id},{final_merged_record}\n')
            formatted_value_list = self.format_b64_encoded_record_for_csv(
                final_merged_record, type_name)
            csv_output.writerow(formatted_value_list)
        print(
            f'{type_name} Duplicates Merged: {dupsremoved}'
            f'                             ')  # needed for easy overwrite

Static methods

def create_proto_by_descriptor_name(desc_name)

Create new proto record by name

Expand source code
@classmethod
def create_proto_by_descriptor_name(cls, desc_name):
    """Create new proto record by name
    """
    new_record = HF.dict_type_switcher(desc_name)
    return new_record
def serialize_encode_record(record)

Serializes protobuf record as string and encodes in base64 for corrupt-resistant backup/store/transfer

Expand source code
@classmethod
def serialize_encode_record(cls, record):
    """ Serializes protobuf record as string and
        encodes in base64 for corrupt-resistant backup/store/transfer
    """
    serialized_record = record.SerializeToString()
    serialized_encoded_record = base64.b64encode(serialized_record)
    serialized_encoded_record_utf = serialized_encoded_record.decode(
        "utf-8")
    return serialized_encoded_record_utf
def sort_files(filelist)

Function for sorting files (precursor to remove duplicates)

Expand source code
@classmethod
def sort_files(cls, filelist):
    """ Function for sorting files
        (precursor to remove duplicates)
    """
    for file_path in filelist:
        with open(file_path, 'r+', encoding='utf8') as batch_file:
            # skip header
            # header = batchFile.readline()
            lines = batch_file.readlines()
            # sort by first column
            lines.sort(key=lambda a_line: a_line.split()[0])
            # lines.sort()
            batch_file.seek(0)
            # delete original records in file
            batch_file.truncate()
            # write sorted records
            # batchFile.writeline(header)
            for line in lines:
                batch_file.write(line)

Methods

def clean_csv_batches(self, batched_records)

Merges all output streams per type at end and removes duplicates

This is necessary because Postgres can't import Duplicates with /copy. Keep all records in RAM while processing input data is impossible, therefore merge happens only once at end.

Expand source code
def clean_csv_batches(self, batched_records):
    """ Merges all output streams per type at end
        and removes duplicates

        This is necessary because Postgres can't
        import Duplicates with /copy.
        Keep all records in RAM while processing input data is impossible,
        therefore merge happens only once at end.
    """
    x_cnt = 0
    self.store_csv_part += 1
    print('Cleaning and merging output files..')
    for type_name in batched_records:
        x_cnt += 1
        filelist = glob(f'{self.output_path_file}{type_name}_*.csv')
        if filelist:
            self.sort_files(filelist)
            if len(filelist) > 1:
                print(
                    f'Cleaning & merging output files..{x_cnt}'
                    f'/{len(batched_records)}', end='\r')
                self.merge_files(filelist, type_name)
            else:
                # sec = input(f'only one File. {filelist}\n')
                # no need to merge files if only one round
                new_filename = filelist[0].replace('_001', '_Proto')
                if os.path.isfile(new_filename):
                    os.remove(new_filename)
                if os.path.isfile(filelist[0]):
                    os.rename(filelist[0], new_filename)
            self.remove_merge_duplicate_records_format_csv(type_name)
def clean_merged_file(self, merged_file, cleaned_merged_file, cleaned_merged_file_copy, type_name)

Will merge files and remove duplicates records.

Expand source code
def clean_merged_file(self, merged_file, cleaned_merged_file,
                      cleaned_merged_file_copy, type_name):
    """ Will merge files and remove duplicates records.
    """
    dupsremoved = 0
    with merged_file, cleaned_merged_file, cleaned_merged_file_copy:
        header = self.db_mapping.get_header_for_type(type_name)
        cleaned_merged_file_copy.write("%s\n" % header)
        csv_output = csv.writer(cleaned_merged_file_copy, delimiter=',',
                                lineterminator='\n', quotechar='"',
                                quoting=csv.QUOTE_MINIMAL)
        # start readlines/compare
        previous_record_id, previous_record_b64 = next(
            merged_file).split(',', 1)
        # strip linebreak from line ending
        previous_record_b64 = previous_record_b64.strip()
        duplicate_record_lines = []
        for line in merged_file:
            record_id, record_b64 = line.split(',', 1)
            # strip linebreak from line ending
            record_b64 = record_b64.strip()
            if record_id == previous_record_id:
                # if duplicate, add to list to merge later
                duplicate_record_lines.extend(
                    (previous_record_b64, record_b64))
                continue
            else:
                # if different id, do merge [if necessary],
                # then continue processing
                if duplicate_record_lines:
                    # add/overwrite new record line
                    # write merged record and continue
                    merged_record_b64 = self.merge_records(
                        duplicate_record_lines, type_name)
                    dupsremoved += len(duplicate_record_lines) - 1
                    duplicate_record_lines = []
                    previous_record_b64 = merged_record_b64
            cleaned_merged_file.write(
                f'{previous_record_id},{previous_record_b64}\n')
            formatted_value_list = self.format_b64_encoded_record_for_csv(
                previous_record_b64, type_name)
            csv_output.writerow(formatted_value_list)
            previous_record_id = record_id
            previous_record_b64 = record_b64
        # finally
        if duplicate_record_lines:
            final_merged_record = self.merge_records(
                duplicate_record_lines, type_name)
        else:
            final_merged_record = previous_record_b64
        cleaned_merged_file.write(
            f'{previous_record_id},{final_merged_record}\n')
        formatted_value_list = self.format_b64_encoded_record_for_csv(
            final_merged_record, type_name)
        csv_output.writerow(formatted_value_list)
    print(
        f'{type_name} Duplicates Merged: {dupsremoved}'
        f'                             ')  # needed for easy overwrite
def format_b64_encoded_record_for_csv(self, record_b64, type_name)

Will convert protobuf base 64 encoded (prepared) records and convert to CSV formatted list of lines.

Expand source code
def format_b64_encoded_record_for_csv(self, record_b64, type_name):
    """ Will convert protobuf base 64 encoded (prepared) records and
        convert to CSV formatted list of lines.
    """
    record = self.get_record_from_base64_encoded_string(
        record_b64, type_name)
    # convert Protobuf to Value list
    prepared_record = self.db_mapping.func_prepare_selector(record)
    formatted_value_list = []
    for value in prepared_record:
        # CSV Writer can't produce CSV that can be directly read
        # by Postgres with /Copy
        # Format some types manually (e.g. arrays, null values)
        if isinstance(value, list):
            value = '{' + ",".join(value) + '}'
        elif self.suppress_linebreaks and isinstance(value, str):
            # replace linebreaks by actual string so we can use
            # heapqMerge to merge line by line
            value = value.replace('\n', '\\n').replace('\r', '\\r')
        formatted_value_list.append(value)
    return formatted_value_list
def get_record_from_base64_encoded_string(self, line, type_name=None)

Gets ProtoBuf record from base 64 encoded string.

Expand source code
def get_record_from_base64_encoded_string(self, line, type_name=None):
    """ Gets ProtoBuf record from base 64 encoded string.
    """
    record = self.create_proto_by_descriptor_name(type_name)
    record.ParseFromString(base64.b64decode(line))  # .strip("\n"))
    return record
def get_record_id_from_base64_encoded_string(self, line)

Gets record ID from base 64 encoded string (unused function)

Expand source code
def get_record_id_from_base64_encoded_string(self, line):
    """ Gets record ID from base 64 encoded string
        (unused function)
    """
    record = self.get_record_from_base64_encoded_string(line)
    return record.pkey.id
def merge_files(self, filelist, type_name)

Merges multiple files to one using ExitStack

Expand source code
def merge_files(self, filelist, type_name):
    """ Merges multiple files to one
        using ExitStack
    """
    with ExitStack() as stack:
        files = [stack.enter_context(
            open(fname, encoding='utf8')) for fname in filelist]
        with open(f'{self.output_path_file}{type_name}_Proto.csv',
                  'w', encoding='utf8') as merged_file:
            merged_file.writelines(heapq_merge(*files))
    for file in filelist:
        os.remove(file)
def merge_records(self, duplicate_record_lines, type_name)

Will merge multiple proto buf records to one, eliminating duplicates and merging information.

Expand source code
def merge_records(self, duplicate_record_lines, type_name):
    """ Will merge multiple proto buf records to one,
        eliminating duplicates and merging information.
    """
    if len(duplicate_record_lines) > 1:
        # first do a simple compare/unique
        unique_records = set(duplicate_record_lines)
        if len(unique_records) > 1:
            # input(f'Len: {len(unique_records)} : {unique_records}')
            # if more than one unqiue record infos,
            # get first and deep-compare-merge with following
            prev_duprecord = self.get_record_from_base64_encoded_string(
                duplicate_record_lines[0], type_name)
            for duprecord in duplicate_record_lines[1:]:
                # merge current record with previous until no more found
                record = self.get_record_from_base64_encoded_string(
                    duprecord, type_name)
                # will modify/overwrite prev_duprecord
                HF.merge_existing_records(
                    prev_duprecord, record)
            merged_record = self.serialize_encode_record(prev_duprecord)
        else:
            # take first element
            merged_record = next(iter(unique_records))
    else:
        merged_record = duplicate_record_lines[0]
    return merged_record
def remove_merge_duplicate_records_format_csv(self, type_name)

Will merge all single proto batches and - remove duplicates - output formatted csv

This Mainloop uses procedures below.

Expand source code
def remove_merge_duplicate_records_format_csv(self, type_name):
    """ Will merge all single proto batches and
        - remove duplicates
        - output formatted csv

        This Mainloop uses procedures below.
    """
    merged_filename = f'{self.output_path_file}{type_name}_Proto.csv'
    cleaned_merged_filename = f'{self.output_path_file}' \
                              f'{type_name}_cleaned.csv'
    cleaned_merged_filename_csv = f'{self.output_path_file}' \
                                  f'{type_name}_pgCSV.csv'
    if os.path.isfile(merged_filename):
        merged_file = open(merged_filename, 'r', encoding='utf8')
        cleaned_merged_file = open(
            cleaned_merged_filename, 'w', encoding='utf8')
        cleaned_merged_file_copy = open(
            cleaned_merged_filename_csv, 'w', encoding='utf8')
        self.clean_merged_file(
            merged_file, cleaned_merged_file,
            cleaned_merged_file_copy, type_name)
        os.remove(merged_filename)
        os.rename(cleaned_merged_filename, merged_filename)
def store_append_batch_to_csv(self, records, round_nr, type_name, pg_copy_format=False)

Takes proto buf lbsn record dict and appends all records to correct CSV (type-specific)

Expand source code
def store_append_batch_to_csv(self, records, round_nr,
                              type_name, pg_copy_format=False):
    """ Takes proto buf lbsn record dict and appends all records
        to correct CSV (type-specific)
    """
    file_path = f'{self.output_path_file}{type_name}_{round_nr:03d}.csv'
    with open(file_path, 'a', encoding='utf8') as file_handle:
        # csvOutput = csv.writer(f, delimiter=',',
        #                       lineterminator='\n',
        #                       quotechar='"', quoting=csv.QUOTE_MINIMAL)
        for record in records:
            serialized_record_b64 = self.serialize_encode_record(record)
            file_handle.write(
                f'{record.pkey.id},{serialized_record_b64}\n')