Python - SQLServerCentral https://www.sqlservercentral.com The #1 SQL Server community Tue, 17 Feb 2026 19:11:21 +0000 en-GB hourly 1 https://wordpress.org/?v=6.8.1 Python is good, but not perfect – here are 10 reasons to avoid it https://www.sqlservercentral.com/articles/python-is-good-but-not-perfect-here-are-10-reasons-to-avoid-it Mon, 02 Mar 2026 00:00:38 +0000 https://www.sqlservercentral.com/?post_type=ssc_article&p=4736363 Four years ago I wrote a blog on this site explaining why Python is better than C# and, arguably, most other programming languages. To redress the balance, here are 10 reasons why you might want to avoid getting caught up in Python’s oh-so-tempting coils – particularly when building large, long-lived systems.

The post Python is good, but not perfect – here are 10 reasons to avoid it appeared first on SQLServerCentral.

Automating Database Cleanup for PostgreSQL Using Python https://www.sqlservercentral.com/articles/automating-database-cleanup-for-postgresql-using-python Fri, 30 Jan 2026 00:00:54 +0000 https://www.sqlservercentral.com/?post_type=ssc_article&p=4700770 This article shows a technique for cleaning out older data from PostgreSQL tables using Python to follow a set of rules you create.

The post Automating Database Cleanup for PostgreSQL Using Python appeared first on SQLServerCentral.

Introduction

In a database, tables grow continuously as new rows are inserted. Much of that data eventually becomes old or irrelevant and no longer needs to be kept. Old records, expired sessions, soft-deleted rows, audit logs, and orphaned data accumulate over time and slowly degrade performance. Manual cleanup is error-prone, hard to audit, and often not run on time.

To illustrate, suppose we store application logs in a logging table. Every new log entry is appended to the table, so its volume keeps growing. In most cases, however, application logs older than 90 days (roughly 3 months) are no longer needed. With a data cleanup strategy in place, we can keep the table lightweight and free of unnecessary data.

Objective

In this article, we will build, step by step, a Python script that cleans up old and unnecessary data from a PostgreSQL database. We will cover:

  • What data should be cleaned and why
  • Safe cleanup strategies
  • Transaction-aware Python scripts
  • Performance and safety best practices

Why database clean up is necessary

If the database tables are not cleaned up from time to time, we may run into any of the following problems:

  • Slower queries due to table and index bloating
  • Increased storage costs
  • Longer backups and restores
  • Autovacuum struggling to keep up
  • Compliance issues (retaining data longer than allowed)

Let's look at the kinds of data that are candidates for cleanup, with some examples:

  • Time-based data (e.g. older than 90 days)
    • This is data that is simply too old to be needed any more. For example, suppose an application stores all of its logs in an app_logs table. Logs older than 90 days may no longer need to be retained and can be deleted. Such data qualifies as time-based data.
  • Soft-deleted data (e.g. rows whose is_deleted column is set to true)
    • These are rows that have been marked as no longer needed and can be removed right away. For example, consider a users table that lists all registered users. If a user deletes their account or unsubscribes, the is_deleted column for that user is set to true. Such records can be deleted because the account no longer exists, and they need no waiting period before they qualify for cleanup.
  • Orphan records (e.g. child records with no parent record, also called dangling records)
    • These are records that are no longer linked to any related record. For example, consider two tables called teacher and department, where every department has at least one teacher assigned to it. If the department table contains a record with no teacher assigned, the department is no longer taught and can be removed.
  • Temporary data (e.g. session or cache records that were created temporarily and are no longer needed)
    • This is data that is stored for a very short time and can be deleted right afterwards. For example, suppose we store session information in a sessions table. A session might be valid for at most 15 minutes, after which it times out and the user has to create a new session. Such rows can be cleaned up after 15 minutes. This is also time-based data, but because the retention period is so short we call it temporary data.
  • Failed transactions (e.g. records that failed and were logged to a table, were later reprocessed, but were never removed from that table)
    • These are records that were stored for later review and have since been processed. For example, suppose we have a successful_transactions table that stores successfully processed payment transactions and a failed_transactions table that stores records that fail during processing. Records in failed_transactions are picked up again on the next run and, if they succeed, are inserted into successful_transactions. The reprocessed records nevertheless remain in failed_transactions. They can be cleaned up as soon as they have been processed successfully.

These are just a few examples of the different ways we can identify records that qualify for periodic cleanup in our database.

Clean up strategy

Before we write any delete script, we should define some rules or create a strategy to make sure that we have the intended results. We can broadly categorize these into 2 different rules: retention rules and safety rules.

By rules, we mean a plan that must be in place before the cleanup operation starts. The plan ensures that the correct data is identified for cleanup and that it is deleted in the right order and in the right way. Identifying the correct data is covered by the retention rules, and executing the cleanup is covered by the safety rules.

Let us look at an example. Suppose we are dealing with the database of an online shopping portal where users log in, place orders, and make payments. The application also writes all of its logs to the database. Assuming the organization does not need logs older than 90 days, orders older than 7 years, or session information older than 24 hours, we can plan the retention rules as follows:

  • Delete all orders older than 7 years.
    • In the orders table, identify all orders which were placed more than 7 years ago
  • Delete all session information older than 24 hours.
    • In the session table, identify all sessions created more than 24 hours ago
  • Delete all log information older than 90 days.
    • In the app_logs table, identify all log entries captured more than 90 days ago

This is just an illustration of how we identify the data to be cleaned up based on the retention policy and requirements.
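The retention rules above can be written down as data before any SQL is involved. The following is a minimal sketch, not part of the cleanup script itself: the RETENTION_RULES dictionary and the cutoffs() helper are hypothetical names, and 7 years is approximated as 7 * 365 days.

```python
from datetime import datetime, timedelta

# Hypothetical rule table: table name -> retention period.
# 7 years is approximated as 7 * 365 days (leap days are ignored).
RETENTION_RULES = {
    "orders": timedelta(days=7 * 365),
    "session": timedelta(hours=24),
    "app_logs": timedelta(days=90),
}

def cutoffs(now):
    """Map each table to its cutoff; rows older than the cutoff qualify."""
    return {table: now - period for table, period in RETENTION_RULES.items()}

# A fixed reference time (an assumption) keeps the example reproducible.
reference = datetime(2026, 1, 30, 12, 0)
for table, cutoff in cutoffs(reference).items():
    print(f"{table}: rows before {cutoff:%Y-%m-%d %H:%M} qualify for cleanup")
```

Keeping the rules in one place like this makes it easier to review the retention policy separately from the code that executes it.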

Once the data to clean up has been identified, we also need safety rules so that we can roll back if something goes wrong and so that the entire cleanup process is logged for resolving any future discrepancies. A typical set of safety rules looks something like this:

  • Delete in batches. This way a huge number of records is never deleted at once, which helps avoid long-held locks on the table
  • Support a dry-run mode. This way we can see what would actually happen before we run in production mode
  • Always use transactions, so that any issue during execution results in a clean rollback rather than a partial commit
  • Log thoroughly while the cleanup tool runs, so that everything that happens is captured in detail

Again, the above is for illustration, so that before we begin we know which steps help us avoid errors and how we can recover if errors still occur.

PostgreSQL clean up design approach

We will be taking the following approach for PostgreSQL database clean up:

  1. Connect to the database
  2. Identify the stale rows
  3. Dry-run or Delete the rows
  4. Commit the transaction
  5. Log and Exit

Each of these steps is covered below.

Python setup

Before we start writing the automation script, let us first prepare our development environment. We need to install the following Python libraries:

pip install psycopg2-binary python-dotenv

Let's understand why these libraries are needed:

  • psycopg2 -> PostgreSQL database driver, used to connect to the database
  • dotenv -> Secure credential loading
  • logging -> Audit trail capture (part of the Python standard library, so no installation is needed)

We will create a simple Python project with the following folder structure. Here, cleanup-tool is our base folder, in which we keep 2 files. The .env file stores the basic configuration in one central place so that it is easy to maintain later. The other file, cleanup.py, is the Python script that contains the code for automating the database cleanup.

If you have a different folder structure in mind, you can proceed accordingly, but make sure that cleanup.py and the .env file sit at the same level. If you use a different structure, refer to the files using the correct paths.

Next, we will start with creating the .env file first. Inside the file, we will have the following contents:

DB_HOST=localhost
DB_PORT=5432
DB_NAME=localdb
DB_USER=sabyasachimukherjee
DB_PASSWORD=

Make sure to replace the credential values with your own. This .env file will be used later to establish the connection to the local PostgreSQL database.
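Because a missing setting usually surfaces only as a connection error, it can help to check the configuration up front. The sketch below is an optional addition, not part of the article's script: missing_settings() is a hypothetical helper, and it assumes that DB_PASSWORD may be blank (as in the sample file above) while the other keys must have a value.

```python
# Keys defined in the sample .env file. DB_PASSWORD only has to be present,
# because the sample file leaves it blank; this rule is an assumption.
REQUIRED_KEYS = ("DB_HOST", "DB_PORT", "DB_NAME", "DB_USER", "DB_PASSWORD")
MAY_BE_EMPTY = {"DB_PASSWORD"}

def missing_settings(env):
    """Return the required settings that are absent or unexpectedly blank."""
    missing = []
    for key in REQUIRED_KEYS:
        value = env.get(key)
        if value is None or (value == "" and key not in MAY_BE_EMPTY):
            missing.append(key)
    return missing

# Example with a plain dict standing in for os.environ; DB_USER is left out.
sample = {
    "DB_HOST": "localhost",
    "DB_PORT": "5432",
    "DB_NAME": "localdb",
    "DB_PASSWORD": "",
}
print(missing_settings(sample))  # prints ['DB_USER']
```

In the real script, the same function could be called with os.environ once the .env file has been loaded.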

Next, we will create the actual Python script that cleans up the database (we have named it cleanup.py). We will walk through an explanation of the code first and then look at the code itself.

Explanation

First, we will understand the libraries being imported and their use:

  • psycopg2 - Establish PostgreSQL database connection
  • os - Read environment variables
  • logging - Structured logs and audit trailing
  • argparse - Read CLI arguments
  • datetime - Calculate retention cutoff
  • dotenv - Load .env file configuration

Next, we will try to load the configuration values from the .env file using the load_dotenv() method:

  • Loads the .env file entries
  • Injects the variables into os.environ

Next, in the get_connection() method, we are trying to create the database connection using the configuration values:

  • Creates a PostgreSQL connection
  • Uses entries as per .env file
  • Gives a centralized DB access by making connection reusable across other scripts too

Next, the count_old_logs() method will help us count the number of entries which qualify as old logs as per our retention policy:

  • Computes a cutoff date and counts the records older than it.
  • If today is December 16 and retention_days is 90, then the cutoff value is September 17.
  • This gives visibility, a sneak peek, before the actual delete happens.
  • It is very useful when running the script in dry-run mode.
  • It helps the user validate whether the retention logic is being calculated properly.
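The cutoff arithmetic from the example above can be checked in isolation. This is a small sketch: retention_cutoff() is a hypothetical helper name, and the year 2025 is an assumption, since the example only gives the day and month.

```python
from datetime import datetime, timedelta

def retention_cutoff(now, retention_days):
    """Rows with a timestamp earlier than the returned value are 'old'."""
    return now - timedelta(days=retention_days)

today = datetime(2025, 12, 16)  # the year is assumed for illustration
cutoff = retention_cutoff(today, 90)
print(cutoff.date())  # prints 2025-09-17
```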

Next, the delete_old_logs() method runs in dry-run mode by default. It works as follows:

  • This is the batched delete logic, where records are deleted in batches
  • Parameters are as follows:
    • conn - Database connection
    • retention_days - Number of days data needs to be retained
    • batch_size - Rows per transaction
    • dry_run - Safety flag
  • Deletes data in small chunks
  • LIMIT is used to avoid deleting all data at once, which reduces the chance of long-held locks on the table
  • ORDER BY created_at is used to delete the oldest data first
  • RETURNING id is used to find out how many rows were deleted
  • In dry-run mode:
    • The DELETE is executed but the transaction is rolled back
    • This shows the impact of the delete without actually applying it
    • No data is actually deleted
    • The loop stops after the first batch
  • In no-dry-run mode (the real delete mode):
    • The transaction is committed
    • The program moves on to the next batch

The Code

Following is the python code snippet which automates data clean up depending on the retention policy:

#!/usr/bin/env python3

import psycopg2
import os
import logging
import argparse
from datetime import datetime, timedelta
from dotenv import load_dotenv

# --------------------------------------------------
# Load environment variables
# --------------------------------------------------
load_dotenv()

# --------------------------------------------------
# Logging configuration
# --------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

# --------------------------------------------------
# Database connection
# --------------------------------------------------
def get_connection():
    return psycopg2.connect(
        host=os.getenv("DB_HOST"),
        port=os.getenv("DB_PORT"),
        dbname=os.getenv("DB_NAME"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
    )

# --------------------------------------------------
# Count old records (dry-run visibility)
# --------------------------------------------------
def count_old_logs(conn, retention_days):
    cutoff = datetime.utcnow() - timedelta(days=retention_days)
    with conn.cursor() as cur:
        cur.execute(
            "SELECT COUNT(*) FROM app_logs WHERE created_at < %s",
            (cutoff,)
        )
        return cur.fetchone()[0]

# --------------------------------------------------
# Batched delete function
# --------------------------------------------------
def delete_old_logs(
    conn,
    retention_days=90,
    batch_size=1000,
    dry_run=True
):
    cutoff = datetime.utcnow() - timedelta(days=retention_days)
    total_deleted = 0

    while True:
        with conn.cursor() as cur:
            cur.execute("""
                DELETE FROM app_logs
                WHERE id IN (
                    SELECT id
                    FROM app_logs
                    WHERE created_at < %s
                    ORDER BY created_at
                    LIMIT %s
                )
                RETURNING id;
            """, (cutoff, batch_size))

            rows = cur.fetchall()
            batch_count = len(rows)

        if batch_count == 0:
            break

        total_deleted += batch_count

        if dry_run:
            conn.rollback()
            logging.info(
                "DRY-RUN: Would delete %d rows (batch)",
                batch_count
            )
            break
        else:
            conn.commit()
            logging.info(
                "Deleted %d rows (batch)",
                batch_count
            )

    logging.info(
        "Cleanup finished. Total rows affected: %d",
        total_deleted
    )

# --------------------------------------------------
# Argument parser
# --------------------------------------------------
def parse_args():
    parser = argparse.ArgumentParser(
        description="Cleanup old application logs"
    )

    parser.add_argument(
        "--retention-days",
        type=int,
        default=90,
        help="Delete logs older than N days (default: 90)"
    )

    parser.add_argument(
        "--batch-size",
        type=int,
        default=500,
        help="Number of rows deleted per batch (default: 500)"
    )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        default=True,
        help="Run in dry-run mode (default)"
    )

    parser.add_argument(
        "--no-dry-run",
        dest="dry_run",
        action="store_false",
        help="Actually delete records"
    )

    return parser.parse_args()

# --------------------------------------------------
# Main runner
# --------------------------------------------------
def main():
    args = parse_args()

    logging.info(
        "Starting cleanup | retention_days=%d | batch_size=%d | dry_run=%s",
        args.retention_days,
        args.batch_size,
        args.dry_run
    )

    conn = get_connection()

    try:
        count = count_old_logs(conn, args.retention_days)
        logging.info(
            "Found %d log records older than %d days",
            count,
            args.retention_days
        )

        delete_old_logs(
            conn,
            retention_days=args.retention_days,
            batch_size=args.batch_size,
            dry_run=args.dry_run
        )

    finally:
        conn.close()
        logging.info("Database connection closed")

# --------------------------------------------------
if __name__ == "__main__":
    main()

Data Creation

Before we run the script, we need a set of data to clean up. To do this, we will first create a table and then insert data into it. To create the table, we will use the following script:

DROP TABLE IF EXISTS app_logs;

CREATE TABLE app_logs (
    id          BIGSERIAL PRIMARY KEY,
    message     TEXT NOT NULL,
    created_at  TIMESTAMP NOT NULL DEFAULT NOW()
);

Next, we will add an index so that searches on the table perform well:

CREATE INDEX idx_app_logs_created_at
ON app_logs (created_at);

Next, we will insert multiple records that cover different conditions for the cleanup:

INSERT INTO app_logs (message, created_at) VALUES
('Log 180 days old', NOW() - INTERVAL '180 days'),
('Log 150 days old', NOW() - INTERVAL '150 days'),
('Log 120 days old', NOW() - INTERVAL '120 days'),
('Log 95 days old',  NOW() - INTERVAL '95 days'),
('Log 90 days old',  NOW() - INTERVAL '90 days'),
('Log 60 days old',  NOW() - INTERVAL '60 days'),
('Log 30 days old',  NOW() - INTERVAL '30 days'),
('Log 7 days old',   NOW() - INTERVAL '7 days'),
('Log 1 day old',    NOW() - INTERVAL '1 day'),
('Log today',        NOW());

We can also insert bulk data for heavier testing using the following script (optional):

INSERT INTO app_logs (message, created_at)
SELECT
    'Auto log ' || g,
    NOW() - (g || ' days')::INTERVAL
FROM generate_series(1, 365) AS g;

Now that all this is done, we will first check the data in the database using pgAdmin:

SELECT *
FROM app_logs
WHERE created_at < NOW() - INTERVAL '90 days';

You should see the following output:

Now, these 5 records are beyond the retention policy and should ideally be deleted by our script (if run in no-dry-run mode).

Note: When we run the script, however, only 4 records will be identified. The last record is not included because it falls on the boundary. Remember, we want to delete data that is older than 90 days.
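The boundary behaviour comes from the strict less-than comparison. The sketch below reproduces it in plain Python under one simplifying assumption: the sample rows and the cutoff are all computed from the same fixed instant. In practice the rows are inserted at one moment and the cutoff is computed at another, and the script uses UTC while NOW() uses the session time zone, so the row you see at the boundary may differ.

```python
from datetime import datetime, timedelta

now = datetime(2026, 1, 30, 9, 0)  # fixed, assumed instant
cutoff = now - timedelta(days=90)

# One sample row per age, mirroring the INSERT statement above.
ages_in_days = [180, 150, 120, 95, 90, 60, 30, 7, 1, 0]
rows = {age: now - timedelta(days=age) for age in ages_in_days}

# Strict comparison, as in the script: the 90-day row is NOT included.
strictly_older = [age for age, created_at in rows.items() if created_at < cutoff]
# Inclusive comparison, shown for contrast only.
on_or_before = [age for age, created_at in rows.items() if created_at <= cutoff]

print(strictly_older)  # prints [180, 150, 120, 95]
print(on_or_before)    # prints [180, 150, 120, 95, 90]
```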

Execution of the Script

To execute the automated script, just run the following command from the location where you have the cleanup script saved:

python cleanup.py

Once we run the above command, we should see the following logs in the terminal:

Note that we did not specify a dry-run option in the above command, and the script defaulted to dry-run mode to make sure that a permanent delete does not happen by mistake. If we head over to pgAdmin and run the above query again, we should see the same output.
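As an aside, the same "dry-run unless told otherwise" behaviour can be declared with a single argument on Python 3.9 or later, using argparse.BooleanOptionalAction. This is an alternative sketch, not what the script above does; build_parser() is a hypothetical name.

```python
import argparse

def build_parser():
    parser = argparse.ArgumentParser(description="Cleanup old application logs")
    # BooleanOptionalAction (Python 3.9+) creates both --dry-run and
    # --no-dry-run from this one definition.
    parser.add_argument(
        "--dry-run",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Roll back instead of committing (enabled by default)",
    )
    return parser

parser = build_parser()
print(parser.parse_args([]).dry_run)                # prints True
print(parser.parse_args(["--no-dry-run"]).dry_run)  # prints False
```

The two-argument approach used in the script works on older Python versions as well, which may be a reason to prefer it.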

Next, we will again run in dry-run mode, but this time identify logs older than 95 days (in case we need to clean up the database with a custom date range). To do so, we run the following command with the necessary arguments:

python cleanup.py --retention-days 95 --batch-size 2 --dry-run

Here we set retention-days to 95 and batch-size to 2, and run in dry-run mode.

If you look closely at the script output, you can see that 3 records were identified in total, but because the batch size is 2, only 2 records are deleted per batch (when run in no-dry-run mode). This is how we avoid holding database locks for long periods.

Finally, if we want to run the above command in production mode (no-dry-run mode), then we can run the following command:

python cleanup.py --retention-days 95 --batch-size 2 --no-dry-run

Next, if we check the logs, we can see the following output:

If you take a look at the logs above, you can see that 2 batches have run, the first with 2 rows deleted and the next with 1 row deleted.
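The batching behaviour described here can be reproduced without a PostgreSQL server. The following sketch uses an in-memory SQLite database as a stand-in, so it is an approximation: the age_days column replaces created_at, cursor.rowcount replaces RETURNING id, and make_sample_db() and delete_in_batches() are hypothetical names.

```python
import sqlite3

def make_sample_db():
    """In-memory table with one row per sample log, keyed by age in days."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE app_logs (id INTEGER PRIMARY KEY, age_days INTEGER)")
    ages = [180, 150, 120, 95, 90, 60, 30, 7, 1, 0]
    conn.executemany("INSERT INTO app_logs (age_days) VALUES (?)", [(a,) for a in ages])
    conn.commit()
    return conn

def delete_in_batches(conn, retention_days, batch_size, dry_run=True):
    """Delete rows older than retention_days; return the size of each batch."""
    batches = []
    while True:
        cur = conn.execute(
            """
            DELETE FROM app_logs
            WHERE id IN (
                SELECT id FROM app_logs
                WHERE age_days > ?
                ORDER BY age_days DESC
                LIMIT ?
            )
            """,
            (retention_days, batch_size),
        )
        deleted = cur.rowcount
        if deleted == 0:
            conn.rollback()  # nothing matched; close the open transaction
            break
        batches.append(deleted)
        if dry_run:
            conn.rollback()  # undo the batch and stop after one pass
            break
        conn.commit()  # make this batch permanent, then continue
    return batches

conn = make_sample_db()
print(delete_in_batches(conn, 95, 2, dry_run=True))   # prints [2]
print(delete_in_batches(conn, 95, 2, dry_run=False))  # prints [2, 1]
```

With the ten sample rows, three are older than 95 days, so a batch size of 2 yields one batch of 2 rows followed by one batch of 1 row, and the dry run leaves the table untouched.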

Safety measures

  • Always use proper logging so that the records identified for deletion are captured
  • Keep dry-run as the default run mode
  • Make sure there is an index on the table being cleaned up, so that finding rows during deletion is faster
  • Try to run the script during night hours, when the expected load is low
  • Always delete in batches to avoid long-held database locks

Conclusion

We have seen why database cleanup matters and how to do it efficiently with an automation script written in Python. We also walked through the precautions that help make sure the appropriate data is identified and cleaned up.

Python in Action to Auto-Generate an Optimized PostgreSQL Index Strategy https://www.sqlservercentral.com/articles/python-in-action-to-auto-generate-an-optimized-postgresql-index-strategy Wed, 21 Jan 2026 00:00:26 +0000 https://www.sqlservercentral.com/?post_type=ssc_article&p=4695540 We have a script that can be used to help tune ind...

The post Python in Action to Auto-Generate an Optimized PostgreSQL Index Strategy appeared first on SQLServerCentral.

Introduction

By now, we all know that indexes are one of the most powerful tools for speeding up database queries. However, designing the right index is something of an art.

In this article, we will learn how to create a Python script that inspects slow queries and suggests (or optionally creates) optimized PostgreSQL indexes. We will analyze slow queries with EXPLAIN and ANALYZE, propose index definitions using simple heuristics, and create (or suggest) indexes that can be run in the actual environment without much risk.

Since this is just a learning exercise, we will keep it simple, practical, and safe.

Advantages of Automatic Index Suggestions

First, let us understand why we need, and how we benefit from, a utility that suggests indexes. Developers often add indexes on an ad-hoc basis and, as a result, leave behind redundant or ineffective indexes.

Suppose we initially wrote a query to fetch some records. To optimize its performance, we introduced various indexes based on the column combinations used in the filter conditions. After code review we changed the query and created a few more indexes to help the new query, leaving behind the earlier indexes, which may no longer be relevant. As our queries evolve, the indexes need to change with them and should be monitored for updates or removal. In such a situation, a tool that observes slow and frequent queries and suggests useful indexes helps us avoid guesswork and over-indexing.

We will build, step by step, a simple tool that focuses on the WHERE clause, JOIN conditions, ORDER BY, and sometimes GROUP BY, since these are the main areas where indexes help the most to improve query performance.

Inner Workings of the Tool

Let's briefly discuss, at a high level, how the tool we are about to create will work.

  • Collect candidate queries to analyze:
    • Queries will be collected from pg_stat_statements (recommended), or
    • From a log file of slow queries
  • For each query collected:
    • Run EXPLAIN (ANALYZE, BUFFERS) to see if sequential scans are used or if the cost is high,
    • Parse such queries to extract the columns used in WHERE, JOIN or ORDER BY clauses,
    • Generate one or more index suggestion(s) (single column, multi column or similar)
  • Optionally:
    • Create the index CONCURRENTLY
    • Re-run the EXPLAIN (ANALYZE) and report the improvement achieved
  • Finally, output a human readable report and SQL snippets.
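One of the steps above is spotting sequential scans in the EXPLAIN output. A minimal way to do that on the text-format plan is a regular expression, as sketched below. The tables_with_seq_scans() helper is a hypothetical name, and the sample plan is hand-written for illustration with made-up numbers; it is not real output from this article's database.

```python
import re

# Matches "Seq Scan on app_logs" and "Seq Scan on public.app_logs",
# including parallel variants such as "Parallel Seq Scan on ...".
SEQ_SCAN_RE = re.compile(r"\bSeq Scan on (?:\w+\.)?(\w+)")

def tables_with_seq_scans(plan_text):
    """Return tables that appear in Seq Scan nodes, in order, without repeats."""
    seen = []
    for table in SEQ_SCAN_RE.findall(plan_text):
        if table not in seen:
            seen.append(table)
    return seen

# Hand-written, illustrative plan text (numbers are invented).
sample_plan = """\
Seq Scan on app_logs  (cost=0.00..18.50 rows=5 width=48) (actual time=0.010..0.020 rows=5 loops=1)
  Filter: (created_at < '2025-11-01 00:00:00'::timestamp without time zone)
"""

print(tables_with_seq_scans(sample_plan))  # prints ['app_logs']
```

A sequential scan is not always a problem (on small tables it is often the cheapest plan), so a hit from this check is best treated as a hint to look closer rather than proof that an index is needed.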

Python Code Snippet

A few requirements to get started:

  • Python version 3.8+
  • pip install psycopg2-binary sqlparse

Before we get to the Python script provided below, let us briefly understand what the code does. Once you understand it, you can copy it to a file and save it under any name of your choice with a .py extension. We will name our file auto_indexer.py.

Here is the flow for the tool:

  • Import statements needed for the tool's functionality:
    • subprocess - runs psql commands
    • csv - parses csv returned by psql --csv commands
    • argparse - CLI arguments
    • re - regex for parsing SQL
    • typing - type hints
  • DB_CFG - configures the parameters needed to connect to the local database
  • def psql_conn_str(cfg): - Creates the PostgreSQL connection string
  • def run_psql_sql(sql_text, cfg, csv_mode=False) - runs the SQL query using the psql -c command.
  • def queries_from_pg_statstatements(...) - gets queries from pg_stat_statements:
    • Runs SELECT query, calls, total_time FROM pg_stat_statements using CSV mode
    • Filters out EXPLAIN, vacuum, and BEGIN queries
    • Sorts by total_time DESC
    • Returns a list of (query_text, calls, total_time).
  • def queries_from_file(path) - Reads queries from the file:
    • Tries sqlparse.split() for correct SQL splitting
    • Falls back to a simple split on the ; character
    • Returns (query, 1, 0.0) for each query
  • def run_explain_via_psql(query, cfg) - Runs EXPLAIN via psql:
    • Wraps query inside EXPLAIN(ANALYZE, BUFFERS)
    • Use psql to execute EXPLAIN
    • Returns raw text plan as string
  • def extract_columns_from_where_and_joins(query) - Extract Columns (WHERE/JOIN/ORDER BY):
    • Looks into WHERE, JOIN ON, ORDER BY clauses
    • Extracts tokens like table.col or "col"
    • Strips table prefixes and returns distinct list of column names
  • def suggest_indexes(query, cols) - Suggest Indexes:
    • Single column indexes - for each detected column
    • Multi-column indexes - using first row columns
    • Each suggestion includes - index name, type (single/multiple), columns, SQL string, reason.
  • def detect_primary_table(query) - detect the primary table
    • Searches for: FROM table, INTO table, UPDATE table
    • Extracts the table name removing the schema, if present
  • def analyze_and_propose(query, apply_indexes=False, dry_run=True) - Analyze query and propose indexes:
    • Prints the query
    • Runs the EXPLAIN before index creation
    • Extract the columns and builds index suggestions
    • Replace table placeholder in SQL
    • Prints the suggested indexes
    • Optionally apply indexes using psql
    • Optionally show EXPLAIN after indexes are created

Finally, comes the CLI main function

import os
import re
import csv
import shlex
import time
import argparse
import subprocess
from typing import List, Tuple

# ---------------------------
# Database / psql configuration
# ---------------------------
DB_CFG = {
    "host": "localhost",
    "port": 5432,
    "dbname": "localdb",
    "user": "sabyasachimukherjee",
    "password": "",
}

def psql_conn_str(cfg: dict) -> str:
    """Return a libpq connection string for use with psql -c."""
    parts = []
    for k in ("host", "port", "dbname", "user"):
        v = cfg.get(k)
        if v:
            parts.append(f"{k}={v}")
    return " ".join(parts)

def run_psql_sql(sql_text: str, cfg: dict, csv_mode: bool = False, timeout: int = 30) -> str:
    """
    Run a SQL statement via psql and return stdout. If csv_mode=True, use --csv.
    """
    conn = psql_conn_str(cfg)
    cmd = ["psql", "--no-align", "--pset", "format=unaligned"]

    if csv_mode:
        cmd = ["psql", "--csv"]

    cmd += ["-c", sql_text, conn]

    env = os.environ.copy()
    if cfg.get("password"):
        env["PGPASSWORD"] = cfg["password"]

    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          env=env, text=True, timeout=timeout)

    if proc.returncode != 0:
        raise RuntimeError(f"psql failed: {proc.stderr.strip()}")

    return proc.stdout

# -------------------------------------------------------------------
# AUTO-DETECT pg_stat_statements column names for all PG versions
# -------------------------------------------------------------------

def detect_pgstat_columns() -> dict:
    """
    Detect correct column names for pg_stat_statements
    depending on PostgreSQL version.
    Works for PG 10–16+.
    """
    q = """
    SELECT column_name
    FROM information_schema.columns
    WHERE table_name = 'pg_stat_statements';
    """

    out = run_psql_sql(q, DB_CFG, csv_mode=True)
    rows = list(csv.DictReader(out.splitlines()))
    cols = {r["column_name"] for r in rows}

    mapping = {}

    # Calls always exists
    mapping["calls"] = "calls"

    # total execution time changed in PG 13+
    if "total_exec_time" in cols:
        mapping["total"] = "total_exec_time"
        mapping["mean"] = "mean_exec_time"
    else:
        mapping["total"] = "total_time"
        mapping["mean"] = "mean_time"

    # query column also changed (query vs queryid sometimes)
    if "query" in cols:
        mapping["query"] = "query"
    elif "queryid" in cols:
        mapping["query"] = "queryid"
    else:
        mapping["query"] = "query"

    return mapping


# ---------------------------
# Query collection
# ---------------------------

def queries_from_pg_statstatements(limit: int = 20) -> List[Tuple[str, int, float]]:
    cols = detect_pgstat_columns()

    q = f"""
    SELECT
        {cols['query']} AS query,
        {cols['calls']}::bigint AS calls,
        {cols['total']}::double precision AS total_time
    FROM pg_stat_statements
    WHERE query NOT ILIKE 'EXPLAIN %'
      AND query NOT ILIKE 'vacuum%'
      AND query NOT ILIKE 'BEGIN%'
    ORDER BY {cols['total']} DESC
    LIMIT {int(limit)};
    """

    out = run_psql_sql(q, DB_CFG, csv_mode=True)
    rows = []
    reader = csv.DictReader(out.splitlines())
    for r in reader:
        rows.append((r["query"], int(r["calls"]), float(r["total_time"])))
    return rows


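def queries_from_file(path: str, limit: int = 100) -> List[Tuple[str, int, float]]:
    try:
        import sqlparse
    except ImportError:
        sqlparse = None

    # Read the whole file; the context manager closes the handle afterwards
    with open(path, "r", encoding="utf-8") as fh:
        text = fh.read()

    if sqlparse:
        statements = sqlparse.split(text)
    else:
        # Fallback: naive split on ';' (breaks on semicolons inside string literals)
        statements = text.split(";")

    # Drop empty fragments and attach placeholder calls/total_time values
    return [(s.strip(), 1, 0.0) for s in statements if s.strip()][:limit]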


# ---------------------------
# EXPLAIN
# ---------------------------

def run_explain_via_psql(query: str, cfg: dict, timeout: int = 60) -> str:
    wrapped = "EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) " + query
    return run_psql_sql(wrapped, cfg, csv_mode=False, timeout=timeout).strip()


# ---------------------------
# Column extraction
# ---------------------------

def extract_columns_from_where_and_joins(query: str) -> List[str]:
    ql = query.lower()
    candidates = set()

    # WHERE extraction
    for m in re.findall(r'\bwhere\b(.*?)(\bgroup|\border|\blimit|$)', ql, flags=re.S):
        part = m[0]
        for c in re.findall(r'([A-Za-z_][A-Za-z0-9_\."]*)\s*(=|>|<|>=|<=|like|\bin\b|\silike)', part):
            col = c[0].split('.')[-1].replace('"', '')
            candidates.add(col)

    # JOIN extraction
    for m in re.findall(r'\bon\b(.*?)(\bwhere\b|\bjoin\b|\bgroup\b|\border\b|\blimit|$)', ql, flags=re.S):
        part = m[0]
        for c in re.findall(r'([A-Za-z_][A-Za-z0-9_\."]*)\s*=', part):
            col = c.split('.')[-1].replace('"', '')
            candidates.add(col)

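    # ORDER BY (skip sort keywords, which the identifier regex also matches)
    sort_keywords = {"asc", "desc", "nulls", "first", "last"}
    for m in re.findall(r'\border by\b(.*?)(\blimit\b|$)', ql, flags=re.S):
        part = m[0]
        for c in re.findall(r'([A-Za-z_][A-Za-z0-9_\."]*)', part):
            col = c.split('.')[-1].replace('"', '')
            if col not in sort_keywords:
                candidates.add(col)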

    return list(candidates)


# ---------------------------
# Index suggestion
# ---------------------------

def sanitize(name: str) -> str:
    return re.sub(r'\W+', '_', name)

def quote_ident(name: str) -> str:
    if re.match(r'^[a-z_][a-z0-9_]*$', name):
        return name
    return '"' + name.replace('"', '""') + '"'

def suggest_indexes(query: str, cols: List[str] = None) -> List[dict]:
    if cols is None:
        cols = extract_columns_from_where_and_joins(query)

    suggestions = []

    # Single-column indexes
    for c in cols:
        idx = f"idx_auto_{sanitize(c)}"
        sql_text = f"CREATE INDEX CONCURRENTLY IF NOT EXISTS {idx} ON <table> ({quote_ident(c)});"
        suggestions.append({
            "name": idx,
            "type": "single",
            "columns": [c],
            "sql": sql_text,
            "reason": f"{c} appears in WHERE/JOIN/ORDER BY"
        })

    # Multi-column index from first two
    if len(cols) >= 2:
        first_two = cols[:2]
        idx = "idx_auto_" + "_".join(sanitize(x) for x in first_two)
        sql_text = (
            f"CREATE INDEX CONCURRENTLY IF NOT EXISTS {idx} ON <table> "
            f"({', '.join(quote_ident(x) for x in first_two)});"
        )
        suggestions.append({
            "name": idx,
            "type": "multi",
            "columns": first_two,
            "sql": sql_text,
            "reason": "multi-column index for combined filters"
        })

    return suggestions


def detect_primary_table(query: str) -> str:
    for kw in ("from", "into", "update"):
        m = re.search(rf'\b{kw}\s+([A-Za-z_][A-Za-z0-9_\."]*)', query, flags=re.I)
        if m:
            t = m.group(1)
            if "." in t:
                t = t.split(".")[-1]
            return t.replace('"', '')
    return None


# ---------------------------
# Analysis
# ---------------------------

def analyze_and_propose(query: str, apply_indexes: bool = False, dry_run: bool = True):
    table = detect_primary_table(query) or "<table>"

    print("=" * 72)
    print("Query:")
    print(query)
    print("-" * 72)

    try:
        explain_text = run_explain_via_psql(query, DB_CFG)
        print("EXPLAIN (before):")
        print(explain_text)
    except Exception as ex:
        print("EXPLAIN failed:", ex)
        explain_text = None

    cols = extract_columns_from_where_and_joins(query)
    print("Detected columns:", cols)

    suggestions = suggest_indexes(query, cols)

    # Replace table placeholder
    for s in suggestions:
        s["sql"] = s["sql"].replace("<table>", quote_ident(table))

    print("\nIndex suggestions:")
    for s in suggestions:
        print("-", s["type"], s["columns"], "->", s["sql"])
        print("  reason:", s["reason"])

    if apply_indexes and not dry_run:
        for s in suggestions:
            print("Applying:", s["name"])
            try:
                run_psql_sql(s["sql"], DB_CFG)
                print("OK:", s["name"])
            except Exception as e:
                print("Failed:", e)

        if explain_text:
            time.sleep(0.7)
            print("\nEXPLAIN (after):")
            print(run_explain_via_psql(query, DB_CFG))


# ---------------------------
# CLI
# ---------------------------

def main():
    p = argparse.ArgumentParser(prog="auto-index-psql")
    p.add_argument("--source", choices=["pgstat", "file"], default="pgstat")
    p.add_argument("--file", help="SQL file if source=file")
    p.add_argument("--limit", type=int, default=10)
    p.add_argument("--apply", action="store_true")
    # Dry run stays the default; pass --no-dry-run to really create indexes (Python 3.9+)
    p.add_argument("--dry-run", action=argparse.BooleanOptionalAction, default=True)
    args = p.parse_args()

    # Check psql availability
    try:
        subprocess.run(["psql", "--version"], stdout=subprocess.DEVNULL,
                       stderr=subprocess.DEVNULL, check=True)
    except Exception:
        print("psql not found on PATH.")
        return

    if args.source == "pgstat":
        rows = queries_from_pg_statstatements(limit=args.limit)
    else:
        if not args.file:
            print("Provide --file when source=file")
            return
        rows = queries_from_file(args.file, limit=args.limit)

    for query, calls, total_time in rows:
        analyze_and_propose(query, apply_indexes=args.apply, dry_run=args.dry_run)


if __name__ == "__main__":
    main()

Pre-requisite for table creation, data insertion and tool execution

Let us create the tables and insert bulk data into them so that the queries take a measurable amount of time to fetch data:

-- sample_data.sql
CREATE TABLE IF NOT EXISTS customers (
  id SERIAL PRIMARY KEY,
  name TEXT
);

CREATE TABLE IF NOT EXISTS orders (
  id SERIAL PRIMARY KEY,
  customer_id INTEGER REFERENCES customers(id),
  status TEXT,
  created_at TIMESTAMP WITHOUT TIME ZONE,
  amount NUMERIC
);

-- populate small amounts for demo. For real testing populate millions.
INSERT INTO customers (name) SELECT 'Customer ' || g FROM generate_series(1,100) g;
INSERT INTO orders (customer_id, status, created_at, amount)
SELECT floor(random()*100)::int + 1,
       (array['new','shipped','canceled','returned'])[ (random()*3 + 1)::int ],
       now() - (random()*10000)::int * interval '1 second',
       random()*1000
FROM generate_series(1, 50000);

Once the above script is run, it will insert 100 rows into the customers table and 50,000 rows into the orders table.

Next, we will run the following query a couple of times in a PostgreSQL query tool (pgAdmin works well here) so that it accumulates execution time and shows up in pg_stat_statements when the tool is run.

SELECT id, customer_id, status, amount
FROM orders
WHERE customer_id = 42 AND status = 'shipped'
ORDER BY created_at DESC
LIMIT 50;

Next, we will run the auto_indexer.py script to see whether it suggests the index this query needs. Run the following command from a terminal opened in the folder where you saved the Python script.

Once the script is run successfully, you should see the details of the query that was run along with the EXPLAIN of the query before and after the index creation as follows:

The output lists the index suggestions for the query that was run. Creating those indexes lets the query execute faster, and the benefit grows once the database is loaded with production-sized data.

Rules, Heuristics used, and Their Limitations

The basic index design rules used in our utility tool are:

  1. Single column index: helpful when that column is frequently used in equality filters or joins.
  2. Multi-column index: when queries filter on more than one column together; leftmost column(s) matter. If queries often filter by customer_id and then status, then (customer_id, status) helps.
  3. Order By: if your query does WHERE customer_id = X ORDER BY created_at DESC LIMIT N, an index on (customer_id, created_at DESC) avoids sorting and is ideal.
  4. Creating index concurrently: builds index without locking writes (but it is slower). Use CONCURRENTLY in production environment.
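Rule 3 can be sketched in a few lines of Python. This is only an illustration of the rule, not part of the tool above: the helper name index_for_filter_and_sort is hypothetical, and it assumes the caller already knows which columns are equality filters and which column is the sort key.

```python
def index_for_filter_and_sort(table, equality_cols, order_col, descending=False):
    """Build a CREATE INDEX statement: equality columns first, sort column last."""
    sort_part = f"{order_col} DESC" if descending else order_col
    index_cols = list(equality_cols) + [sort_part]
    name = "idx_auto_" + "_".join(list(equality_cols) + [order_col])
    return (
        f"CREATE INDEX CONCURRENTLY IF NOT EXISTS {name} "
        f"ON {table} ({', '.join(index_cols)});"
    )


# WHERE customer_id = X ORDER BY created_at DESC LIMIT N
ddl = index_for_filter_and_sort("orders", ["customer_id"], "created_at", descending=True)
print(ddl)
```

Putting the equality column first lets PostgreSQL jump to the rows for one customer and then read them already sorted by created_at.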

Limitations of these heuristics:

  1. The script uses simple regex extraction to find candidate columns — it can miss columns or misidentify them for complex SQL (subqueries, functions, expressions).
  2. It does not analyze selectivity (how many distinct values) or table size when proposing index order; in practice you should choose the most selective column first.
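The second limitation could be reduced by ordering the columns by selectivity before building a multi-column index. The sketch below is a minimal illustration under stated assumptions: the function names are hypothetical, and the distinct-value counts are passed in by the caller (they are made-up numbers here, chosen to match the sample data; in practice they might be read from the n_distinct column of the pg_stats view).

```python
def order_by_selectivity(columns, n_distinct):
    """Return columns sorted most-selective first (highest distinct count first)."""
    return sorted(columns, key=lambda c: n_distinct.get(c, 0), reverse=True)


def composite_index_sql(table, columns, n_distinct):
    """Build a multi-column CREATE INDEX statement with selectivity-based order."""
    ordered = order_by_selectivity(columns, n_distinct)
    name = "idx_auto_" + "_".join(ordered)
    return (
        f"CREATE INDEX CONCURRENTLY IF NOT EXISTS {name} "
        f"ON {table} ({', '.join(ordered)});"
    )


# Assumed distinct counts: 100 customers, 4 status values
stats = {"customer_id": 100, "status": 4}
sql = composite_index_sql("orders", ["status", "customer_id"], stats)
print(sql)
```

With these counts, customer_id is placed before status because it narrows the result set far more.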

Conclusion

We have seen how a utility tool (the Python script here) can read the most time-consuming queries recorded in our database, parse them, and suggest relevant indexes on the tables involved so that those queries run more efficiently. Because the suggestions come from the real workload, this improves performance and helps us avoid creating redundant indexes on the database.

The post Python in Action to Auto-Generate an Optimized PostgreSQL Index Strategy appeared first on SQLServerCentral.

]]>
How to Build an AI-Powered T-SQL Assistant with Python & SQL Server https://www.sqlservercentral.com/articles/how-to-build-an-ai-powered-t-sql-assistant-with-python-sql-server Wed, 19 Nov 2025 00:00:12 +0000 https://www.sqlservercentral.com/?post_type=ssc_article&p=4680934 If you’re a SQL Server DBA or developer looking to harness AI for your everyday scripting workflows, this article will walk you through building an AI-powered T-SQL assistant using Python and SQL Server.

The post How to Build an AI-Powered T-SQL Assistant with Python & SQL Server appeared first on SQLServerCentral.

]]>
The DBScan algorithm tutorial https://www.sqlservercentral.com/articles/the-dbscan-algorithm-tutorial Mon, 17 Nov 2025 00:00:29 +0000 https://www.sqlservercentral.com/?post_type=ssc_article&p=4676438 Learn to use the dbscan algorithm in Python.

The post The DBScan algorithm tutorial appeared first on SQLServerCentral.

]]>
Introduction to the DBScan algorithm

In this example, we are going to learn how to use DBScan, a clustering algorithm used to find patterns in data. We will write Python code that connects to SQL Server, reads customer data, and groups the customers into clusters.

Requirements for the DBScan

  • SQL Server and SSMS installed.
  • Visual Studio Code or another Python code editor of your preference.
  • The AdventureworksDW2022 database installed.
  • PyODBC library (pip install pyodbc)
  • Pandas library (pip install pandas)
  • Scikit-learn library (pip install scikit-learn)
  • Matplotlib library (pip install matplotlib)

What is DBScan?

DBScan (Density-Based Spatial Clustering of Applications with Noise) is a density-based clustering algorithm. It creates groups from points that are closely packed together. The algorithm takes two parameters and classifies every point as one of three types:

  • ε (epsilon) is the neighborhood radius. Two points are neighbors when the distance between them is at most ε.
  • MinPts is the minimum number of points that must lie within ε of a point for that point to belong to a dense region.
  • Core points are points that have at least MinPts neighbors within ε.
  • Border points lie within ε of a core point but have fewer than MinPts neighbors themselves.
  • Noise points are neither core nor border points, so they are excluded from all clusters.
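The three point types can be illustrated with a small pure-Python sketch. It is not how scikit-learn implements DBSCAN; it only applies the definitions above to a handful of made-up 2D points. The function name classify_points is hypothetical, and the neighborhood counts the point itself, which is the convention scikit-learn uses for min_samples.

```python
from math import dist


def classify_points(points, eps, min_pts):
    """Label each point 'core', 'border', or 'noise' using the DBSCAN definitions."""
    # Neighborhood of each point: all points within eps, including the point itself
    neighbors = [
        [j for j, q in enumerate(points) if dist(p, q) <= eps]
        for p in points
    ]
    core = {i for i, n in enumerate(neighbors) if len(n) >= min_pts}
    labels = []
    for i, n in enumerate(neighbors):
        if i in core:
            labels.append("core")
        elif any(j in core for j in n):
            labels.append("border")   # near a core point, but not dense itself
        else:
            labels.append("noise")
    return labels


# Made-up points: a unit square, one point hanging off it, and one far away
points = [(0, 0), (0, 1), (1, 0), (1, 1), (2, 1), (10, 10)]
labels = classify_points(points, eps=1.0, min_pts=3)
print(labels)
```

The four corners of the square are core points, (2, 1) is a border point because its only neighbor is the core point (1, 1), and (10, 10) is noise.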

DBSCAN Core points and minPts

Getting started with DBScan

First, we will analyze the vTargetMail view of the AdventureworksDW2022 database. This view contains information about customers and potential customers, such as Title, MaritalStatus, BirthDate, and Gender. The BikeBuyer column shows 1 if the customer bought a bike and 0 if not.

Here you have a sample of the data:

Sample of the data

With the DBScan algorithm, we will group the customer information in DBScan clusters using Python.

The Python code

Here is the code. We will show the entire code first, and then we will explain each section.

First, in Visual Studio Code or any other Python code editor, write the following code:

import pyodbc
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
import matplotlib.pyplot as plt

# Connection to SQL Server (Windows Authentication)
conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=localhost;'
    'DATABASE=AdventureWorksDW2022;'
    'Trusted_Connection=yes;'
)

# Query relevant data
query = """
    SELECT
        [Age],
        [YearlyIncome],
        [TotalChildren],
        [BikeBuyer]
    FROM [dbo].[vTargetMail]
    WHERE [BikeBuyer] IS NOT NULL;
"""

df = pd.read_sql(query, conn)
conn.close()

# Inspect the data
print("First rows:")
print(df.head())

# Select the predictive characteristics
features = ['Age', 'YearlyIncome', 'TotalChildren']
X = df[features]

# Standardize the data (important for DBSCAN)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Apply DBSCAN
eps_value = 0.6
min_samples_value = 5
dbscan = DBSCAN(eps=eps_value, min_samples=min_samples_value)
df['Cluster'] = dbscan.fit_predict(X_scaled)

# Clusters summary and BikeBuyer proportions
cluster_summary = df.groupby('Cluster')['BikeBuyer'].value_counts(normalize=True).unstack().fillna(0)
cluster_summary = cluster_summary.rename(columns={0: 'NoBuyer', 1: 'Buyer'})
print("\nProportion of BikeBuyer per Cluster:")
print(cluster_summary)

# Visualize clusters
plt.figure(figsize=(10, 6))
plt.scatter(df['Age'], df['YearlyIncome'], c=df['Cluster'], cmap='viridis', alpha=0.7)
plt.xlabel('Age')
plt.ylabel('YearlyIncome')
plt.title('Clusters of users - DBSCAN')
plt.colorbar(label='Cluster ID')
plt.show()

# Extra: quick summary per cluster
for cluster_id, group in df.groupby('Cluster'):
    buyer_pct = group['BikeBuyer'].mean() * 100
    print(f"Cluster {cluster_id}: {len(group)} users, {buyer_pct:.2f}% bought a bike")

In the next section, we will explain the code.

Libraries used

First, to connect to SQL Server from Python, you need the pyodbc library. See the requirements section if you have not installed it yet. We also use the Pandas library, which reads the result of the SQL query into a DataFrame.

In addition, StandardScaler from scikit-learn standardizes the data. Without it, the yearly income, whose values are much larger than those of the other features, would dominate the distance calculations. DBSCAN, also from scikit-learn, implements the DBSCAN algorithm and is the most important import. Finally, matplotlib.pyplot is used to create the charts.

import pyodbc
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
import matplotlib.pyplot as plt

Connection to the SQL Server database

The second part of the code connects to SQL Server. DRIVER specifies the ODBC driver used for the connection, SERVER is the SQL Server name, and DATABASE is the database name. Trusted_Connection=yes tells the driver to use Windows Authentication.

Finally, we have a query to get the data from the vTargetMail view.

# Connection to SQL Server (Windows Authentication)
conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=localhost;'
    'DATABASE=AdventureWorksDW2022;'
    'Trusted_Connection=yes;'
)

# Query relevant data
query = """
    SELECT
        [Age],
        [YearlyIncome],
        [TotalChildren],
        [BikeBuyer]
    FROM [dbo].[vTargetMail]
    WHERE [BikeBuyer] IS NOT NULL;
"""

Read and standardize the data for the DBScan

In this section, we read the data returned by the query into a DataFrame and close the connection.

df = pd.read_sql(query, conn)
conn.close()

Secondly, we check a sample of the data.

# Inspect the data
print("First rows:")
print(df.head())

Thirdly, we select the input variables used in the model. X contains the DataFrame with these values.

# Select the predictive characteristics
features = ['Age', 'YearlyIncome', 'TotalChildren']
X = df[features]

Also, we standardize the features. Standardization rescales each feature to a mean of 0 and a standard deviation of 1, which puts the features on comparable scales. Otherwise a feature with large values, such as YearlyIncome, would outweigh a feature with small values, such as TotalChildren, in the distance calculations.

# Standardize the data (important for DBSCAN)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
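To see what standardization does to the numbers, here is a minimal pure-Python sketch of the same z-score calculation. The helper name standardize and the sample values are made up for illustration; like StandardScaler, it divides by the population standard deviation.

```python
from statistics import fmean, pstdev


def standardize(values):
    """Rescale values to mean 0 and standard deviation 1 (z-scores)."""
    mean = fmean(values)
    std = pstdev(values)  # population standard deviation
    return [(v - mean) / std for v in values]


incomes = [20000, 60000, 100000]  # made-up yearly incomes
children = [0, 2, 4]              # made-up children counts

z_incomes = standardize(incomes)
z_children = standardize(children)
print(z_incomes)
print(z_children)
```

Although the raw incomes are thousands of times larger than the children counts, both lists end up with the same standardized values, so neither feature dominates the distance between two customers.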

Apply the DBSCAN

Now, we are going to run the DBScan algorithm. It needs two values: eps_value and min_samples_value (MinPts). As we explained in the What is DBScan section, epsilon (eps_value) is the neighborhood radius. It controls how far the algorithm looks to find neighboring points.

min_samples controls how many points (in scikit-learn, counting the point itself) must lie within that radius for a point to be a core point. Finally, fit_predict returns a cluster ID for each row and labels noise points as -1.

# Apply DBSCAN
eps_value = 0.6  # tune this value, for example with a k-distance plot
min_samples_value = 5
dbscan = DBSCAN(eps=eps_value, min_samples=min_samples_value)
df['Cluster'] = dbscan.fit_predict(X_scaled)
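A common way to pick eps is to look at k-distances: for every point, the distance to its k-th nearest neighbor, sorted in ascending order. The sketch below computes them in pure Python for a few made-up points; the function name k_distances is hypothetical, and on real data you would run it on the standardized features and plot the result.

```python
from math import dist


def k_distances(points, k):
    """Return the sorted distance from each point to its k-th nearest neighbor."""
    result = []
    for i, p in enumerate(points):
        others = sorted(dist(p, q) for j, q in enumerate(points) if j != i)
        result.append(others[k - 1])
    return sorted(result)


# Made-up points: a tight square plus one isolated point
points = [(0, 0), (0, 1), (1, 0), (1, 1), (8, 8)]
kd = k_distances(points, k=2)
print(kd)
```

The first four values are small and equal, and the last one jumps sharply. A value of eps just below that jump keeps the dense points together and leaves the isolated point as noise.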

Cluster Summary for DBScan

In this section, we determine what share of each cluster are bike buyers. First, we group the rows by cluster and compute the proportion of each BikeBuyer value. Noise points are not removed; they appear as their own group with the cluster ID -1.

# Clusters summary and BikeBuyer proportions
cluster_summary = df.groupby('Cluster')['BikeBuyer'].value_counts(normalize=True).unstack().fillna(0)

Secondly, we rename the columns 0 and 1 to NoBuyer and Buyer.

cluster_summary = cluster_summary.rename(columns={0: 'NoBuyer', 1: 'Buyer'})

Then we display the proportion per cluster.

print("\nProportion of BikeBuyer per Cluster:")
print(cluster_summary)
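The chained pandas call is compact, so it may help to see the same calculation spelled out without pandas. The sketch below is only an illustration: the function name buyer_proportions and the cluster labels are made up.

```python
from collections import Counter, defaultdict


def buyer_proportions(clusters, bike_buyer):
    """Return {cluster_id: {'NoBuyer': share, 'Buyer': share}}."""
    counts = defaultdict(Counter)
    for cluster_id, flag in zip(clusters, bike_buyer):
        counts[cluster_id][flag] += 1
    summary = {}
    for cluster_id, counter in sorted(counts.items()):
        total = sum(counter.values())
        # A missing value counts as 0, like fillna(0) in the pandas version
        summary[cluster_id] = {
            "NoBuyer": counter[0] / total,
            "Buyer": counter[1] / total,
        }
    return summary


clusters = [-1, 0, 0, 0, 0, 1, 1]  # made-up labels; -1 is noise
buyers = [1, 1, 0, 0, 0, 1, 1]     # made-up BikeBuyer flags
summary = buyer_proportions(clusters, buyers)
print(summary)
```

Each row of the result sums to 1, and the noise group (-1) is reported like any other cluster.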

Visualize the DBScan clusters

Finally, we will create a scatter chart with the DBScan cluster information. plt.scatter plots Age on the x-axis and YearlyIncome on the y-axis for every row in the DataFrame, and colors each point by its cluster ID using the viridis colormap. alpha=0.7 means that the points will be 70 percent opaque and 30 percent transparent. xlabel and ylabel name the axes, and colorbar adds a legend for the cluster IDs.

# Visualize clusters
plt.figure(figsize=(10, 6))
plt.scatter(df['Age'], df['YearlyIncome'], c=df['Cluster'], cmap='viridis', alpha=0.7)
plt.xlabel('Age')
plt.ylabel('YearlyIncome')
plt.title('Clusters of users - DBSCAN')
plt.colorbar(label='Cluster ID')
plt.show()

Extra summary per DBScan cluster

Also, this summary shows the size of each cluster and the percentage of bike buyers in it. Because BikeBuyer is either 0 or 1, its mean is the fraction of buyers.

# Extra: quick summary per cluster
for cluster_id, group in df.groupby('Cluster'):
    buyer_pct = group['BikeBuyer'].mean() * 100
    print(f"Cluster {cluster_id}: {len(group)} users, {buyer_pct:.2f}% bought a bike")

Running the DBScan code

Run the code and you will see the results. First, you will see a sample of the data.

Secondly, you will have the clusters and the proportion of buyers and non-buyers per cluster.

Proportion of bike buyers with DBScan

In addition, you will see the scatter chart of Yearly Income against Age. Clusters 12 to 14 contain people with a high income of more than 140,000 USD. There is also a small group of points with an age of around 100 years and a low income below 2,000 USD.

Clusters of users - DBSCAN chart

As you can see, the clusters in this example overlap, so the separation between them is not clear-cut.

Finally, we have some statistics of people who bought a bike per cluster.

Conclusion

DBScan is a clustering algorithm that groups data to find patterns and marks the points that fit no group as noise, which makes it useful for detecting anomalies.


The post The DBScan algorithm tutorial appeared first on SQLServerCentral.

]]> Implementing PostgreSQL with Python for Real-Time Logging and Monitoring https://www.sqlservercentral.com/articles/implementing-postgresql-with-python-for-real-time-logging-and-monitoring Wed, 12 Nov 2025 00:00:31 +0000 https://www.sqlservercentral.com/?post_type=ssc_article&p=4664306 Learn how you can create a logging module in Python that can be used to insert real-time records in a PostgreSQL database and display them on a dashboard.

The post Implementing PostgreSQL with Python for Real-Time Logging and Monitoring appeared first on SQLServerCentral.

]]>
Overview

Real-time logging and monitoring are essential for modern applications to track events, detect issues, and gain insights into system behavior. By integrating PostgreSQL with Python, we can build a powerful and efficient logging system that stores, queries, and monitors logs in real-time. In this article, we will walk through how we can achieve real-time logging into PostgreSQL database using Python and how we can monitor the logs in real-time using a 3rd party application.

Architecture

Before we dive into further details, let us understand the simple architecture used in this article. A Python application will handle log collection and ingestion, while PostgreSQL will act as our central log database, recording all the logs in a table in real time. To monitor the logs, we will use Grafana as the third-party application. You can also use Metabase.

Initial Requirements

To start, we will need the following available on our local machine:

  • Python (version 3.10 or later, because the example script uses the match statement)
  • PostgreSQL (on local or cloud instance)
  • A few necessary Python packages

The necessary Python packages are the following: psycopg2-binary and loguru. To install these, open a command prompt (or similar) and run the following command:

pip install psycopg2-binary loguru

This will download all the necessary dependencies and install them on our local system.

Step 1: Creating a log table in PostgreSQL database

First, we will create a log table in our local PostgreSQL database. This table will contain the information for every log statement in our application (which will be developed soon). To keep things simple, we will create the table manually with a CREATE TABLE command. In real-world applications, a DDL script can run at application startup and create the table if it does not exist.

Run the following command in PostgreSQL database to create the table:

CREATE TABLE logs (
    id SERIAL PRIMARY KEY,
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    level VARCHAR(10),          -- e.g., INFO, ERROR, DEBUG
    service VARCHAR(50),        -- microservice or component name
    message TEXT,
    context JSONB               -- for structured logging
);

This will create a table. Let us see in brief what data it stores:

  • id - an ID that is generated automatically, in sequence, when a row is inserted into the table.
  • timestamp - the exact timestamp when the record is inserted into the table.
  • level - the log level. Typically: INFO, WARN, DEBUG, TRACE, ERROR, etc.
  • service - the name of the service. Assume the application flow passes through multiple files and each file has loggers implemented. Each log entry then carries its own service name, which helps us understand which file was responsible for it.
  • message - any custom message that we want to record. This is the actual log message.
  • context - information about the log entry in JSON format, which can later be queried and shown in a UI (if needed).

Step 2: Indexing on the query columns

We should create some indices on columns which will be frequently used in our where clause to query the log table and collect the logs for further processing and reporting. We will run the following queries in our PostgreSQL database to create the indices:

CREATE INDEX idx_logs_timestamp ON logs (timestamp DESC);
CREATE INDEX idx_logs_level ON logs (level);
CREATE INDEX idx_logs_service ON logs (service);
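To illustrate the kind of query these indexes are meant to serve, here is a small sketch that builds a parameterized SELECT for the newest log rows. The function name build_log_query is hypothetical and not part of the article's scripts; it only assembles the SQL text and parameters (in the %s placeholder style that psycopg2 uses) and does not touch the database.

```python
def build_log_query(level=None, service=None, limit=50):
    """Return (sql, params) for fetching the newest rows from the logs table."""
    clauses, params = [], []
    if level is not None:
        clauses.append("level = %s")      # filter served by idx_logs_level
        params.append(level)
    if service is not None:
        clauses.append("service = %s")    # filter served by idx_logs_service
        params.append(service)
    where = f" WHERE {' AND '.join(clauses)}" if clauses else ""
    sql = (
        "SELECT id, timestamp, level, service, message FROM logs"
        f"{where} ORDER BY timestamp DESC LIMIT %s"
    )
    params.append(limit)
    return sql, tuple(params)


sql, params = build_log_query(level="ERROR", limit=20)
print(sql)
print(params)
```

The resulting pair can be passed to cursor.execute(sql, params). Keeping the values in params rather than formatting them into the string avoids SQL injection.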

Step 3: Python utility code to connect to PostgreSQL database and insert log record

Now, we will create a utility python script that will establish a connection to our local PostgreSQL database and also contain a utility function that will be used from other python scripts to record log entries into the database.

The code does the following. First, it imports the dependencies needed to connect to the PostgreSQL database. It also imports datetime, which supplies the current time when the caller does not pass a timestamp.

Next, we create a database configuration which will contain the details like database name, host, port, username and password. This will later be used to establish the connection to database.

Next, we are defining a function, called insert_log, which will be invoked later from other python modules to insert log into logs table in the PostgreSQL database. It takes the necessary table column entries as parameters (which will be passed from subsequent invocation modules later). In this method:

Using the DB_CONFIG we are creating a connection. The config is stored in the DB_CONFIG variable and used in this code:

   conn = psycopg2.connect(**DB_CONFIG)

Next, we are creating a cursor out of this established connection. Using this cursor, we are executing an insert statement to make an entry into the logs table in our PostgreSQL database. Next, we are committing the transaction and closing the cursor.

        cur = conn.cursor()
        cur.execute("""
            INSERT INTO logs (timestamp, level, service, message, context)
            VALUES (%s, %s, %s, %s, %s)
        """, (
            timestamp or datetime.now().astimezone(),  # timezone-aware, safe for TIMESTAMPTZ
            level,
            service,
            message,
            Json(context or {})
        ))
        conn.commit()
        cur.close()

We are also letting the user know that log insertion was successfully completed. In case of any exception, we are printing to the console that log insertion operation failed. Finally, we are closing the connection (irrespective of successful or failed operation).

Following is the code snippet, which we save as db_logger.py:

import psycopg2
from psycopg2.extras import Json
from datetime import datetime

DB_CONFIG = {
    'dbname': 'testdb',
    'user': 'testuser',
    'password': 'testpass',
    'host': 'localhost',
    'port': 5432,
}


def insert_log(level, service, message, context=None, timestamp=None):
    """
    Inserts a log entry into the logs table.

    Args:
        level (str): Log level (e.g., INFO, ERROR).
        service (str): Name of the service or component.
        message (str): Log message.
        context (dict): Optional structured context.
        timestamp (datetime): Optional custom timestamp.
    """
    conn = None
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        cur = conn.cursor()
        cur.execute("""
            INSERT INTO logs (timestamp, level, service, message, context)
            VALUES (%s, %s, %s, %s, %s)
        """, (
            timestamp or datetime.now().astimezone(),  # timezone-aware, safe for TIMESTAMPTZ
            level,
            service,
            message,
            Json(context or {})
        ))
        conn.commit()
        cur.close()
        print(f"Log inserted: {level} - {message}")
    except Exception as e:
        print(f"Failed to insert log: {e}")
    finally:
        if conn:
            conn.close()

Note that this file does nothing when run on its own. It is a utility module that other Python modules import whenever they need to write a log entry.

Step 4: Python module code to insert a log record

Now, we will create a Python script (custom_logger_handler.py) that emits a few log entries and writes them to the PostgreSQL database. In a real application, different Python modules (or service components) would import the above utility file and reuse the insert_log function for each logging action. To keep things simple and easy to understand, we will instead create a single script that logs with different logging levels and service names for different numeric inputs.

To understand it better, let's see the below explanation:

  • The Python script will take one input parameter, which must be numeric.
  • The numeric input range will vary from 1 through 5, each for different logging level and service name.
  • As an example, if we pass 1 as a parameter, it will log at logging level INFO with service name as auth-service (assume that this means the logger was invoked from an auth-service class at INFO level).
  • Similarly, for example, if we pass 2 as a parameter, it will log at logging level ERROR with service name as payment-service (assume that this means the logger was invoked from a payment-service module for some error it observed during processing a payment).

The code does the following: First, it imports sys, which we use to read the argument(s) provided by the invoker. It also imports insert_log from the db_logger utility file (as we said, we will reuse that function to log events). Lastly, we import logging, since we will attach a custom handler to the standard logger so that its output reaches the PostgreSQL database.

Next, we define a class, called DBLogHandler, which is the game changer. By default, calling a function on a logger (say logger.error) sends the message to the console or, when so configured, to a log file. But we need the log to be stored in our database. DBLogHandler inherits from logging.Handler and overrides the emit method, which is invoked every time a log record passes through the handler.

# --- Custom Logging Handler ---
class DBLogHandler(logging.Handler):
    def emit(self, record):
        log_entry = self.format(record)
        insert_log(
            level=record.levelname,
            message=log_entry,
            service=record.module,
            context={
                'filename': record.filename,
                'funcName': record.funcName,
                'lineno': record.lineno,
                'args': record.args
            }
        )

This code formats the log message (based on a formatter that we define later) with this line: log_entry = self.format(record).

The insert_log function is then called with the necessary information:

      • level - DEBUG, INFO, WARNING, etc.
      • message - the formatted log string with the log message.
      • service - the name of the module where the log call happened (taken from record.module, which is the Python file name without the .py extension).
      • context - details like filename, function name, line number, and any arguments passed in the original log message.
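To watch the emit mechanism work without a database, the sketch below swaps the insert for an in-memory list. ListHandler and the logger name handler_demo are hypothetical stand-ins for illustration; the logging calls themselves are standard library APIs.

```python
import logging


class ListHandler(logging.Handler):
    """Stand-in for DBLogHandler: collects rows in memory instead of inserting them."""

    def __init__(self):
        super().__init__()
        self.rows = []

    def emit(self, record):
        # Called once for every record that reaches this handler
        self.rows.append({
            "level": record.levelname,
            "service": record.module,
            "message": self.format(record),
        })


demo_logger = logging.getLogger("handler_demo")
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False  # keep the demo output away from the root logger

handler = ListHandler()
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
demo_logger.addHandler(handler)

demo_logger.info("user %s logged in", "alice")
demo_logger.error("payment failed")
print(handler.rows)
```

Each logger call produces exactly one entry in handler.rows, which is the point where DBLogHandler calls insert_log instead. One thing to keep in mind for the real handler: record.args can hold arbitrary objects, so the context may need converting to strings before it is stored as JSON.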

Finally, we get or create a logger named DBLogger and set the minimum log level to DEBUG (i.e., it will capture all log levels from DEBUG and above). The code creates an instance of the DBLogHandler class defined above. Then we define a log formatter that formats messages to look like this: 2025-10-03 12:00:00,000 - INFO - Something happened. This happens in this line:

formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

Now, we apply this formatter to db_handler so that every record is formatted this way. Finally, we attach db_handler to our logger by adding it as a handler.

Now that our log handler is ready and writes logs into our database, we will define a custom function, called log_by_input, which takes the numeric input (provided by the user at runtime) as a choice and does the following:

    • First, it populates a dictionary of log messages. This dictionary maps each numeric choice to the log message for that choice.
    • if choice in logs: - if the input choice exists in the dictionary keys (1–5), then:
      • Get the log message for that choice from the dictionary above.
      • Next, we match on the input choice (similar to switch-case) and call the corresponding logger function with that message. For example, if the input choice is 1, we invoke logger.info("This is an INFO log").
      • Since our logger already has DBLogHandler attached, this call ends up storing the record in the database.

Below is the full code snippet for the python script:

import logging
import sys
from db_logger import insert_log

# --- Custom Logging Handler ---
class DBLogHandler(logging.Handler):
    def emit(self, record):
        log_entry = self.format(record)
        insert_log(
            level=record.levelname,
            message=log_entry,
            service=record.module,
            context={
                'filename': record.filename,
                'funcName': record.funcName,
                'lineno': record.lineno,
                'args': record.args
            }
        )

# --- Configure Logging ---
logger = logging.getLogger('DBLogger')
logger.setLevel(logging.DEBUG)

# Add our DBLogHandler
db_handler = DBLogHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
db_handler.setFormatter(formatter)
logger.addHandler(db_handler)

def log_by_input(choice: int):
    """
    Inserts a log entry based on the input number.
    """
    # Map each choice to a plain string (braces around a single
    # string would create a set, not a message)
    logs = {
        1: "This is an INFO log",
        2: "This is an ERROR log",
        3: "This is a WARNING log",
        4: "This is a DEBUG log",
        5: "This is a CRITICAL log",
    }

    if choice in logs:
        log_data = logs[choice]
        match choice:
            case 1:
                logger.info(log_data)
            case 2:
                logger.error(log_data)
            case 3:
                logger.warning(log_data)
            case 4:
                logger.debug(log_data)
            case 5:
                logger.critical(log_data)
            case _:
                print("Number not recognized")
    else:
        print(f"Invalid input: {choice}. Please use a number from 1 to {len(logs)}.")


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python custom_logger_handler.py <log_number>")
        print("Example: python custom_logger_handler.py 1")
    else:
        try:
            input_number = int(sys.argv[1])
            log_by_input(input_number)
        except ValueError:
            print("Please provide a valid integer input.")

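Note that a real-world application would not select the log level with a switch-case like this; each module would simply call logger.info, logger.error, and so on at the point where the event happens. We use the match statement here (which requires Python 3.10 or later) only to exercise every log level from a single Python module.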
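If a numeric dispatch like this were ever needed, one possible alternative to the match block is a lookup table combined with logger.log, which takes the level as its first argument. This is only a sketch: CHOICES, log_by_choice, and the small capturing handler are hypothetical names that do not appear in the script above.

```python
import logging

# Hypothetical mapping from the numeric choice to a (level, message) pair
CHOICES = {
    1: (logging.INFO, "This is an INFO log"),
    2: (logging.ERROR, "This is an ERROR log"),
    3: (logging.WARNING, "This is a WARNING log"),
    4: (logging.DEBUG, "This is a DEBUG log"),
    5: (logging.CRITICAL, "This is a CRITICAL log"),
}


def log_by_choice(logger: logging.Logger, choice: int) -> bool:
    """Log the message for `choice`; return False if the choice is unknown."""
    entry = CHOICES.get(choice)
    if entry is None:
        return False
    level, message = entry
    logger.log(level, message)  # one call covers every level
    return True


class CaptureHandler(logging.Handler):
    """Collects (level name, message) pairs so the demo needs no database."""

    def __init__(self):
        super().__init__()
        self.seen = []

    def emit(self, record):
        self.seen.append((record.levelname, record.getMessage()))


demo = logging.getLogger("ChoiceDemo")
demo.setLevel(logging.DEBUG)
demo.propagate = False
capture = CaptureHandler()
demo.addHandler(capture)

log_by_choice(demo, 2)   # known choice: logged at ERROR level
log_by_choice(demo, 9)   # unknown choice: nothing is logged
print(capture.seen)      # [('ERROR', 'This is an ERROR log')]
```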

Step 5: Executing the python module

Finally, we are all set to check our log messages being written into the PostgreSQL database. To execute the above file, we would need to run the following command from the terminal (opened from the folder containing the python scripts we created):

python custom_logger_handler.py 1

Once the above command is run, upon successful log operation, we should see the following message in our terminal:

Now, let's head over to pgAdmin and check the logs table. We can see a row inserted below:

Now, we will run the same command several more times with different parameters ranging from 1 to 5 to create multiple log entries with different log levels (just as would happen in a real application). After that, when we query the database, we can see many records like this:

As we can see, a total of 128 records were created. Every time a logger function is invoked from a Python module, the log message is written to the database.

Step 6: Monitoring logs in real-time using Grafana

Next, we will see how to monitor logs using a third-party application such as Grafana. We first need to install Grafana either locally or on Docker. In this article, we will run Grafana on Docker (or Rancher). To do so, run the following command:

docker run -d -p 3000:3000 --name=grafana grafana/grafana

This command pulls the Grafana image, starts a container named grafana in the background, and publishes it on port 3000.

Once the container is running, we can head over to localhost:3000. The default credentials are admin for both the username and the password. Once we log in, we can see the Grafana homepage.

Setting up PostgreSQL database on Grafana

Next, we will need to link our local PostgreSQL database into Grafana to query the database tables and monitor in real time. To do so, we have to head over to Connections -> Add new connection. In the search box, type in postgres to search for the plugin as shown below:

After clicking on the PostgreSQL tile, click on the Add new data source button on the right. This opens a page for the connection details. Fill them in as follows:

  • Host URL: host.docker.internal:5432 (since we are running Grafana on Docker; if Grafana is installed locally, use localhost:5432)
  • Database name: testdb (use your database name)
  • Username: postgres (use your username)
  • Password: ******** (use your password)
  • TLS/SSL Mode: disabled

Click on the Save & test button to check your data source connection. If successful, we should see the following message:

Creating a dashboard for monitoring

Next, with our data source configured as the locally installed PostgreSQL database, we will build a dashboard to monitor logs in real time. To do so, head over to Dashboards from the left-hand menu bar and click on Create dashboard. Click on Add Visualization and select the newly created data source. The following screen opens up:

Now, switch the query editor from Builder to Code, as shown in the image below:

This will open the query editor to put in our query directly. We will use the following query to monitor our logs in real time:

SELECT level, count(*) as number_of_errors FROM logs
GROUP BY level

Paste this query into the query editor and click on the Run Query button. You should see the following screen:

Click on Open Visualization Suggestions and we will see several suggested visualizations, as follows:

We can click on whichever one suits our requirements. After that, clicking the refresh button picks up any newly generated log entries and reflects them in the panel.
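To get a feel for the rows the panel query returns, the sketch below runs the same GROUP BY statement against an in-memory SQLite table. SQLite is only a stand-in so the example runs without a server, and the two-column logs table and its sample rows are assumptions made for this illustration, not the article's real schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the PostgreSQL database
conn.execute("CREATE TABLE logs (level TEXT, message TEXT)")  # assumed, simplified schema
conn.executemany(
    "INSERT INTO logs (level, message) VALUES (?, ?)",
    [
        ("INFO", "started"),
        ("ERROR", "failed"),
        ("INFO", "finished"),
        ("WARNING", "slow"),
    ],
)

# Same statement as the Grafana panel query
rows = conn.execute(
    "SELECT level, count(*) AS number_of_errors FROM logs GROUP BY level"
).fetchall()

counts = dict(rows)
print(counts)
conn.close()
```

Note that, despite its alias, number_of_errors holds the number of rows for every level, not only for ERROR, which is what lets the panel compare levels side by side.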

Conclusion

We have learnt how to customize the Python logger so that every logging call writes an entry to a database table, which lets us store logs in the database and monitor them when needed. When the logger is invoked from different Python modules in a real application, the service name changes accordingly, which makes the approach even more useful: new modules being rolled out to clients can be checked individually. We also learnt how to use Grafana to monitor database entries with simple queries on a dashboard, which can help non-technical support teams monitor the application and report any unexpected error spikes.

The post Implementing PostgreSQL with Python for Real-Time Logging and Monitoring appeared first on SQLServerCentral.

]]>
Getting Started with Bayesian Modeling https://www.sqlservercentral.com/articles/getting-started-with-bayesian-modeling Fri, 19 Sep 2025 00:00:51 +0000 https://www.sqlservercentral.com/?post_type=ssc_article&p=4639399 Multivariate analysis in data science is a type of analysis that tackles multiple input/predictor and output/predicted variables. This tip explores the problem of predicting air pollution measured in particulate matter (PM) concentration based on ambient temperature, humidity, and pressure using a Bayesian Model.

The post Getting Started with Bayesian Modeling appeared first on SQLServerCentral.

]]>
Using the FP-Growth Algorithm to Mine Useful Patterns in Data https://www.sqlservercentral.com/articles/using-the-fp-growth-algorithm-to-mine-useful-patterns-in-data Mon, 08 Sep 2025 00:00:01 +0000 https://www.sqlservercentral.com/?post_type=ssc_article&p=4632174 This article looks at using the FP-Growth algorithm from Python to mine data in SQL Server.

The post Using the FP-Growth Algorithm to Mine Useful Patterns in Data appeared first on SQLServerCentral.

]]>
Introduction

In this example, we will use Python to find patterns among items stored in SQL Server. We will use the FP-Growth algorithm to find items that frequently appear together in the same transactions. For example, if we go to the supermarket, what items do we usually buy together?

Requirements

  1. First, we will need SQL Server installed. SQL Server holds the data to analyze.
  2. Secondly, we will use the AdventureWorksDW2019 database. It contains the sample data that we will use for this article.
  3. Thirdly, we will need Python installed, along with the pandas, pyodbc, and mlxtend packages imported by the code.
  4. Finally, we will use Visual Studio Code to run Python, but you can use any Python environment to run the code.

FP-Growth algorithm patterns

FP-Growth (Frequent Pattern Growth) is a well-known data mining algorithm used to find patterns in transactions. For example, a famous and often-retold case study claimed that people buy diapers and beer together at the supermarket. The usual explanation is that some young parents buy beer to watch the NFL (National Football League) at home while they take care of the children. This algorithm can help a lot in finding such non-obvious patterns in the data.

Other applications for the FP-Growth algorithm

The typical example is the diapers and beer in the supermarket, explained above. In e-commerce, you can find patterns between the products that customers buy together on a website. You can also check consumption patterns in restaurants, such as Coke + chips + hamburger, and create attractive combos to increase sales. In addition, the algorithm can be used to look for patterns between genetic mutations and cancer.

Finally, there are other applications such as financial fraud detection (suspicious combinations of transactions) and e-learning platforms (sequences of videos or articles that are consumed together).

Example using the FP-Growth algorithm

For this example, we will use vAssocSeqLineItems. This is a view in the AdventureWorksDW2019 database.

AdventureWorks is a fictitious company that sells bikes.

This view contains order numbers, line numbers, and the models of the bikes and bike parts sold.

We will work to find patterns between these products.

Writing FP-Growth code

The code to find FP-Growth patterns in our data is the following:

import pandas as pd
import pyodbc
from mlxtend.frequent_patterns import fpgrowth, association_rules

# Connect to SQL Server
conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'  # Specify the SQL Server driver
    'SERVER=localhost;'  # SQL Server instance name
    'DATABASE=AdventureWorksDW2019;'  # Database name
    'Trusted_Connection=yes;'  # Use Windows Authentication to connect
)

# Select data from the view vAssocSeqLineItems
query = """
SELECT OrderNumber, Model
FROM vAssocSeqLineItems
"""

df = pd.read_sql(query, conn)

# Transform to basket format (one-hot encoding)
basket = df.pivot_table(index='OrderNumber',
                        columns='Model',
                        aggfunc=lambda x: 1,
                        fill_value=0)

# Apply the FP-Growth algorithm with a minimum support of 5%
frequent_itemsets = fpgrowth(basket, min_support=0.05, use_colnames=True)

# Generate association rules with a minimum confidence of 50%
rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.5)

# Show results
print("✅ Frequent sets found:")
print(frequent_itemsets)
print("\n📘 Association rules found:")
print(rules[['antecedents', 'consequents', 'support', 'confidence', 'lift']])

FP-Growth libraries

First, we will explain the libraries imported:

import pandas as pd
import pyodbc
from mlxtend.frequent_patterns import fpgrowth, association_rules

The pandas library is used to read the data and convert it to a DataFrame. The pyodbc library is used to connect to the SQL Server database. Finally, the mlxtend.frequent_patterns module provides the functions used to find the patterns in the data.

FP-Growth database connection

The next section of code is used to connect to SQL Server.

# Connect to SQL Server
conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'  # Specify the SQL Server driver
    'SERVER=localhost;'  # SQL Server instance name
    'DATABASE=AdventureWorksDW2019;'  # Database name
    'Trusted_Connection=yes;'  # Use Windows Authentication to connect
)

First, we create the connection named conn using the pyodbc library. This connection uses the ODBC Driver 17 for SQL Server. This driver is often already present on machines with SQL Server or its client tools installed; if it is not, it can be downloaded from Microsoft.

conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'  # Specify the SQL Server driver

Secondly, we connect to our local SQL Server. You can replace localhost with your own SQL Server name; the database is AdventureWorksDW2019. Check the requirements if you have not installed it yet.

    'SERVER=localhost;'  # SQL Server instance name
    'DATABASE=AdventureWorksDW2019;'  # Database name

Finally, we use Windows Authentication. This means that the Windows user running the code must have access to SQL Server.

    'Trusted_Connection=yes;'  # Use Windows Authentication to connect

Run a query and load the data

Now that we are connected to SQL Server, we run a query on the view vAssocSeqLineItems and load the result into a DataFrame.

# Select data from the view vAssocSeqLineItems
query = """
SELECT OrderNumber, Model
FROM vAssocSeqLineItems
"""

df = pd.read_sql(query, conn)

Prepare the data for the FP-Growth algorithm

This is the main part of the code, where we convert the data to a format that the FP-Growth algorithm can use.

# Transform to basket format (one-hot encoding)
basket = df.pivot_table(index='OrderNumber',
                        columns='Model',
                        aggfunc=lambda x: 1,
                        fill_value=0)

First, we pivot the data so that each order becomes a row and each model becomes a column, with a 1 if the item was bought in the transaction and a 0 if it was not.

If the model is in the order, the value is 1 (aggfunc=lambda x: 1). Otherwise, the value is 0 (fill_value=0).
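The basket format itself can be illustrated in plain Python, without pandas or a database. The sketch below uses a handful of made-up (OrderNumber, Model) rows, shaped like the query result, and builds one row of flags per order. The order numbers and their contents are hypothetical.

```python
# Hypothetical (OrderNumber, Model) rows, shaped like the query result
rows = [
    ("SO1", "Water Bottle"),
    ("SO1", "Mountain Bottle Cage"),
    ("SO2", "Water Bottle"),
    ("SO3", "Sport-100"),
    ("SO3", "Water Bottle"),
]

models = sorted({model for _, model in rows})   # one column per model
orders = sorted({order for order, _ in rows})   # one row per order
bought = set(rows)

# One-hot encoding: True if the model appears in the order, False otherwise
basket = {
    order: {model: (order, model) in bought for model in models}
    for order in orders
}

print(models)
for order, flags in basket.items():
    print(order, [int(flags[model]) for model in models])
```

Each printed row corresponds to one line of the pivoted table: the columns are the models in alphabetical order and the values are the 1/0 flags. Storing the flags as booleans is a deliberate choice here, since, to our knowledge, recent mlxtend releases prefer boolean columns over 0/1 integers.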

Find frequent items and association rules

This section of the code finds the frequent items in the orders and the association rules.

# Apply the FP-Growth algorithm with a minimum support of 5%
frequent_itemsets = fpgrowth(basket, min_support=0.05, use_colnames=True)

# Generate association rules with a minimum confidence of 50%
rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.5)

First, we use the fpgrowth function with the minimum support equal to 0.05. This function finds the frequent itemsets and their support. The basket is the table generated in the previous step. The function only keeps combinations of products that appear in at least 5% of the orders. Also, we set use_colnames=True to use the actual product names instead of column indexes.

In addition, we have the association_rules function. First, it receives the frequent_itemsets values. Secondly, we have metric="confidence". Confidence measures how often the consequent items appear in the transactions that contain the antecedent items.

Finally, min_threshold=0.5 means that we are using a minimum confidence of 50%. In other words, we only keep rules whose confidence is 50% or higher.
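The three measures that appear in the results (support, confidence, and lift) can be computed by hand on a tiny example. The following sketch uses four made-up transactions and the standard definitions; it is meant to show what the numbers mean, not how mlxtend computes them internally.

```python
# Four hypothetical orders, each a set of models
transactions = [
    {"Water Bottle", "Mountain Bottle Cage"},
    {"Water Bottle", "Road Bottle Cage"},
    {"Water Bottle", "Mountain Bottle Cage", "Sport-100"},
    {"Sport-100"},
]


def support(itemset, transactions):
    """Fraction of transactions that contain every item in `itemset`."""
    itemset = set(itemset)
    return sum(itemset <= t for t in transactions) / len(transactions)


def confidence(antecedent, consequent, transactions):
    """Support of antecedent and consequent together, divided by support of the antecedent."""
    both = set(antecedent) | set(consequent)
    return support(both, transactions) / support(antecedent, transactions)


def lift(antecedent, consequent, transactions):
    """Confidence divided by the support of the consequent alone."""
    return confidence(antecedent, consequent, transactions) / support(consequent, transactions)


a, c = {"Mountain Bottle Cage"}, {"Water Bottle"}
print(support(a | c, transactions))     # 0.5: both items appear in 2 of 4 orders
print(confidence(a, c, transactions))   # 1.0: every order with the cage has a bottle
print(lift(a, c, transactions))         # about 1.33: above 1, so the items go together
```

With min_support=0.05 and min_threshold=0.5, this rule would be kept, because its support (0.5) and confidence (1.0) are both above the thresholds.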

Print the FP-Growth results

In the last part, we are going to show the results.

# Show results
print("✅ Frequent sets found:")
print(frequent_itemsets)
print("\n📘 Association rules found:")
print(rules[['antecedents', 'consequents', 'support', 'confidence', 'lift']])

We are just printing the results of the FP-Growth algorithm. Let’s run the code and analyse the results.

First, we have the frequent sets found. This shows the support of each itemset. Support is a value between 0 and 1. If the item Sport-100 has a support of 0.29, it means that 29% of the orders included that item.

Secondly, we have the association rules found.

These results show the related items. In this example, the Mountain Bottle Cage is bought with the Water Bottle. The Road Bottle Cage and the Water Bottle are also bought together. We could create some special combos and offers to increase sales.

As you can see, the code is not so difficult. We can find patterns in our data and find relationships between products.

The post Using the FP-Growth Algorithm to Mine Useful Patterns in Data appeared first on SQLServerCentral.

]]>
Using psycopg2 to Connect Python to PostgreSQL https://www.sqlservercentral.com/articles/using-psycopg2-to-connect-python-to-postgresql Mon, 25 Aug 2025 00:00:01 +0000 https://www.sqlservercentral.com/?post_type=ssc_article&p=4631991 Learn how to connect to a PostgreSQL database from Python with this popular library.

The post Using psycopg2 to Connect Python to PostgreSQL appeared first on SQLServerCentral.

]]>
Overview

The psycopg2 library makes working with PostgreSQL databases in Python much easier. This has been the most popular PostgreSQL adapter for Python. Whether you're building backend services, data pipelines, or automation scripts, psycopg2 is your go-to library for integrating PostgreSQL with Python.

In this article, we will guide you through everything you need to get started with psycopg2, from installation and basic CRUD operations to best practices for production-ready code.

Prerequisites

To follow along, you'll need:

  • Python 3.x installed
  • PostgreSQL installed locally or access to a PostgreSQL server
  • Basic knowledge of SQL and Python

Step 1: Installing psycopg2

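There are two packages on PyPI:

  • psycopg2: The source distribution. It is compiled at install time, so it needs a C compiler plus the PostgreSQL client library (libpq) and Python development headers.
  • psycopg2-binary: Pre-compiled packages that bundle their own copies of the C dependencies. Faster to install, and recommended for development and testing only.

For development, install via pip:

pip install psycopg2-binary

For production, it's recommended to use:

pip install psycopg2

which builds the package from source against the system libraries.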

Step 2: Create a PostgreSQL Database

If you don’t already have a PostgreSQL database, create one:

CREATE DATABASE testdb;

Also, create a user (if needed) and grant permissions:

CREATE USER testuser WITH PASSWORD 'testpass';
GRANT ALL PRIVILEGES ON DATABASE testdb TO testuser;

Step 3: Connecting to PostgreSQL Using Python

Here's a basic connection example. The psycopg2 library connects to the local PostgreSQL database using the connection details provided. Next, we create a cursor on the connection so that we can execute queries and collect the results. Once done, we close the cursor and then the connection, i.e., in the reverse of the order in which they were opened.

import psycopg2

# Establishing the connection
conn = psycopg2.connect(
    dbname="testdb",
    user="testuser",
    password="testpass",
    host="localhost",
    port="5432"
)

# Creating a cursor object
cur = conn.cursor()

# Print PostgreSQL version
cur.execute("SELECT version();")
print(cur.fetchone())

# Close connection
cur.close()
conn.close()

All you need to do is put the above content in a text editor and save the file as postgres_conn.py. Open a command prompt (or similar) terminal from the directory where the file is saved and run the following command:

python postgres_conn.py

If everything is set up properly, you should see the PostgreSQL version printed.
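The scripts in this article hard-code the connection details, including the password. One common alternative is to read them from environment variables, as in the sketch below. The helper name connection_params is hypothetical; the variable names (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD) follow the names PostgreSQL's own client tools use, and the defaults are simply the values from this article.

```python
import os


def connection_params(env=None):
    """Build keyword arguments for psycopg2.connect from environment variables."""
    env = os.environ if env is None else env
    return {
        "dbname": env.get("PGDATABASE", "testdb"),
        "user": env.get("PGUSER", "testuser"),
        "password": env.get("PGPASSWORD", ""),
        "host": env.get("PGHOST", "localhost"),
        "port": env.get("PGPORT", "5432"),
    }


# A plain dict stands in for os.environ so the example is reproducible
params = connection_params({"PGPASSWORD": "testpass", "PGHOST": "db.internal"})
print({key: value for key, value in params.items() if key != "password"})

# In a real script the result would replace the hard-coded arguments:
# conn = psycopg2.connect(**params)
```

This keeps credentials out of the source files and lets the same script run unchanged against different servers.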

Step 4: Creating a Table

Create a new file named create_table.py and paste the following contents inside it. Save the file and run it the same way as mentioned above.

import psycopg2

conn = psycopg2.connect(
    dbname="testdb",
    user="testuser",
    password="testpass",
    host="localhost",
    port="5432"
)

cur = conn.cursor()

# Create a table
cur.execute("""
    CREATE TABLE IF NOT EXISTS employees (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100),
        department VARCHAR(50),
        salary INTEGER
    )
""")

conn.commit()
cur.close()
conn.close()

If everything is fine, you should see the following output:

If you navigate to pgAdmin and try to run the query in testdb, you should see the table with no data in it:

Step 5: Insert Data

Next, we will try to insert data into this newly created table. For that, you have to create a new file named: insert_data.py and paste the below content inside it. Save the file and run it as mentioned earlier.

import psycopg2

conn = psycopg2.connect(
    dbname="testdb",
    user="testuser",
    password="testpass",
    host="localhost",
    port="5432"
)

cur = conn.cursor()

# Insert data
cur.execute("""
    INSERT INTO employees (name, department, salary)
    VALUES (%s, %s, %s)
""", ("John Doe", "Engineering", 70000))

conn.commit()
cur.close()
conn.close()

The above code does much the same as before; the only difference is that we now use the cursor to run an INSERT query that adds a row to the table.

Note that we use %s in the insert query. This is called a parameterized query. It protects your code from SQL injection attacks. So, it is recommended to always use placeholders!
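The difference between pasting input into the SQL text and passing it as a parameter can be demonstrated without a PostgreSQL server. The sketch below uses Python's built-in sqlite3 module purely as a stand-in: it follows the same DB-API pattern, but its placeholder is ? where psycopg2 uses %s. The table contents and the malicious input are made up for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in; sqlite3 uses ? where psycopg2 uses %s
conn.execute("CREATE TABLE employees (name TEXT, salary INTEGER)")
conn.executemany(
    "INSERT INTO employees (name, salary) VALUES (?, ?)",
    [("John Doe", 70000), ("Jane Roe", 90000)],
)

malicious = "nobody' OR '1'='1"

# Unsafe: the input becomes part of the SQL text and changes the query's meaning
unsafe_sql = f"SELECT name FROM employees WHERE name = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the driver sends the input as a value, never as SQL
safe_rows = conn.execute(
    "SELECT name FROM employees WHERE name = ?", (malicious,)
).fetchall()

print(len(unsafe_rows), len(safe_rows))  # 2 0
conn.close()
```

The string-built query returns every employee, while the parameterized one correctly finds nobody with that name.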

If everything is fine, you should see the following output:

Check from pgAdmin and you should see now the table has a record inserted:

Step 6: Read Data

Similarly, as above, create a new file called fetch_data.py and paste the below contents inside it. Save the file and run it.

import psycopg2

conn = psycopg2.connect(
    dbname="testdb",
    user="testuser",
    password="testpass",
    host="localhost",
    port="5432"
)

cur = conn.cursor()

cur.execute("SELECT * FROM employees;")
rows = cur.fetchall()

for row in rows:
    print(row)

cur.close()
conn.close()

Once you run it, you should see a similar output:

Step 7: Update Data

Create a new file named update_data.py and put the below contents inside it. Save the file and run it to update the existing data.

import psycopg2

conn = psycopg2.connect(
    dbname="testdb",
    user="testuser",
    password="testpass",
    host="localhost",
    port="5432"
)

cur = conn.cursor()

cur.execute("""
    UPDATE employees SET salary = %s WHERE name = %s
""", (80000, "John Doe"))

conn.commit()
cur.close()
conn.close()

Once you run it, you should see a similar output:

Upon validating from pgAdmin, you should be able to see the changes reflecting there too:

Step 8: Delete Data

Create a new file called delete_data.py and paste the below contents inside. Save the file and run it.

import psycopg2

conn = psycopg2.connect(
    dbname="testdb",
    user="testuser",
    password="testpass",
    host="localhost",
    port="5432"
)

cur = conn.cursor()

cur.execute("""
    DELETE FROM employees WHERE name = %s
""", ("John Doe",))

conn.commit()
cur.close()
conn.close()

Run the above file and you should get a similar output:

Upon checking from pgAdmin, you should see that the table is empty again, because its only record has been deleted:

Conclusion

We just walked through how to install the popular Python library psycopg2 and use it inside Python scripts to connect to a PostgreSQL database and perform CRUD operations. We can also add try/except blocks to the scripts so that, if the database connection or an operation fails, the script exits gracefully and shows a meaningful error message to the user.
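As a rough sketch of that error handling, the helper below commits when a statement succeeds and rolls back when it fails. The function name execute_write is hypothetical, and sqlite3 is used only as a stand-in connection so the example runs without a server (hence the ? placeholders). With psycopg2, the except clause could catch psycopg2.Error instead of the broad Exception.

```python
import sqlite3
from contextlib import closing


def execute_write(conn, sql, params=()):
    """Run one write statement; commit on success, roll back on failure."""
    try:
        with closing(conn.cursor()) as cur:  # the cursor is closed either way
            cur.execute(sql, params)
        conn.commit()
        return True
    except Exception as exc:  # with psycopg2, psycopg2.Error would be narrower
        conn.rollback()
        print(f"Database operation failed: {exc}")
        return False


# sqlite3 stands in for a psycopg2 connection in this demo
conn = sqlite3.connect(":memory:")
execute_write(conn, "CREATE TABLE employees (name TEXT NOT NULL, salary INTEGER)")

ok = execute_write(
    conn, "INSERT INTO employees (name, salary) VALUES (?, ?)", ("John Doe", 70000)
)
bad = execute_write(
    conn, "INSERT INTO employees (name, salary) VALUES (?, ?)", (None, 1)
)

count = conn.execute("SELECT count(*) FROM employees").fetchone()[0]
print(ok, bad, count)  # True False 1
conn.close()
```

The second insert violates the NOT NULL constraint, so it is rolled back and reported, while the first row stays in the table.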

The post Using psycopg2 to Connect Python to PostgreSQL appeared first on SQLServerCentral.

]]>
Load Data into Snowflake Using Python with Pandas https://www.sqlservercentral.com/articles/load-data-into-snowflake-using-python-with-pandas Mon, 21 Apr 2025 00:00:37 +0000 https://www.sqlservercentral.com/?post_type=ssc_article&p=4575512 Loading data into Snowflake is a common need. Using Python and pandas is a common go-to solution for data professionals

The post Load Data into Snowflake Using Python with Pandas appeared first on SQLServerCentral.

]]>