Module lbsntransform.input.mappings.field_mapping_lbsn

Module for mapping LBSN (RAW) to common LBSN Structure (Protobuf).

Expand source code
# -*- coding: utf-8 -*-

"""
Module for mapping LBSN (RAW) to common LBSN Structure (Protobuf).
"""

# pylint: disable=no-member

import logging
from typing import Optional, Dict, Any
import lbsnstructure as lbsn
from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.duration_pb2 import Duration
from shapely import wkb

from lbsntransform.tools.helper_functions import HelperFunctions as HF

MAPPING_ID = 0


def parse_geom(geom_hex):
    """Parse Postgis hex WKB to geometry WKT"""
    geom = wkb.loads(geom_hex, hex=True)
    return geom.wkt


def set_lbsn_attr(lbsn_obj, attr_name, in_record, geom: Optional[bool] = None):
    """Sets value for attr_name of lbsn_obj if
    attr_value is not None"""
    attr_value = in_record.get(attr_name)
    if attr_value is None:
        return
    if isinstance(attr_value, (list, dict)):
        if len(attr_value) == 0:
            return
        if isinstance(attr_value, list):
            # filter None values,
            # for backwards compatibility with lbsn databases
            # with erroneous empty data written
            attr_value = list(filter(None, attr_value))
        getattr(lbsn_obj, attr_name).extend(attr_value)
        return
    if geom:
        attr_value = parse_geom(attr_value)
    setattr(lbsn_obj, attr_name, attr_value)


def copydate_lbsn_attr(lbsn_obj_attr, copy_from_val):
    """Some protobuf fields cannot be assigned directly,
    this function applies copyfrom assignment"""
    date_pb = Timestamp()
    date_pb.FromDatetime(copy_from_val)
    lbsn_obj_attr.CopyFrom(date_pb)


def copyduration_lbsn_attr(lbsn_obj_attr, copy_from_val):
    """Some protobuf fields cannot be assigned directly,
    this function applies copyfrom assignment"""
    duration_pb = Duration()
    duration_pb.FromString(copy_from_val)
    lbsn_obj_attr.CopyFrom(duration_pb)


def set_lbsn_pkey(lbsn_obj_pkey, pkey_obj, pkey_val, origin_val):
    """Sets value for lbsn_obj_pkey of pkey_obj if
    pkey_val is not None"""
    if pkey_val is None:
        return
    pkey_obj = HF.new_lbsn_record_with_id(pkey_obj, pkey_val, origin_val)
    lbsn_obj_pkey.CopyFrom(pkey_obj.pkey)


