Module lbsntransform.tools.db_connection

Provides a class for connecting to a Postgres database.

Expand source code
# -*- coding: utf-8 -*-
"""Provides functions to connect to Postgres DB

"""

import datetime
import getpass
import logging
import sys

import psycopg2
import psycopg2.extras
from psycopg2.extras import DictCursor

# Module-wide logger; getLogger() without a name returns the root logger.
LOG = logging.getLogger()


class DBConnection():
    """Hold Postgres connection settings and open connections from them."""

    def __init__(self, serveraddress=None, dbname=None,
                 user=None, password=0, readonly=False,
                 sslmode='prefer', port=5432):
        """Store the connection settings; no connection is opened here.

        Args:
            serveraddress: Host name or address of the Postgres server.
            dbname: Name of the database to connect to.
            user: Database user name.
            password: Password for ``user``. The default ``0`` is a
                sentinel: connect() then asks for the password.
            readonly: Stored on the instance only; connect() does not
                read it.
            sslmode: Value for the ``sslmode`` connection parameter.
            port: Port of the Postgres server.
        """
        self.serveraddress = serveraddress
        self.dbname = dbname
        self.user = user
        self.password = password
        self.readonly = readonly
        self.sslmode = sslmode  # Choose 'disable' to connect without ssl
        self.port = port

    def connect(self, dict_cursor=None):
        """Open a database connection and return it with a cursor.

        If the stored password is the sentinel ``0``, the password is
        requested via getpass on a terminal, or read as one line from
        stdin otherwise.

        Args:
            dict_cursor: If truthy, DictCursor is used as the cursor
                factory of the connection.

        Returns:
            Tuple ``(conn, cursor)``: the psycopg2 connection and a
            cursor created from it.

        Raises:
            SystemExit: With exit status 1 if the connection cannot be
                established (the error is printed first).
        """
        # Settings are handed to psycopg2 as keyword arguments, which
        # takes care of quoting. A hand-built "key='value'" string breaks
        # as soon as a value (e.g. the password) contains ' or \.
        # Keyword arguments that are None are dropped by psycopg2.
        conn_kwargs = {
            "host": self.serveraddress,
            "dbname": self.dbname,
            "user": self.user,
            "sslmode": self.sslmode,
            "port": self.port,
            "application_name": "LBSN Batch Transfer",
        }
        if self.password == 0:
            prompt_txt = f"Enter the password for {self.user}: "
            if sys.stdin.isatty():
                conn_kwargs["password"] = getpass.getpass(prompt=prompt_txt)
            else:
                # no terminal attached (e.g. piped input): getpass cannot
                # hide the input, so read one plain line instead
                print(prompt_txt)
                conn_kwargs["password"] = sys.stdin.readline().rstrip()
        else:
            conn_kwargs["password"] = self.password
        cursor_factory = DictCursor if dict_cursor else None
        # get a connection; if a connection cannot be made, report the
        # error and terminate with a non-zero status so that calling
        # scripts can detect the failure
        try:
            conn = psycopg2.connect(
                cursor_factory=cursor_factory, **conn_kwargs)
        except Exception as err:
            print(err)
            sys.exit(1)
        # activate dict to hstore conversion globally
        psycopg2.extras.register_hstore(conn, globally=True)
        # conn.cursor will return a cursor object,
        # this will be used to perform queries
        cursor = conn.cursor()
        dnow = datetime.datetime.now()
        LOG.info(f'{dnow.strftime("%Y-%m-%d %H:%M:%S")} '
                 f'- Connected to {self.dbname}')
        return conn, cursor

Classes

class DBConnection (serveraddress=None, dbname=None, user=None, password=0, readonly=False, sslmode='prefer', port=5432)

Class for connecting to Postgres.

Initialize DBConnection object with attributes, if passed.

