Advanced SQL queries
Execute a simple query with sqlalchemy
With sqlalchemy, the best way when you also work with Pandas.
from sqlalchemy import create_engine, text

engineSuppression = create_engine('mysql+mysqldb://%s:%s@%s:%i/%s' % (username, password, host, port, database))

sqlSuppression = '''DELETE FROM my_table WHERE id = 1;'''

# The connection is closed automatically at the end of the with block
with engineSuppression.connect() as conSuppression:
    conSuppression.execute(text(sqlSuppression))
    conSuppression.commit()

print('\nid 1 deleted!')
Execute SQL queries from a file with sqlalchemy
With sqlalchemy, the best way when you also work with Pandas.
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

my_engine = create_engine('mysql+mysqldb://%s:%s@%s:%i/%s?charset=utf8mb4' % (my_username, my_password, my_host, my_port, my_database))

my_sql_file = 'C:/MyQueries.sql'

# LOAD THE SQL SCRIPT
with open(my_sql_file, 'r') as sql_file:
    sql_script = sql_file.read()

# CREATE SESSION
my_session = sessionmaker(bind=my_engine)
session = my_session()

# START A TRANSACTION
session.begin()

try:
    session.execute(text(sql_script))
    # VALIDATE THE TRANSACTION
    session.commit()
    print('Execution of "' + my_sql_file + '" finished!')
except Exception as e:
    # CANCEL THE TRANSACTION IF SOMETHING WENT WRONG
    session.rollback()
    print('We can not execute "' + my_sql_file + '"!', e)
finally:
    # CLOSE SESSION AND CONNECTION
    session.close()
    my_engine.dispose()
Execute SQL queries from a file with mysqlclient
import MySQLdb
from termcolor import colored

conn = MySQLdb.connect(host='localhost', user='root', password='', database='ma_base', port=3308)

my_sql_file = 'C:/MyQueries.sql'

# Load the SQL script
with open(my_sql_file, 'r', encoding='utf8') as sql_file:
    sql_script = sql_file.read()

try:
    cursor = conn.cursor()
    cursor.execute(sql_script)
    cursor.close()
    # mysqlclient does not autocommit by default
    conn.commit()
    print(colored('\nOK!', 'green'))
except Exception as e:
    print(colored('\nIssue!', 'red'), e)
finally:
    conn.close()
Execute an SQL query in a function with sqlalchemy, using with
Using with, we are sure that the connection is opened and closed properly!
Here is an example that checks whether a table exists in a DB:
def isTableExistInTarget(myTable):
    # nameDbTarget, prefixTableTarget and engineTarget are defined elsewhere in the script
    sqlTableTargetExist = '''SELECT TABLE_NAME FROM information_schema.TABLES WHERE table_schema = :db AND TABLE_NAME = :tbl ;'''

    # Bound parameters (:db, :tbl) replace the manual quoting of the names
    with engineTarget.connect() as conTarget:
        myResultTarget = conTarget.execute(text(sqlTableTargetExist), {'db': nameDbTarget, 'tbl': prefixTableTarget + myTable}).scalar()

    if myResultTarget == prefixTableTarget + myTable:
        myAnswerTarget = 'Yes'
    else:
        myAnswerTarget = 'No'

    return myAnswerTarget
Execute a lot of SQL queries from a dataframe in batch with sqlalchemy
- Here we use a function named escape_value, available further down in this article; add it if needed, or remove it.
- Here we use f-strings.
batch_queries = []
batch_size = 500

# SQL TO INSERT IN TARGET
for index, row in dfSource.iterrows():
    columns = ', '.join(row.index)
    values = ', '.join(f"'{escape_value(v)}'" for v in row.values)
    queryInsert = '''INSERT IGNORE INTO my_table''' + f'''({columns}) VALUES ({values}) ;'''

    # Add in batch
    batch_queries.append(queryInsert)

    # Run batch
    if len(batch_queries) == batch_size:
        # Build batch
        full_query = ' '.join(batch_queries)
        session.execute(text(full_query))
        print(str(len(batch_queries)) + ' queries in batch')
        # Empty batch
        batch_queries.clear()

# Queries out the last batch
if batch_queries:
    full_query = ' '.join(batch_queries)
    session.execute(text(full_query))
    print(str(len(batch_queries)) + ' queries in last batch')
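The batching logic above (fill a list, flush it when it reaches batch_size, then flush the remainder) can also be isolated in a small helper. Here is a minimal sketch; the name iter_batches and the generated sample queries are hypothetical, only there to show the grouping without needing a database:

```python
from itertools import islice

def iter_batches(items, batch_size):
    """Yield successive lists of at most batch_size items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch

# Hypothetical stand-in for the INSERT statements built from the dataframe
queries = [f"INSERT IGNORE INTO my_table (id) VALUES ({i}) ;" for i in range(1201)]

batch_sizes = [len(batch) for batch in iter_batches(queries, 500)]
print(batch_sizes)  # [500, 500, 201]
```

Each batch could then be joined and executed as in the loop above; the last, smaller batch comes out of the helper without a separate if block.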
Use percentage symbol in queries (%)
The percentage symbol is a special character in Python string formatting, so take care when using it in a query. One habit is to mark the string as unicode with u (in Python 3 every string is already unicode, so the prefix is optional):
query = u'''UPDATE content SET introtext = REPLACE(introtext, 'XXX', 'ZZZ') WHERE introtext LIKE '%AAA%' ;'''
And it may be better to double it, which is required if the string goes through Python %-formatting:
query = u'''UPDATE content SET introtext = REPLACE(introtext, 'XXX', 'ZZZ') WHERE introtext LIKE '%%AAA%%' ;'''
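To see why doubling matters, here is a minimal sketch in plain Python with no database involved. It assumes the query is built with the % operator, as the connection strings in this article are; the variable names are only illustrative:

```python
table = 'content'

# %-formatting consumes single % signs, so the LIKE wildcards are doubled
query = "SELECT * FROM %s WHERE introtext LIKE '%%AAA%%' ;" % table
print(query)  # SELECT * FROM content WHERE introtext LIKE '%AAA%' ;

# A string that never goes through %-formatting keeps its % signs as written
raw_query = "SELECT * FROM content WHERE introtext LIKE '%AAA%' ;"
print(query == raw_query)  # True
```

After formatting, each %% collapses to a single %, so the database receives the usual LIKE pattern.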
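Escape some characters in a query
Another way to handle special characters, with a function:
def escape_value(value):
    value_str = str(value)
    # Escape backslashes first, so the escapes added below are not doubled
    value_str = value_str.replace("\\", "\\\\")
    # Escape single quotes, which delimit SQL strings
    value_str = value_str.replace("'", "\\'")
    # Escape colons, which sqlalchemy text() reads as bound parameters
    value_str = value_str.replace(":", "\\:")
    return value_str

...
values = ', '.join(f"'{escape_value(v)}'" for v in row.values)
...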
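As an alternative to escaping by hand, values can be handed to the driver as bound parameters. The sketch below swaps in the standard-library sqlite3 module and an in-memory database, only so that it runs without a MySQL server. The table and rows are hypothetical, and INSERT OR IGNORE is the SQLite spelling of INSERT IGNORE. With sqlalchemy, the comparable approach is named parameters in text(), with the values passed as a dictionary.

```python
import sqlite3

# In-memory SQLite database, used here only to keep the sketch self-contained
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE my_table (id INTEGER PRIMARY KEY, my_field TEXT)')

# Hypothetical rows containing the characters escape_value takes care of
rows = [(1, "it's a test"), (2, 'back\\slash'), (3, 'colon: here')]

# The driver handles the quoting: no manual escaping of ', \ or : is needed
conn.executemany('INSERT OR IGNORE INTO my_table (id, my_field) VALUES (?, ?)', rows)
conn.commit()

stored = conn.execute('SELECT id, my_field FROM my_table ORDER BY id').fetchall()
print(stored == rows)  # True
conn.close()
```

The values come back exactly as they went in, which is the point of letting the driver do the quoting.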
Timeout with sqlalchemy
If you experience some connection/transaction issues, you can try this:
from sqlalchemy import create_engine

# Timeouts are in seconds and are passed to the mysqlclient driver
engineTarget = create_engine(
    'mysql+mysqldb://%s:%s@%s:%i/%s' % (myUser, myPwd, myHost, myPort, myDb),
    connect_args={'connect_timeout': 60, 'read_timeout': 60, 'write_timeout': 60})
But personally I don't like this trick, it's not clear. When I have timeout problems, I prefer to write a function that checks whether the query ran properly and repeats it if it did not. See "Manage unexplained connection losses with sqlalchemy" in this article.
Connection pool/queue pool with sqlalchemy
If you experience some connection/transaction issues, you can try this:
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# pool_recycle replaces pooled connections older than 3600 seconds
engineTarget = create_engine(
    'mysql+mysqldb://%s:%s@%s:%i/%s' % (myUser, myPwd, myHost, myPort, myDb),
    poolclass=QueuePool, pool_recycle=3600)
But personally I don't like this trick, it's not clear. When I have timeout problems, I prefer to write a function that checks whether the query ran properly and repeats it if it did not. See "Manage unexplained connection losses with sqlalchemy" in this article.
Manage unexplained connection losses with sqlalchemy
Sometimes, if you run a lot of specific queries, with different connections, different DBs, mixing SELECT, UPDATE, UPDATE from a CONCAT, massive updates, PREPARE EXECUTE... you can experience some weird connection/transaction issues.
I do not have a perfect explanation, but I suspect an interplay of multiple transactions, imperfectly managed by SqlAlchemy (not necessarily for bad reasons), perhaps to enforce some security limitations. Or something like that😂
No problem: here is a tip to check whether your queries worked and, if they did not, to run them again a few times after a delay.
For UPDATE queries:
import time

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import OperationalError

myEngine = create_engine('mysql+mysqldb://%s:%s@%s:%i/%s' % (myUser, myPwd, myHost, myPort, myDb))

my_session = sessionmaker(bind=myEngine)
mySession = my_session()
mySession.begin()

def query_management_update(session, query, max_retries, retry_delay):
    attempts = 0
    while attempts < max_retries:
        try:
            session.execute(text(query))
            session.commit()
            print(f'OK, update executed on attempt {attempts + 1}.')
            return
        except OperationalError as e:
            # Cancel the failed transaction before trying again
            session.rollback()
            attempts += 1
            if attempts < max_retries:
                print(f'Connection issue, attempt {attempts}/{max_retries}. Retry in {retry_delay} seconds...')
                time.sleep(retry_delay)
            else:
                print('All attempts did not work!')
                raise e

myQuery = u'''UPDATE my_table SET my_field = REPLACE(my_field, 'xptdr', 'lol') WHERE my_field LIKE '%xptdr%' ;'''

query_management_update(mySession, myQuery, 5, 5)
For SELECT queries:
import time

from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError

myEngine = create_engine('mysql+mysqldb://%s:%s@%s:%i/%s' % (myUser, myPwd, myHost, myPort, myDb))

def query_management_select(query, max_retries, retry_delay):
    attempts = 0
    while attempts < max_retries:
        try:
            with myEngine.connect() as myConnexion:
                result = myConnexion.execute(text(query)).scalar()
            print(f'Select executed on attempt {attempts + 1}.')
            # Return the value instead of storing it in a global variable
            return result
        except OperationalError as e:
            attempts += 1
            if attempts < max_retries:
                print(f'Connection issue, attempt {attempts}/{max_retries}. Retry in {retry_delay} seconds...')
                time.sleep(retry_delay)
            else:
                print('All attempts did not work!')
                raise e

myQuery = '''SELECT TABLE_NAME FROM information_schema.TABLES WHERE table_schema = 'my_db' AND TABLE_NAME = 'my_table' ;'''

myResult = query_management_select(myQuery, 5, 5)
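Since the UPDATE and SELECT versions share the same retry loop, that loop can be factored into one generic helper. This is only a sketch under assumptions: run_with_retries and flaky_action are hypothetical names, and the simulated ConnectionError stands in for sqlalchemy's OperationalError so the example runs without a database.

```python
import time

def run_with_retries(action, max_retries, retry_delay, retry_on=(Exception,)):
    """Call action() until it succeeds or max_retries attempts have failed."""
    for attempt in range(1, max_retries + 1):
        try:
            return action()
        except retry_on:
            if attempt == max_retries:
                print('All attempts did not work!')
                raise
            print(f'Issue, attempt {attempt}/{max_retries}. Retry in {retry_delay} seconds...')
            time.sleep(retry_delay)

# Hypothetical flaky action: fails twice, then succeeds
calls = {'count': 0}

def flaky_action():
    calls['count'] += 1
    if calls['count'] < 3:
        raise ConnectionError('simulated connection loss')
    return 'done'

result = run_with_retries(flaky_action, max_retries=5, retry_delay=0, retry_on=(ConnectionError,))
print(result, calls['count'])  # done 3
```

With a real engine, action would be a small function that runs the query, and retry_on would be (OperationalError,).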