class importer:
    """Provides mapping function from LBSN (raw) endpoints to
    protobuf lbsnstructure
    """

    ORIGIN_NAME = "LBSN"
    ORIGIN_ID = 0

    def __init__(self, **_):
        # We're dealing with LBSN in this class, lets create the OriginID
        # globally
        # this OriginID is required for all CompositeKeys
        origin = lbsn.Origin()
        origin.origin_id = lbsn.Origin.LBSN
        self.origin = origin
        self.null_island = 0
        # this is where all the data will be stored
        # self.lbsn_records = []
        self.log = logging.getLogger("__main__")  # get the main logger object
        self.skipped_count = 0
        self.skipped_low_geoaccuracy = 0

    @classmethod
    def get_func_record(cls, record: Dict[str, Any], input_type: Optional[str] = None):
        """Returns mapping function for input_type"""
        FUNC_MAP = {
            lbsn.Origin().DESCRIPTOR.name: cls.extract_origin,
            lbsn.Country().DESCRIPTOR.name: cls.extract_country,
            lbsn.City().DESCRIPTOR.name: cls.extract_city,
            lbsn.Place().DESCRIPTOR.name: cls.extract_place,
            lbsn.UserGroup().DESCRIPTOR.name: cls.extract_usergroup,
            lbsn.User().DESCRIPTOR.name: cls.extract_user,
            lbsn.Post().DESCRIPTOR.name: cls.extract_post,
            lbsn.PostReaction().DESCRIPTOR.name: cls.extract_postreaction,
            lbsn.Event().DESCRIPTOR.name: cls.extract_event,
        }
        func_map = FUNC_MAP.get(input_type)
        # create origin always the same
        origin = lbsn.Origin()
        origin.origin_id = record.get("origin_id")
        return func_map(record, origin)

    def parse_json_record(
        self, record: Dict[str, Any], input_type: Optional[str] = None
    ):
        """Entry point for LBSN data:

        Attributes:
        record: tuple
            0:    A single row from LBSN, stored as dict
            1:    input_type    Type of LBSN record (User, Post, Place etc.)
        """
        record = self.get_func_record(record, input_type)
        # return list of single item
        return [record]

    @classmethod
    def extract_origin(cls, record, origin):
        origin.name = record.get("name")
        return origin

    @classmethod
    def extract_country(cls, record, origin):
        country = HF.new_lbsn_record_with_id(
            lbsn.Country(), record.get("country_guid"), origin
        )
        set_lbsn_attr(country, "name", record)
        geom_center = record.get("geom_center")
        if geom_center:
            setattr(country, "geom_center", parse_geom(geom_center))
        geom_area = record.get("geom_area")
        if geom_area:
            setattr(country, "geom_area", parse_geom(geom_area))
        set_lbsn_attr(country, "url", record)
        set_lbsn_attr(country, "name_alternatives", record)
        return country

    @classmethod
    def extract_city(cls, record, origin):
        city = HF.new_lbsn_record_with_id(lbsn.City(), record.get("city_guid"), origin)
        set_lbsn_attr(city, "name", record)
        geom_center = record.get("geom_center")
        if geom_center:
            setattr(city, "geom_center", parse_geom(geom_center))
        geom_area = record.get("geom_area")
        if geom_area:
            setattr(city, "geom_area", parse_geom(geom_area))
        country_guid = record.get("country_guid")
        if country_guid:
            city.country_pkey.CopyFrom(
                HF.new_lbsn_record_with_id(
                    lbsn.Country(), record.get("country_guid"), origin
                ).pkey
            )
        set_lbsn_attr(city, "url", record)
        set_lbsn_attr(city, "name_alternatives", record)
        set_lbsn_attr(city, "sub_type", record)
        return city

    @classmethod
    def extract_place(cls, record, origin):
        place = HF.new_lbsn_record_with_id(
            lbsn.Place(), record.get("place_guid"), origin
        )
        set_lbsn_attr(place, "name", record)
        set_lbsn_attr(place, "post_count", record)
        set_lbsn_attr(place, "url", record)
        geom_center = record.get("geom_center")
        if geom_center:
            setattr(place, "geom_center", parse_geom(geom_center))
        geom_area = record.get("geom_area")
        if geom_area:
            setattr(place, "geom_area", parse_geom(geom_area))
        city_guid = record.get("city_guid")
        if city_guid:
            set_lbsn_pkey(place.city_pkey, lbsn.City(), record.get("city_guid"), origin)
        set_lbsn_attr(place, "name_alternatives", record)
        set_lbsn_attr(place, "place_description", record)
        set_lbsn_attr(place, "place_website", record)
        set_lbsn_attr(place, "place_phone", record)
        set_lbsn_attr(place, "address", record)
        set_lbsn_attr(place, "zip_code", record)
        set_lbsn_attr(place, "attributes", record)
        set_lbsn_attr(place, "checkin_count", record)
        set_lbsn_attr(place, "like_count", record)
        set_lbsn_attr(place, "parent_places", record)
        return place

    @classmethod
    def extract_usergroup(cls, record, origin):
        usergroup = HF.new_lbsn_record_with_id(
            lbsn.UserGroup(), record.get("usergroup_guid"), origin
        )
        usergroup.usergroup_name = record.get("usergroup_name")
        usergroup.usergroup_description = record.get("usergroup_description")
        usergroup.member_count = record.get("member_count")
        usergroup.usergroup_createdate = record.get("usergroup_createdate")
        usergroup.user_owner = record.get("user_owner")
        user_owner = record.get("user_owner")
        if user_owner:
            usergroup.user_owner_pkey.CopyFrom(
                HF.new_lbsn_record_with_id(
                    lbsn.User(), record.get("user_owner"), origin
                ).pkey
            )
        return usergroup

    @classmethod
    def extract_user(cls, record, origin):
        user = HF.new_lbsn_record_with_id(lbsn.User(), record.get("user_guid"), origin)
        set_lbsn_attr(user, "user_name", record)
        set_lbsn_attr(user, "user_fullname", record)
        set_lbsn_attr(user, "follows", record)
        set_lbsn_attr(user, "followed", record)
        set_lbsn_attr(user, "biography", record)
        set_lbsn_attr(user, "post_count", record)
        set_lbsn_attr(user, "url", record)
        set_lbsn_attr(user, "is_private", record)
        set_lbsn_attr(user, "is_available", record)

        lang = record.get("user_language")
        if lang:
            ref_user_language = lbsn.Language()
            ref_user_language.language_short = lang
            user.user_language.CopyFrom(ref_user_language)

        set_lbsn_attr(user, "user_location", record)
        user_location_geom = record.get("user_location_geom")
        if user_location_geom:
            setattr(user, "user_location_geom", parse_geom(user_location_geom))
        set_lbsn_attr(user, "liked_count", record)
        active_since = record.get("active_since")
        if active_since:
            copydate_lbsn_attr(user.active_since, active_since)
        set_lbsn_attr(user, "profile_image_url", record)
        set_lbsn_attr(user, "user_timezone", record)
        set_lbsn_attr(user, "user_utc_offset", record)
        set_lbsn_attr(user, "user_groups_member", record)
        set_lbsn_attr(user, "user_groups_follows", record)
        set_lbsn_attr(user, "group_count", record)
        return user

    @classmethod
    def extract_post(cls, record, origin):
        """Extract post attributes

        TODO: Extract nested LBSN objects (e.g. spatial.city etc.)"""
        post = HF.new_lbsn_record_with_id(lbsn.Post(), record.get("post_guid"), origin)
        post_latlng = record.get("post_latlng")
        if post_latlng:
            setattr(post, "post_latlng", parse_geom(post_latlng))
        place_guid = record.get("place_guid")
        if place_guid:
            set_lbsn_pkey(
                post.place_pkey, lbsn.Place(), record.get("place_guid"), origin
            )
        city_guid = record.get("city_guid")
        if city_guid:
            set_lbsn_pkey(post.city_pkey, lbsn.City(), record.get("city_guid"), origin)
        country_guid = record.get("country_guid")
        if country_guid:
            set_lbsn_pkey(
                post.country_pkey, lbsn.Country(), record.get("country_guid"), origin
            )
        set_lbsn_pkey(post.user_pkey, lbsn.User(), record.get("user_guid"), origin)
        pub_date = record.get("post_publish_date")
        if pub_date:
            copydate_lbsn_attr(post.post_publish_date, pub_date)
        set_lbsn_attr(post, "post_body", record)
        geo_acc = record.get("post_geoaccuracy")
        if geo_acc:
            # get enum value
            post.post_geoaccuracy = lbsn.Post.PostGeoaccuracy.Value(geo_acc.upper())
        set_lbsn_attr(post, "hashtags", record)
        set_lbsn_attr(post, "topic_group", record)
        set_lbsn_attr(post, "post_downvotes", record)
        set_lbsn_attr(post, "emoji", record)
        set_lbsn_attr(post, "post_like_count", record)
        set_lbsn_attr(post, "post_comment_count", record)
        set_lbsn_attr(post, "post_views_count", record)
        set_lbsn_attr(post, "post_title", record)
        crt_date = record.get("post_create_date")
        if crt_date:
            copydate_lbsn_attr(post.post_create_date, crt_date)
        set_lbsn_attr(post, "post_thumbnail_url", record)
        set_lbsn_attr(post, "post_url", record)
        post_type = record.get("post_type")
        if post_type:
            # compatibility: earlier lbsnstructure
            # had 'carousel' as post type available,
            # which is now 'image'
            if post_type == "carousel":
                post_type = "image"
            # get enum value
            post.post_type = lbsn.Post.PostType.Value(post_type.upper())
        set_lbsn_attr(post, "post_filter", record)
        set_lbsn_attr(post, "post_quote_count", record)
        set_lbsn_attr(post, "post_share_count", record)
        lang = record.get("post_language")
        if lang:
            ref_post_language = lbsn.Language()
            ref_post_language.language_short = lang
            post.post_language.CopyFrom(ref_post_language)
        set_lbsn_attr(post, "input_source", record)
        user_mentions = record.get("user_mentions")
        if user_mentions:
            mentioned_users_list = []
            for user_id in user_mentions:  # iterate over the list
                ref_user_record = HF.new_lbsn_record_with_id(
                    lbsn.User(), user_id, origin
                )
                mentioned_users_list.append(ref_user_record)
            post.user_mentions_pkey.extend(
                [user_ref.pkey for user_ref in mentioned_users_list]
            )
        set_lbsn_attr(post, "post_content_license", record)
        return post

    @classmethod
    def extract_event(cls, record, origin):
        event = HF.new_lbsn_record_with_id(
            lbsn.Event(), record.get("event_guid"), origin
        )
        set_lbsn_attr(event, "name", record)
        event_latlng = record.get("event_latlng")
        if event_latlng:
            setattr(event, "event_latlng", parse_geom(event_latlng))
        event_area = record.get("event_area")
        if event_area:
            setattr(event, "event_area", parse_geom(event_area))
        set_lbsn_attr(event, "event_website", record)
        event_date = record.get("event_date")
        if event_date:
            copydate_lbsn_attr(event.event_date, event_date)
        event_date_start = record.get("event_date_start")
        if event_date_start:
            copydate_lbsn_attr(event.event_date_start, event_date_start)
        event_date_end = record.get("event_date_end")
        if event_date_end:
            copydate_lbsn_attr(event.event_date_end, event_date_end)
        duration = record.get("duration")
        if duration:
            copyduration_lbsn_attr(event.duration, duration)
        place_guid = record.get("place_guid")
        if place_guid:
            set_lbsn_pkey(
                event.place_pkey, lbsn.Place(), record.get("place_guid"), origin
            )
        city_guid = record.get("city_guid")
        if city_guid:
            set_lbsn_pkey(event.city_pkey, lbsn.City(), record.get("city_guid"), origin)
        country_guid = record.get("country_guid")
        if country_guid:
            set_lbsn_pkey(
                event.country_pkey, lbsn.Country(), record.get("country_guid"), origin
            )
        set_lbsn_pkey(event.user_pkey, lbsn.User(), record.get("user_guid"), origin)
        set_lbsn_attr(event, "event_description", record)
        set_lbsn_attr(event, "event_type", record)
        set_lbsn_attr(event, "event_share_count", record)
        set_lbsn_attr(event, "event_like_count", record)
        set_lbsn_attr(event, "event_comment_count", record)
        set_lbsn_attr(event, "event_views_count", record)
        set_lbsn_attr(event, "event_engage_count", record)
        return event

    @classmethod
    def extract_postreaction(cls, record):
        raise NotImplementedError("Mapping of post reactions is not yet implemented")

