
Advanced SQL queries

Execute simple query with sqlalchemy

SQLAlchemy is the best way to run queries when you already work with Pandas.

from sqlalchemy import create_engine, text
 
engineSuppression = create_engine('mysql+mysqldb://%s:%s@%s:%i/%s' % (username, password, host, port, database))
 
sqlSuppression = '''DELETE FROM my_table WHERE id = 1;'''
 
with engineSuppression.connect() as conSuppression:
    conSuppression.execute(text(sqlSuppression))
    conSuppression.commit()
    print('\nid 1 deleted!')
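
One caveat with building the connection string by hand: if the username or password contains characters such as @ or /, the URL is no longer parsed correctly. Here is a minimal sketch of one way around it, using quote_plus from the standard library. The helper name build_mysql_url and the sample credentials are hypothetical, not part of the original example:

```python
from urllib.parse import quote_plus


def build_mysql_url(username, password, host, port, database):
    """Build a mysql+mysqldb URL, percent-encoding the credentials."""
    return 'mysql+mysqldb://%s:%s@%s:%i/%s' % (
        quote_plus(username), quote_plus(password), host, port, database)


# Hypothetical credentials, only to show the encoding
url = build_mysql_url('root', 'p@ss/word', 'localhost', 3306, 'my_db')
print(url)
# mysql+mysqldb://root:p%40ss%2Fword@localhost:3306/my_db
```

The resulting string can be passed to create_engine() exactly like the hand-built one.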

Execute SQL queries from a file with sqlalchemy

Here again SQLAlchemy is the best option when you work with Pandas.

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
 
my_engine = create_engine('mysql+mysqldb://%s:%s@%s:%i/%s?charset=utf8mb4' % (my_username, my_password, my_host, my_port, my_database))
 
my_sql_file = 'C:/MyQueries.sql'
 
# LOAD THE SQL SCRIPT
with open(my_sql_file, 'r', encoding='utf8') as sql_file:
    sql_script = sql_file.read()

# CREATE SESSION
my_session = sessionmaker(bind=my_engine)
session = my_session()

# START A TRANSACTION
session.begin()

try:
    session.execute(text(sql_script))
    # VALIDATE THE TRANSACTION
    session.commit()
    print('Execution of "' + my_sql_file + '" finished!')
except Exception as e:
    # CANCEL THE TRANSACTION IF SOMETHING WENT WRONG
    session.rollback()
    print('We can not execute "' + my_sql_file + '"!', e)
finally:
    # CLOSE SESSION AND CONNECTION
    session.close()
    my_engine.dispose()
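
When the file contains several statements and one of them fails, running the whole script in a single call gives little information about which one broke. Here is a rough sketch of a splitter that cuts the script on semicolons found outside quotes, so that each statement can be executed and reported separately. The function name split_sql_statements is hypothetical, and the approach is deliberately naive: it assumes no SQL comments and no DELIMITER blocks in the script.

```python
def split_sql_statements(script):
    """Naively split a SQL script on semicolons that are outside quotes."""
    statements = []
    current = []
    quote = None      # quote character we are currently inside, if any
    escaped = False   # True right after a backslash inside a quoted string
    for char in script:
        if quote:
            if escaped:
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == quote:
                quote = None
        elif char in ("'", '"', '`'):
            quote = char
        elif char == ';':
            # End of a statement: store it without the semicolon
            statement = ''.join(current).strip()
            if statement:
                statements.append(statement)
            current = []
            continue
        current.append(char)
    # Keep a trailing statement that has no final semicolon
    tail = ''.join(current).strip()
    if tail:
        statements.append(tail)
    return statements


# Hypothetical script content
sql_script = "DELETE FROM t WHERE a = 'x;y'; UPDATE t SET b = 1;"
for statement in split_sql_statements(sql_script):
    print(statement)
```

Each item of the returned list could then go through session.execute(text(statement)) inside the same try block, printing the statement that raised.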

Execute SQL queries from a file with mysqlclient

import MySQLdb
from termcolor import colored

conn = MySQLdb.connect(host='localhost', user='root', password='', database='ma_base', port=3308)

my_sql_file = 'C:/MyQueries.sql'

# LOAD THE SQL SCRIPT
with open(my_sql_file, 'r', encoding='utf8') as sql_file:
    sql_script = sql_file.read()

try:
    cursor = conn.cursor()
    cursor.execute(sql_script)
    cursor.close()
    # VALIDATE THE CHANGES (no autocommit by default)
    conn.commit()
    print(colored('\nOK!', 'green'))
except Exception as e:
    print(colored('\nIssue! ' + str(e), 'red'))
finally:
    # CLOSE THE CONNECTION
    conn.close()

Execute an SQL query in a function with sqlalchemy using with

Using with, we are sure that the connection is opened and closed properly!

Here is an example that checks whether a table exists in a DB:

def isTableExistInTarget(myTable):
    # Full table name, with its prefix
    fullTableName = prefixTableTarget + myTable

    # Bind parameters (:schema, :table) avoid quoting the values by hand
    sqlTableTargetExist = '''SELECT TABLE_NAME FROM information_schema.TABLES WHERE table_schema = :schema AND TABLE_NAME = :table ;'''

    with engineTarget.connect() as conTarget:
        myResultTarget = conTarget.execute(text(sqlTableTargetExist), {'schema': nameDbTarget, 'table': fullTableName}).scalar()

    if myResultTarget == fullTableName:
        myAnswerTarget = 'Yes'
    else:
        myAnswerTarget = 'No'

    return myAnswerTarget

Execute a lot of SQL queries from a dataframe in batch with sqlalchemy

  • Here we use a function named escape_value, defined in the section "Escape some character in query" of this article; add it if needed, or remove it.
  • Here we use f-strings.
 
batch_queries = []
batch_size = 500
 
# SQL TO INSERT IN TARGET
for index, row in dfSource.iterrows():
    columns = ', '.join(row.index)
    values = ', '.join(f"'{escape_value(v)}'" for v in row.values)
    queryInsert = '''INSERT IGNORE INTO my_table''' + f'''({columns}) VALUES ({values}) ;'''
 
    # Add in batch
    batch_queries.append(queryInsert)
 
    # Run batch
    if len(batch_queries) == batch_size:
        # Build batch
        full_query = ' '.join(batch_queries)
        session.execute(text(full_query))
        print(str(len(batch_queries)) + ' queries in batch')
        # Empty batch
        batch_queries.clear()
 
# Run the last, incomplete batch
if batch_queries:
    full_query = ' '.join(batch_queries)
    session.execute(text(full_query))
    print(str(len(batch_queries)) + ' queries in last batch')
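
The batching logic itself can be separated from the SQL. Here is a small sketch of a generator that cuts any sequence of rows into batches. The name iter_batches and the sample rows are hypothetical. The commented line shows how each batch might be sent with bind parameters instead of concatenated values, assuming the rows are dictionaries such as those returned by dfSource.to_dict('records'):

```python
def iter_batches(records, batch_size):
    """Yield successive lists of at most batch_size records."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # Last, incomplete batch
    if batch:
        yield batch


# Hypothetical rows, standing in for dfSource.to_dict('records')
records = [{'id': i, 'name': f'name {i}'} for i in range(1, 6)]

for batch in iter_batches(records, batch_size=2):
    # With SQLAlchemy, a list of dictionaries can be passed as parameters:
    # session.execute(text('INSERT IGNORE INTO my_table (id, name) VALUES (:id, :name)'), batch)
    print(len(batch), 'rows in batch')
```

With bind parameters the values no longer need escape_value, since the driver handles the quoting.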

Use percentage symbol in queries (%)

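The percentage symbol is only special in Python when a string goes through %-formatting (the % operator, or a database driver called with parameters). In a plain string, including one passed to text(), it can normally be written as-is. The u prefix (unicode string) is accepted but changes nothing in Python 3, so it can be left out:

query = '''UPDATE content SET introtext = REPLACE(introtext, 'XXX', 'ZZZ') WHERE introtext LIKE '%AAA%' ;'''

If the query string is itself used as a %-formatting template, double it:

query = '''UPDATE content SET introtext = REPLACE(introtext, 'XXX', 'ZZZ') WHERE introtext LIKE '%%AAA%%' ;'''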
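
To see when doubling matters, here is a short pure-Python illustration with no database involved. The table and column names are reused from the queries above, and the id value is made up:

```python
# A plain string: '%' has no special meaning here, nothing to escape.
plain_query = "SELECT id FROM content WHERE introtext LIKE '%AAA%'"

# A %-formatting template: '%s' is a placeholder,
# so a literal '%' must be written '%%'.
template = "SELECT id FROM content WHERE id = %s AND introtext LIKE '%%AAA%%'"
formatted_query = template % (5,)

print(plain_query)
print(formatted_query)
# SELECT id FROM content WHERE id = 5 AND introtext LIKE '%AAA%'
```

After formatting, each doubled %% is back to a single %, which is what the database receives.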

Escape some character in query

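Some characters are special for SQL or for SQLAlchemy's text() (backslash, single quote, and the colon that text() reads as the start of a bind parameter). A function can escape them before the value is put in the query: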

def escape_value(value):
    value_str = str(value)
    value_str = value_str.replace("\\", "\\\\")
    value_str = value_str.replace("'", "\\'")
    value_str = value_str.replace(":", "\\:")
 
    return value_str
 
...
 
values = ', '.join(f"'{escape_value(v)}'" for v in row.values)
...
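
To check what the function really produces, here is a quick run on a few sample values. The samples are made up for the illustration:

```python
def escape_value(value):
    value_str = str(value)
    # Backslash first, otherwise the backslashes added below would be doubled
    value_str = value_str.replace("\\", "\\\\")
    value_str = value_str.replace("'", "\\'")
    value_str = value_str.replace(":", "\\:")

    return value_str


# Hypothetical sample values
samples = ["O'Brien", "10:30", "C:\\temp"]

for sample in samples:
    print(sample, '->', escape_value(sample))
# O'Brien -> O\'Brien
# 10:30 -> 10\:30
# C:\temp -> C\:\\temp
```

The order of the replacements matters: the backslash has to be handled before the quote and the colon.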

Timeout with sqlalchemy

If you experience some connection/transaction issues, you can try this:

from sqlalchemy import create_engine
 
engineTarget = create_engine('mysql+mysqldb://%s:%s@%s:%i/%s' % (myUser, myPwd, myHost, myPort, myDb), connect_args={'connect_timeout': 60, 'read_timeout': 60, 'write_timeout': 60})

But personally I don't like this trick: it is not explicit. When I have timeout problems, I prefer to write a function that checks whether the query executed correctly and repeats it if not. See the section "Manage unexplained connection losses with sqlalchemy" in this article.

Connection pool/queue pool with sqlalchemy

If you experience some connection/transaction issues, you can try this:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
 
engineTarget = create_engine('mysql+mysqldb://%s:%s@%s:%i/%s' % (myUser, myPwd, myHost, myPort, myDb), poolclass=QueuePool, pool_recycle=3600)

Same remark as for the timeout: personally I don't like this trick, it is not explicit. I prefer a function that checks whether the query executed correctly and repeats it if not. See the section "Manage unexplained connection losses with sqlalchemy" in this article.

Manage unexplained connection losses with sqlalchemy

Sometimes, if you run a lot of specific queries with different connections and different DBs, mixing SELECT, UPDATE, UPDATE from a CONCAT, massive updates, PREPARE EXECUTE... you can experience some weird connection/transaction issues.

I don't have a perfect explanation, but I suspect an interplay of multiple transactions, imperfectly managed by SQLAlchemy (not necessarily for bad reasons) in order to enforce some safety limits. Or something like that😂

No problem: here is a tip to check whether your queries worked and, if not, to run them again a few times after a delay.

For UPDATE queries:

import time

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import OperationalError

myEngine = create_engine('mysql+mysqldb://%s:%s@%s:%i/%s' % (myUser, myPwd, myHost, myPort, myDb))
my_session = sessionmaker(bind=myEngine)
mySession = my_session()
mySession.begin()

def query_management_update(session, query, max_retries, retry_delay):
    attempts = 0
    while attempts < max_retries:
        try:
            session.execute(text(query))
            session.commit()
            print(f'OK, update executed on attempt {attempts + 1}.')
            # Success: leave the loop
            break

        except OperationalError as e:
            # Cancel the failed transaction before retrying
            session.rollback()
            attempts += 1
            if attempts < max_retries:
                print(f'Connection issue, attempt {attempts}/{max_retries}. Retry in {retry_delay} seconds...')
                time.sleep(retry_delay)
            else:
                print('All attempts did not work!')
                raise e

myQuery = '''UPDATE my_table SET my_field = REPLACE(my_field, 'xptdr', 'lol') WHERE my_field LIKE '%xptdr%' ;'''

query_management_update(mySession, myQuery, 5, 5)

For SELECT queries:

import time

from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError

myEngine = create_engine('mysql+mysqldb://%s:%s@%s:%i/%s' % (myUser, myPwd, myHost, myPort, myDb))

def query_management_select(query, max_retries, retry_delay):
    # The result is stored in a global variable
    global myResult
    attempts = 0
    while attempts < max_retries:
        try:
            with myEngine.connect() as myConnexion:
                myResult = myConnexion.execute(text(query)).scalar()
            print(f'Select executed on attempt {attempts + 1}.')
            # Success: leave the loop
            break
        except OperationalError as e:
            attempts += 1
            if attempts < max_retries:
                print(f'Connection issue, attempt {attempts}/{max_retries}. Retry in {retry_delay} seconds...')
                time.sleep(retry_delay)
            else:
                print('All attempts did not work!')
                raise e

myQuery = '''SELECT TABLE_NAME FROM information_schema.TABLES WHERE table_schema = 'my_db' AND TABLE_NAME = 'my_table' ;'''

query_management_select(myQuery, 5, 5)
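
The UPDATE and SELECT versions share the same retry loop, so it can be factored out. Here is a sketch of a generic helper that takes any callable and returns its result, which avoids the global variable. The names run_with_retry and flaky_select are hypothetical, and the flaky function only simulates a connection that fails twice so that the example runs without a database:

```python
import time


def run_with_retry(operation, max_retries, retry_delay, retry_on=(Exception,)):
    """Call operation() until it succeeds or max_retries attempts have failed."""
    for attempt in range(1, max_retries + 1):
        try:
            return operation()
        except retry_on as error:
            if attempt == max_retries:
                print('All attempts did not work!')
                raise
            print(f'Issue ({error}), attempt {attempt}/{max_retries}. Retry in {retry_delay} seconds...')
            time.sleep(retry_delay)


# Hypothetical flaky operation: fails twice, then succeeds
calls = {'count': 0}


def flaky_select():
    calls['count'] += 1
    if calls['count'] < 3:
        raise ConnectionError('connection lost')
    return 'my_table'


result = run_with_retry(flaky_select, max_retries=5, retry_delay=0, retry_on=(ConnectionError,))
print(result, 'after', calls['count'], 'calls')
# my_table after 3 calls
```

With SQLAlchemy, the operation would be a small function that opens the connection and executes the query, and retry_on would be (OperationalError,).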

 