Expand source code
class DBConnection():
    """Hold Postgres connection settings and open connections from them."""

    def __init__(self, serveraddress=None, dbname=None,
                 user=None, password=0, readonly=False,
                 sslmode='prefer', port=5432):
        """Store the connection settings; no connection is opened here.

        Args:
            serveraddress: Host name or address of the Postgres server.
            dbname: Name of the database to connect to.
            user: Database user name.
            password: Password for ``user``. The default ``0`` is a
                sentinel: connect() then asks for the password.
            readonly: Stored on the instance only; connect() does not
                read it.
            sslmode: Value for the ``sslmode`` connection parameter.
            port: Port of the Postgres server.
        """
        self.serveraddress = serveraddress
        self.dbname = dbname
        self.user = user
        self.password = password
        self.readonly = readonly
        self.sslmode = sslmode  # Choose 'disable' to connect without ssl
        self.port = port

    def connect(self, dict_cursor=None):
        """Open a database connection and return it with a cursor.

        If the stored password is the sentinel ``0``, the password is
        requested via getpass on a terminal, or read as one line from
        stdin otherwise.

        Args:
            dict_cursor: If truthy, DictCursor is used as the cursor
                factory of the connection.

        Returns:
            Tuple ``(conn, cursor)``: the psycopg2 connection and a
            cursor created from it.

        Raises:
            SystemExit: With exit status 1 if the connection cannot be
                established (the error is printed first).
        """
        # Settings are handed to psycopg2 as keyword arguments, which
        # takes care of quoting. A hand-built "key='value'" string breaks
        # as soon as a value (e.g. the password) contains ' or \.
        # Keyword arguments that are None are dropped by psycopg2.
        conn_kwargs = {
            "host": self.serveraddress,
            "dbname": self.dbname,
            "user": self.user,
            "sslmode": self.sslmode,
            "port": self.port,
            "application_name": "LBSN Batch Transfer",
        }
        if self.password == 0:
            prompt_txt = f"Enter the password for {self.user}: "
            if sys.stdin.isatty():
                conn_kwargs["password"] = getpass.getpass(prompt=prompt_txt)
            else:
                # no terminal attached (e.g. piped input): getpass cannot
                # hide the input, so read one plain line instead
                print(prompt_txt)
                conn_kwargs["password"] = sys.stdin.readline().rstrip()
        else:
            conn_kwargs["password"] = self.password
        cursor_factory = DictCursor if dict_cursor else None
        # get a connection; if a connection cannot be made, report the
        # error and terminate with a non-zero status so that calling
        # scripts can detect the failure
        try:
            conn = psycopg2.connect(
                cursor_factory=cursor_factory, **conn_kwargs)
        except Exception as err:
            print(err)
            sys.exit(1)
        # activate dict to hstore conversion globally
        psycopg2.extras.register_hstore(conn, globally=True)
        # conn.cursor will return a cursor object,
        # this will be used to perform queries
        cursor = conn.cursor()
        dnow = datetime.datetime.now()
        LOG.info(f'{dnow.strftime("%Y-%m-%d %H:%M:%S")} '
                 f'- Connected to {self.dbname}')
        return conn, cursor

Methods

def connect(self, dict_cursor=None)

Connect to the database and return the connection together with a cursor.

Expand source code
def connect(self, dict_cursor=None):
    """Open a database connection and return it with a cursor.

    If ``self.password`` is the sentinel ``0``, the password is
    requested via getpass on a terminal, or read as one line from
    stdin otherwise.

    Args:
        dict_cursor: If truthy, DictCursor is used as the cursor
            factory of the connection.

    Returns:
        Tuple ``(conn, cursor)``: the psycopg2 connection and a
        cursor created from it.

    Raises:
        SystemExit: With exit status 1 if the connection cannot be
            established (the error is printed first).
    """
    # Settings are handed to psycopg2 as keyword arguments, which
    # takes care of quoting. A hand-built "key='value'" string breaks
    # as soon as a value (e.g. the password) contains ' or \.
    # Keyword arguments that are None are dropped by psycopg2.
    conn_kwargs = {
        "host": self.serveraddress,
        "dbname": self.dbname,
        "user": self.user,
        "sslmode": self.sslmode,
        "port": self.port,
        "application_name": "LBSN Batch Transfer",
    }
    if self.password == 0:
        prompt_txt = f"Enter the password for {self.user}: "
        if sys.stdin.isatty():
            conn_kwargs["password"] = getpass.getpass(prompt=prompt_txt)
        else:
            # no terminal attached (e.g. piped input): getpass cannot
            # hide the input, so read one plain line instead
            print(prompt_txt)
            conn_kwargs["password"] = sys.stdin.readline().rstrip()
    else:
        conn_kwargs["password"] = self.password
    cursor_factory = DictCursor if dict_cursor else None
    # get a connection; if a connection cannot be made, report the
    # error and terminate with a non-zero status so that calling
    # scripts can detect the failure
    try:
        conn = psycopg2.connect(
            cursor_factory=cursor_factory, **conn_kwargs)
    except Exception as err:
        print(err)
        sys.exit(1)
    # activate dict to hstore conversion globally
    psycopg2.extras.register_hstore(conn, globally=True)
    # conn.cursor will return a cursor object,
    # this will be used to perform queries
    cursor = conn.cursor()
    dnow = datetime.datetime.now()
    LOG.info(f'{dnow.strftime("%Y-%m-%d %H:%M:%S")} '
             f'- Connected to {self.dbname}')
    return conn, cursor