Functions

def copydate_lbsn_attr(lbsn_obj_attr, copy_from_val)

Some protobuf fields cannot be assigned directly, this function applies copyfrom assignment

Expand source code
def copydate_lbsn_attr(lbsn_obj_attr, copy_from_val):
    """Some protobuf fields cannot be assigned directly,
    this function applies copyfrom assignment"""
    date_pb = Timestamp()
    date_pb.FromDatetime(copy_from_val)
    lbsn_obj_attr.CopyFrom(date_pb)
def copyduration_lbsn_attr(lbsn_obj_attr, copy_from_val)

Some protobuf fields cannot be assigned directly, this function applies copyfrom assignment

Expand source code
def copyduration_lbsn_attr(lbsn_obj_attr, copy_from_val):
    """Some protobuf fields cannot be assigned directly,
    this function applies copyfrom assignment"""
    duration_pb = Duration()
    duration_pb.FromString(copy_from_val)
    lbsn_obj_attr.CopyFrom(duration_pb)
def parse_geom(geom_hex)

Parse Postgis hex WKB to geometry WKT

Expand source code
def parse_geom(geom_hex):
    """Parse Postgis hex WKB to geometry WKT"""
    geom = wkb.loads(geom_hex, hex=True)
    return geom.wkt
def set_lbsn_attr(lbsn_obj, attr_name, in_record, geom: Optional[bool] = None)

Sets value for attr_name of lbsn_obj if attr_value is not None

Expand source code
def set_lbsn_attr(lbsn_obj, attr_name, in_record, geom: Optional[bool] = None):
    """Sets value for attr_name of lbsn_obj if
    attr_value is not None"""
    attr_value = in_record.get(attr_name)
    if attr_value is None:
        return
    if isinstance(attr_value, (list, dict)):
        if len(attr_value) == 0:
            return
        if isinstance(attr_value, list):
            # filter None values,
            # for backwards compatibility with lbsn databases
            # with erroneous empty data written
            attr_value = list(filter(None, attr_value))
        getattr(lbsn_obj, attr_name).extend(attr_value)
        return
    if geom:
        attr_value = parse_geom(attr_value)
    setattr(lbsn_obj, attr_name, attr_value)
def set_lbsn_pkey(lbsn_obj_pkey, pkey_obj, pkey_val, origin_val)

Sets value for lbsn_obj_pkey of pkey_obj if pkey_val is not None

Expand source code
def set_lbsn_pkey(lbsn_obj_pkey, pkey_obj, pkey_val, origin_val):
    """Sets value for lbsn_obj_pkey of pkey_obj if
    pkey_val is not None"""
    if pkey_val is None:
        return
    pkey_obj = HF.new_lbsn_record_with_id(pkey_obj, pkey_val, origin_val)
    lbsn_obj_pkey.CopyFrom(pkey_obj.pkey)

Classes

class importer (**_)

Provides mapping function from LBSN (raw) endpoints to protobuf lbsnstructure

Expand source code
class importer:
    """Provides mapping function from LBSN (raw) endpoints to
    protobuf lbsnstructure
    """

    ORIGIN_NAME = "LBSN"
    ORIGIN_ID = 0

    def __init__(self, **_):
        # We're dealing with LBSN in this class, lets create the OriginID
        # globally
        # this OriginID is required for all CompositeKeys
        origin = lbsn.Origin()
        origin.origin_id = lbsn.Origin.LBSN
        self.origin = origin
        self.null_island = 0
        # this is where all the data will be stored
        # self.lbsn_records = []
        self.log = logging.getLogger("__main__")  # get the main logger object
        self.skipped_count = 0
        self.skipped_low_geoaccuracy = 0

    @classmethod
    def get_func_record(cls, record: Dict[str, Any], input_type: Optional[str] = None):
        """Returns mapping function for input_type"""
        FUNC_MAP = {
            lbsn.Origin().DESCRIPTOR.name: cls.extract_origin,
            lbsn.Country().DESCRIPTOR.name: cls.extract_country,
            lbsn.City().DESCRIPTOR.name: cls.extract_city,
            lbsn.Place().DESCRIPTOR.name: cls.extract_place,
            lbsn.UserGroup().DESCRIPTOR.name: cls.extract_usergroup,
            lbsn.User().DESCRIPTOR.name: cls.extract_user,
            lbsn.Post().DESCRIPTOR.name: cls.extract_post,
            lbsn.PostReaction().DESCRIPTOR.name: cls.extract_postreaction,
            lbsn.Event().DESCRIPTOR.name: cls.extract_event,
        }
        func_map = FUNC_MAP.get(input_type)
        # create origin always the same
        origin = lbsn.Origin()
        origin.origin_id = record.get("origin_id")
        return func_map(record, origin)

    def parse_json_record(
        self, record: Dict[str, Any], input_type: Optional[str] = None
    ):
        """Entry point for LBSN data:

        Attributes:
        record: tuple
            0:    A single row from LBSN, stored as dict
            1:    input_type    Type of LBSN record (User, Post, Place etc.)
        """
        record = self.get_func_record(record, input_type)
        # return list of single item
        return [record]

    @classmethod
    def extract_origin(cls, record, origin):
        origin.name = record.get("name")
        return origin

    @classmethod
    def extract_country(cls, record, origin):
        country = HF.new_lbsn_record_with_id(
            lbsn.Country(), record.get("country_guid"), origin
        )
        set_lbsn_attr(country, "name", record)
        geom_center = record.get("geom_center")
        if geom_center:
            setattr(country, "geom_center", parse_geom(geom_center))
        geom_area = record.get("geom_area")
        if geom_area:
            setattr(country, "geom_area", parse_geom(geom_area))
        set_lbsn_attr(country, "url", record)
        set_lbsn_attr(country, "name_alternatives", record)
        return country

    @classmethod
    def extract_city(cls, record, origin):
        city = HF.new_lbsn_record_with_id(lbsn.City(), record.get("city_guid"), origin)
        set_lbsn_attr(city, "name", record)
        geom_center = record.get("geom_center")
        if geom_center:
            setattr(city, "geom_center", parse_geom(geom_center))
        geom_area = record.get("geom_area")
        if geom_area:
            setattr(city, "geom_area", parse_geom(geom_area))
        country_guid = record.get("country_guid")
        if country_guid:
            city.country_pkey.CopyFrom(
                HF.new_lbsn_record_with_id(
                    lbsn.Country(), record.get("country_guid"), origin
                ).pkey
            )
        set_lbsn_attr(city, "url", record)
        set_lbsn_attr(city, "name_alternatives", record)
        set_lbsn_attr(city, "sub_type", record)
        return city

    @classmethod
    def extract_place(cls, record, origin):
        place = HF.new_lbsn_record_with_id(
            lbsn.Place(), record.get("place_guid"), origin
        )
        set_lbsn_attr(place, "name", record)
        set_lbsn_attr(place, "post_count", record)
        set_lbsn_attr(place, "url", record)
        geom_center = record.get("geom_center")
        if geom_center:
            setattr(place, "geom_center", parse_geom(geom_center))
        geom_area = record.get("geom_area")
        if geom_area:
            setattr(place, "geom_area", parse_geom(geom_area))
        city_guid = record.get("city_guid")
        if city_guid:
            set_lbsn_pkey(place.city_pkey, lbsn.City(), record.get("city_guid"), origin)
        set_lbsn_attr(place, "name_alternatives", record)
        set_lbsn_attr(place, "place_description", record)
        set_lbsn_attr(place, "place_website", record)
        set_lbsn_attr(place, "place_phone", record)
        set_lbsn_attr(place, "address", record)
        set_lbsn_attr(place, "zip_code", record)
        set_lbsn_attr(place, "attributes", record)
        set_lbsn_attr(place, "checkin_count", record)
        set_lbsn_attr(place, "like_count", record)
        set_lbsn_attr(place, "parent_places", record)
        return place

    @classmethod
    def extract_usergroup(cls, record, origin):
        usergroup = HF.new_lbsn_record_with_id(
            lbsn.UserGroup(), record.get("usergroup_guid"), origin
        )
        usergroup.usergroup_name = record.get("usergroup_name")
        usergroup.usergroup_description = record.get("usergroup_description")
        usergroup.member_count = record.get("member_count")
        usergroup.usergroup_createdate = record.get("usergroup_createdate")
        usergroup.user_owner = record.get("user_owner")
        user_owner = record.get("user_owner")
        if user_owner:
            usergroup.user_owner_pkey.CopyFrom(
                HF.new_lbsn_record_with_id(
                    lbsn.User(), record.get("user_owner"), origin
                ).pkey
            )
        return usergroup

    @classmethod
    def extract_user(cls, record, origin):
        user = HF.new_lbsn_record_with_id(lbsn.User(), record.get("user_guid"), origin)
        set_lbsn_attr(user, "user_name", record)
        set_lbsn_attr(user, "user_fullname", record)
        set_lbsn_attr(user, "follows", record)
        set_lbsn_attr(user, "followed", record)
        set_lbsn_attr(user, "biography", record)
        set_lbsn_attr(user, "post_count", record)
        set_lbsn_attr(user, "url", record)
        set_lbsn_attr(user, "is_private", record)
        set_lbsn_attr(user, "is_available", record)

        lang = record.get("user_language")
        if lang:
            ref_user_language = lbsn.Language()
            ref_user_language.language_short = lang
            user.user_language.CopyFrom(ref_user_language)

        set_lbsn_attr(user, "user_location", record)
        user_location_geom = record.get("user_location_geom")
        if user_location_geom:
            setattr(user, "user_location_geom", parse_geom(user_location_geom))
        set_lbsn_attr(user, "liked_count", record)
        active_since = record.get("active_since")
        if active_since:
            copydate_lbsn_attr(user.active_since, active_since)
        set_lbsn_attr(user, "profile_image_url", record)
        set_lbsn_attr(user, "user_timezone", record)
        set_lbsn_attr(user, "user_utc_offset", record)
        set_lbsn_attr(user, "user_groups_member", record)
        set_lbsn_attr(user, "user_groups_follows", record)
        set_lbsn_attr(user, "group_count", record)
        return user

    @classmethod
    def extract_post(cls, record, origin):
        """Extract post attributes

        TODO: Extract nested LBSN objects (e.g. spatial.city etc.)"""
        post = HF.new_lbsn_record_with_id(lbsn.Post(), record.get("post_guid"), origin)
        post_latlng = record.get("post_latlng")
        if post_latlng:
            setattr(post, "post_latlng", parse_geom(post_latlng))
        place_guid = record.get("place_guid")
        if place_guid:
            set_lbsn_pkey(
                post.place_pkey, lbsn.Place(), record.get("place_guid"), origin
            )
        city_guid = record.get("city_guid")
        if city_guid:
            set_lbsn_pkey(post.city_pkey, lbsn.City(), record.get("city_guid"), origin)
        country_guid = record.get("country_guid")
        if country_guid:
            set_lbsn_pkey(
                post.country_pkey, lbsn.Country(), record.get("country_guid"), origin
            )
        set_lbsn_pkey(post.user_pkey, lbsn.User(), record.get("user_guid"), origin)
        pub_date = record.get("post_publish_date")
        if pub_date:
            copydate_lbsn_attr(post.post_publish_date, pub_date)
        set_lbsn_attr(post, "post_body", record)
        geo_acc = record.get("post_geoaccuracy")
        if geo_acc:
            # get enum value
            post.post_geoaccuracy = lbsn.Post.PostGeoaccuracy.Value(geo_acc.upper())
        set_lbsn_attr(post, "hashtags", record)
        set_lbsn_attr(post, "topic_group", record)
        set_lbsn_attr(post, "post_downvotes", record)
        set_lbsn_attr(post, "emoji", record)
        set_lbsn_attr(post, "post_like_count", record)
        set_lbsn_attr(post, "post_comment_count", record)
        set_lbsn_attr(post, "post_views_count", record)
        set_lbsn_attr(post, "post_title", record)
        crt_date = record.get("post_create_date")
        if crt_date:
            copydate_lbsn_attr(post.post_create_date, crt_date)
        set_lbsn_attr(post, "post_thumbnail_url", record)
        set_lbsn_attr(post, "post_url", record)
        post_type = record.get("post_type")
        if post_type:
            # compatibility: earlier lbsnstructure
            # had 'carousel' as post type available,
            # which is now 'image'
            if post_type == "carousel":
                post_type = "image"
            # get enum value
            post.post_type = lbsn.Post.PostType.Value(post_type.upper())
        set_lbsn_attr(post, "post_filter", record)
        set_lbsn_attr(post, "post_quote_count", record)
        set_lbsn_attr(post, "post_share_count", record)
        lang = record.get("post_language")
        if lang:
            ref_post_language = lbsn.Language()
            ref_post_language.language_short = lang
            post.post_language.CopyFrom(ref_post_language)
        set_lbsn_attr(post, "input_source", record)
        user_mentions = record.get("user_mentions")
        if user_mentions:
            mentioned_users_list = []
            for user_id in user_mentions:  # iterate over the list
                ref_user_record = HF.new_lbsn_record_with_id(
                    lbsn.User(), user_id, origin
                )
                mentioned_users_list.append(ref_user_record)
            post.user_mentions_pkey.extend(
                [user_ref.pkey for user_ref in mentioned_users_list]
            )
        set_lbsn_attr(post, "post_content_license", record)
        return post

    @classmethod
    def extract_event(cls, record, origin):
        event = HF.new_lbsn_record_with_id(
            lbsn.Event(), record.get("event_guid"), origin
        )
        set_lbsn_attr(event, "name", record)
        event_latlng = record.get("event_latlng")
        if event_latlng:
            setattr(event, "event_latlng", parse_geom(event_latlng))
        event_area = record.get("event_area")
        if event_area:
            setattr(event, "event_area", parse_geom(event_area))
        set_lbsn_attr(event, "event_website", record)
        event_date = record.get("event_date")
        if event_date:
            copydate_lbsn_attr(event.event_date, event_date)
        event_date_start = record.get("event_date_start")
        if event_date_start:
            copydate_lbsn_attr(event.event_date_start, event_date_start)
        event_date_end = record.get("event_date_end")
        if event_date_end:
            copydate_lbsn_attr(event.event_date_end, event_date_end)
        duration = record.get("duration")
        if duration:
            copyduration_lbsn_attr(event.duration, duration)
        place_guid = record.get("place_guid")
        if place_guid:
            set_lbsn_pkey(
                event.place_pkey, lbsn.Place(), record.get("place_guid"), origin
            )
        city_guid = record.get("city_guid")
        if city_guid:
            set_lbsn_pkey(event.city_pkey, lbsn.City(), record.get("city_guid"), origin)
        country_guid = record.get("country_guid")
        if country_guid:
            set_lbsn_pkey(
                event.country_pkey, lbsn.Country(), record.get("country_guid"), origin
            )
        set_lbsn_pkey(event.user_pkey, lbsn.User(), record.get("user_guid"), origin)
        set_lbsn_attr(event, "event_description", record)
        set_lbsn_attr(event, "event_type", record)
        set_lbsn_attr(event, "event_share_count", record)
        set_lbsn_attr(event, "event_like_count", record)
        set_lbsn_attr(event, "event_comment_count", record)
        set_lbsn_attr(event, "event_views_count", record)
        set_lbsn_attr(event, "event_engage_count", record)
        return event

    @classmethod
    def extract_postreaction(cls, record):
        raise NotImplementedError("Mapping of post reactions is not yet implemented")

Class variables

var ORIGIN_ID
var ORIGIN_NAME

Static methods

def extract_city(record, origin)
Expand source code
@classmethod
def extract_city(cls, record, origin):
    city = HF.new_lbsn_record_with_id(lbsn.City(), record.get("city_guid"), origin)
    set_lbsn_attr(city, "name", record)
    geom_center = record.get("geom_center")
    if geom_center:
        setattr(city, "geom_center", parse_geom(geom_center))
    geom_area = record.get("geom_area")
    if geom_area:
        setattr(city, "geom_area", parse_geom(geom_area))
    country_guid = record.get("country_guid")
    if country_guid:
        city.country_pkey.CopyFrom(
            HF.new_lbsn_record_with_id(
                lbsn.Country(), record.get("country_guid"), origin
            ).pkey
        )
    set_lbsn_attr(city, "url", record)
    set_lbsn_attr(city, "name_alternatives", record)
    set_lbsn_attr(city, "sub_type", record)
    return city
def extract_country(record, origin)
Expand source code
@classmethod
def extract_country(cls, record, origin):
    country = HF.new_lbsn_record_with_id(
        lbsn.Country(), record.get("country_guid"), origin
    )
    set_lbsn_attr(country, "name", record)
    geom_center = record.get("geom_center")
    if geom_center:
        setattr(country, "geom_center", parse_geom(geom_center))
    geom_area = record.get("geom_area")
    if geom_area:
        setattr(country, "geom_area", parse_geom(geom_area))
    set_lbsn_attr(country, "url", record)
    set_lbsn_attr(country, "name_alternatives", record)
    return country
def extract_event(record, origin)
Expand source code
@classmethod
def extract_event(cls, record, origin):
    event = HF.new_lbsn_record_with_id(
        lbsn.Event(), record.get("event_guid"), origin
    )
    set_lbsn_attr(event, "name", record)
    event_latlng = record.get("event_latlng")
    if event_latlng:
        setattr(event, "event_latlng", parse_geom(event_latlng))
    event_area = record.get("event_area")
    if event_area:
        setattr(event, "event_area", parse_geom(event_area))
    set_lbsn_attr(event, "event_website", record)
    event_date = record.get("event_date")
    if event_date:
        copydate_lbsn_attr(event.event_date, event_date)
    event_date_start = record.get("event_date_start")
    if event_date_start:
        copydate_lbsn_attr(event.event_date_start, event_date_start)
    event_date_end = record.get("event_date_end")
    if event_date_end:
        copydate_lbsn_attr(event.event_date_end, event_date_end)
    duration = record.get("duration")
    if duration:
        copyduration_lbsn_attr(event.duration, duration)
    place_guid = record.get("place_guid")
    if place_guid:
        set_lbsn_pkey(
            event.place_pkey, lbsn.Place(), record.get("place_guid"), origin
        )
    city_guid = record.get("city_guid")
    if city_guid:
        set_lbsn_pkey(event.city_pkey, lbsn.City(), record.get("city_guid"), origin)
    country_guid = record.get("country_guid")
    if country_guid:
        set_lbsn_pkey(
            event.country_pkey, lbsn.Country(), record.get("country_guid"), origin
        )
    set_lbsn_pkey(event.user_pkey, lbsn.User(), record.get("user_guid"), origin)
    set_lbsn_attr(event, "event_description", record)
    set_lbsn_attr(event, "event_type", record)
    set_lbsn_attr(event, "event_share_count", record)
    set_lbsn_attr(event, "event_like_count", record)
    set_lbsn_attr(event, "event_comment_count", record)
    set_lbsn_attr(event, "event_views_count", record)
    set_lbsn_attr(event, "event_engage_count", record)
    return event
def extract_origin(record, origin)
Expand source code
@classmethod
def extract_origin(cls, record, origin):
    origin.name = record.get("name")
    return origin
def extract_place(record, origin)
Expand source code
@classmethod
def extract_place(cls, record, origin):
    place = HF.new_lbsn_record_with_id(
        lbsn.Place(), record.get("place_guid"), origin
    )
    set_lbsn_attr(place, "name", record)
    set_lbsn_attr(place, "post_count", record)
    set_lbsn_attr(place, "url", record)
    geom_center = record.get("geom_center")
    if geom_center:
        setattr(place, "geom_center", parse_geom(geom_center))
    geom_area = record.get("geom_area")
    if geom_area:
        setattr(place, "geom_area", parse_geom(geom_area))
    city_guid = record.get("city_guid")
    if city_guid:
        set_lbsn_pkey(place.city_pkey, lbsn.City(), record.get("city_guid"), origin)
    set_lbsn_attr(place, "name_alternatives", record)
    set_lbsn_attr(place, "place_description", record)
    set_lbsn_attr(place, "place_website", record)
    set_lbsn_attr(place, "place_phone", record)
    set_lbsn_attr(place, "address", record)
    set_lbsn_attr(place, "zip_code", record)
    set_lbsn_attr(place, "attributes", record)
    set_lbsn_attr(place, "checkin_count", record)
    set_lbsn_attr(place, "like_count", record)
    set_lbsn_attr(place, "parent_places", record)
    return place
def extract_post(record, origin)

Extract post attributes

TODO: Extract nested LBSN objects (e.g. spatial.city etc.)

Expand source code
@classmethod
def extract_post(cls, record, origin):
    """Extract post attributes

    TODO: Extract nested LBSN objects (e.g. spatial.city etc.)"""
    post = HF.new_lbsn_record_with_id(lbsn.Post(), record.get("post_guid"), origin)
    post_latlng = record.get("post_latlng")
    if post_latlng:
        setattr(post, "post_latlng", parse_geom(post_latlng))
    place_guid = record.get("place_guid")
    if place_guid:
        set_lbsn_pkey(
            post.place_pkey, lbsn.Place(), record.get("place_guid"), origin
        )
    city_guid = record.get("city_guid")
    if city_guid:
        set_lbsn_pkey(post.city_pkey, lbsn.City(), record.get("city_guid"), origin)
    country_guid = record.get("country_guid")
    if country_guid:
        set_lbsn_pkey(
            post.country_pkey, lbsn.Country(), record.get("country_guid"), origin
        )
    set_lbsn_pkey(post.user_pkey, lbsn.User(), record.get("user_guid"), origin)
    pub_date = record.get("post_publish_date")
    if pub_date:
        copydate_lbsn_attr(post.post_publish_date, pub_date)
    set_lbsn_attr(post, "post_body", record)
    geo_acc = record.get("post_geoaccuracy")
    if geo_acc:
        # get enum value
        post.post_geoaccuracy = lbsn.Post.PostGeoaccuracy.Value(geo_acc.upper())
    set_lbsn_attr(post, "hashtags", record)
    set_lbsn_attr(post, "topic_group", record)
    set_lbsn_attr(post, "post_downvotes", record)
    set_lbsn_attr(post, "emoji", record)
    set_lbsn_attr(post, "post_like_count", record)
    set_lbsn_attr(post, "post_comment_count", record)
    set_lbsn_attr(post, "post_views_count", record)
    set_lbsn_attr(post, "post_title", record)
    crt_date = record.get("post_create_date")
    if crt_date:
        copydate_lbsn_attr(post.post_create_date, crt_date)
    set_lbsn_attr(post, "post_thumbnail_url", record)
    set_lbsn_attr(post, "post_url", record)
    post_type = record.get("post_type")
    if post_type:
        # compatibility: earlier lbsnstructure
        # had 'carousel' as post type available,
        # which is now 'image'
        if post_type == "carousel":
            post_type = "image"
        # get enum value
        post.post_type = lbsn.Post.PostType.Value(post_type.upper())
    set_lbsn_attr(post, "post_filter", record)
    set_lbsn_attr(post, "post_quote_count", record)
    set_lbsn_attr(post, "post_share_count", record)
    lang = record.get("post_language")
    if lang:
        ref_post_language = lbsn.Language()
        ref_post_language.language_short = lang
        post.post_language.CopyFrom(ref_post_language)
    set_lbsn_attr(post, "input_source", record)
    user_mentions = record.get("user_mentions")
    if user_mentions:
        mentioned_users_list = []
        for user_id in user_mentions:  # iterate over the list
            ref_user_record = HF.new_lbsn_record_with_id(
                lbsn.User(), user_id, origin
            )
            mentioned_users_list.append(ref_user_record)
        post.user_mentions_pkey.extend(
            [user_ref.pkey for user_ref in mentioned_users_list]
        )
    set_lbsn_attr(post, "post_content_license", record)
    return post
def extract_postreaction(record)
Expand source code
@classmethod
def extract_postreaction(cls, record):
    raise NotImplementedError("Mapping of post reactions is not yet implemented")
def extract_user(record, origin)
Expand source code
@classmethod
def extract_user(cls, record, origin):
    user = HF.new_lbsn_record_with_id(lbsn.User(), record.get("user_guid"), origin)
    set_lbsn_attr(user, "user_name", record)
    set_lbsn_attr(user, "user_fullname", record)
    set_lbsn_attr(user, "follows", record)
    set_lbsn_attr(user, "followed", record)
    set_lbsn_attr(user, "biography", record)
    set_lbsn_attr(user, "post_count", record)
    set_lbsn_attr(user, "url", record)
    set_lbsn_attr(user, "is_private", record)
    set_lbsn_attr(user, "is_available", record)

    lang = record.get("user_language")
    if lang:
        ref_user_language = lbsn.Language()
        ref_user_language.language_short = lang
        user.user_language.CopyFrom(ref_user_language)

    set_lbsn_attr(user, "user_location", record)
    user_location_geom = record.get("user_location_geom")
    if user_location_geom:
        setattr(user, "user_location_geom", parse_geom(user_location_geom))
    set_lbsn_attr(user, "liked_count", record)
    active_since = record.get("active_since")
    if active_since:
        copydate_lbsn_attr(user.active_since, active_since)
    set_lbsn_attr(user, "profile_image_url", record)
    set_lbsn_attr(user, "user_timezone", record)
    set_lbsn_attr(user, "user_utc_offset", record)
    set_lbsn_attr(user, "user_groups_member", record)
    set_lbsn_attr(user, "user_groups_follows", record)
    set_lbsn_attr(user, "group_count", record)
    return user
def extract_usergroup(record, origin)
Expand source code
@classmethod
def extract_usergroup(cls, record, origin):
    usergroup = HF.new_lbsn_record_with_id(
        lbsn.UserGroup(), record.get("usergroup_guid"), origin
    )
    usergroup.usergroup_name = record.get("usergroup_name")
    usergroup.usergroup_description = record.get("usergroup_description")
    usergroup.member_count = record.get("member_count")
    usergroup.usergroup_createdate = record.get("usergroup_createdate")
    usergroup.user_owner = record.get("user_owner")
    user_owner = record.get("user_owner")
    if user_owner:
        usergroup.user_owner_pkey.CopyFrom(
            HF.new_lbsn_record_with_id(
                lbsn.User(), record.get("user_owner"), origin
            ).pkey
        )
    return usergroup
def get_func_record(record: Dict[str, Any], input_type: Optional[str] = None)

Returns mapping function for input_type

Expand source code
@classmethod
def get_func_record(cls, record: Dict[str, Any], input_type: Optional[str] = None):
    """Returns mapping function for input_type"""
    FUNC_MAP = {
        lbsn.Origin().DESCRIPTOR.name: cls.extract_origin,
        lbsn.Country().DESCRIPTOR.name: cls.extract_country,
        lbsn.City().DESCRIPTOR.name: cls.extract_city,
        lbsn.Place().DESCRIPTOR.name: cls.extract_place,
        lbsn.UserGroup().DESCRIPTOR.name: cls.extract_usergroup,
        lbsn.User().DESCRIPTOR.name: cls.extract_user,
        lbsn.Post().DESCRIPTOR.name: cls.extract_post,
        lbsn.PostReaction().DESCRIPTOR.name: cls.extract_postreaction,
        lbsn.Event().DESCRIPTOR.name: cls.extract_event,
    }
    func_map = FUNC_MAP.get(input_type)
    # create origin always the same
    origin = lbsn.Origin()
    origin.origin_id = record.get("origin_id")
    return func_map(record, origin)

Methods

def parse_json_record(self, record: Dict[str, Any], input_type: Optional[str] = None)

Entry point for LBSN data:

Attributes: record: tuple 0: A single row from LBSN, stored as dict 1: input_type Type of LBSN record (User, Post, Place etc.)

Expand source code
def parse_json_record(
    self, record: Dict[str, Any], input_type: Optional[str] = None
):
    """Entry point for LBSN data:

    Attributes:
    record: tuple
        0:    A single row from LBSN, stored as dict
        1:    input_type    Type of LBSN record (User, Post, Place etc.)
    """
    record = self.get_func_record(record, input_type)
    # return list of single item
    return [record]