18 5 月, 2024

Manufacturing

Processing Machinery

A collection of tutorials for operating MySQL with Python!

7 min read

1. Python operation database introduction
Hello everyone, I am Picasso (lock!).

The Python standard database interface is Python DB-API, which provides developers with a database application programming interface. The Python database interface supports many databases, and you can choose the database that suits your project:

GadFly

mSQL

MySQL

PostgreSQL

Microsoft SQL Server 2000

Informix

Interbase

Oracle

Sybase…

You can visit the Python database interface and API to view a detailed list of supported databases.

You need to download different DB API modules for different databases. For example, if you need to access Oracle database and Mysql data, you need to download Oracle and MySQL database modules.

DB-API is a specification. It defines a series of necessary objects and database access methods in order to provide a consistent access interface for various underlying database systems and various database interface programs.

Python’s DB-API implements interfaces for most databases, and after using it to connect to each database, you can operate each database in the same way.

Python DB-API usage process:

Import the API module.

Get a connection to the database.

Execute SQL statements and stored procedures.

Close the database connection.

2. python operation MySQL module
Python operates MySQL mainly in two ways:

DB module (native SQL)

PyMySQL (support python2.x/3.x)

MySQLdb (currently only supports python2.x)

ORM framework

SQLAchemy

2.1 PyMySQL module
This article mainly introduces the PyMySQL module, MySQLdb is used in a similar way

2.1.1 Install PyMySQL

PyMySQL is a MySQL driver written in Python that allows us to operate the MySQL database in the Python language.

pip install PyMySQL
2.2 Basic use
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = “shuke”
# Date: 2018/5/13

import pymysql

# create connection
conn = pymysql.connect(host=”127.0.0.1″, port=3306, user=’zff’, passwd=’zff123′, db=’zff’, charset=’utf8mb4′)

# Create a cursor (query data is returned in tuple format)
# cursor = conn. cursor()

# Create a cursor (the query data is returned in dictionary format)
cursor = conn.cursor(pymysql.cursors.DictCursor)

# 1. Execute SQL and return the number of affected rows
effect_row1 = cursor. execute(“select * from USER”)

# 2. Execute SQL, return the number of affected rows, and insert multiple rows of data at a time
effect_row2 = cursor. executemany(“insert into USER (NAME) values(%s)”, [(“jack”), (“boom”), (“lucy”)]) # 3

# Query all data, return data in tuple format
result = cursor. fetchall()

#Addition/deletion/modification requires commit submission and saving
conn.commit()

# close the cursor
cursor. close()

# close the connection
conn. close()

print(result)
“””
[{‘id’: 6, ‘name’: ‘boom’}, {‘id’: 5, ‘name’: ‘jack’}, {‘id’: 7, ‘name’: ‘lucy’}, { ‘id’: 4, ‘name’: ‘tome’}, {‘id’: 3, ‘name’: ‘zff’}, {‘id’: 1, ‘name’: ‘zhaofeng’}, {‘id ‘: 2, ‘name’: ‘zhaofengfeng02′}]
“””
2.3 Get the newly created data self-increment ID
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = “shuke”
# Date: 2018/5/13

import pymysql

# create connection
conn = pymysql.connect(host=”127.0.0.1″, port=3306, user=’zff’, passwd=’zff123′, db=’zff’, charset=’utf8mb4′)

# Create a cursor (query data is returned in tuple format)
cursor = conn. cursor()

# Get newly created data auto-increment ID
effect_row = cursor.executemany(“insert into USER (NAME)values(%s)”, [(“eric”)])

# All additions, deletions and changes need to be submitted by commit
conn.commit()

# close the cursor
cursor. close()

# close the connection
conn. close()

new_id = cursor. lastrowid
print(new_id)
“””
8
“””
2.4 Query operation
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = “shuke”
# Date: 2018/5/13

import pymysql

# create connection
conn = pymysql.connect(host=”127.0.0.1″, port=3306, user=’zff’, passwd=’zff123′, db=’zff’, charset=’utf8mb4′)

# create cursor
cursor = conn. cursor()

cursor. execute(“select * from USER”)

# Get the first row of data
row_1 = cursor. fetchone()

# Get the first n rows of data
row_2 = cursor. fetchmany(3)
#
# # Get all data
row_3 = cursor. fetchall()

# close the cursor
cursor. close()

# close the connection
conn. close()
print(row_1)
print(row_2)
print(row_3)
⚠️ When fetching data in order, you can use cursor.scroll(num,mode) to move the cursor position, such as:

cursor.scroll(1,mode=’relative’) # move relative to the current position

cursor.scroll(2,mode=’absolute’) # relative absolute position movement

2.5 Prevent SQL injection
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = “shuke”
# Date: 2018/5/13

import pymysql

# create connection
conn = pymysql.connect(host=”127.0.0.1″, port=3306, user=’zff’, passwd=’zff123′, db=’zff’, charset=’utf8mb4′)

# create cursor
cursor = conn. cursor()

# There is a sql injection situation (do not splice SQL in the form of a formatted string)
sql = “insert into USER (NAME) values(‘%s’)” % (‘zhangsan’,)
effect_row = cursor. execute(sql)

# Correct way one
# The execute function accepts a tuple/list as an SQL parameter, and the number of elements can only be 1
sql = “insert into USER (NAME) values(%s)”
effect_row1 = cursor. execute(sql, [‘wang6’])
effect_row2 = cursor. execute(sql, (‘wang7’,))

# Correct way two
sql = “insert into USER (NAME) values(%(name)s)”
effect_row1 = cursor. execute(sql, {‘name’: ‘wudalang’})

# Write and insert multiple rows of data
effect_row2 = cursor.executemany(“insert into USER (NAME) values(%s)”, [(‘ermazi’), (‘dianxiaoer’)])

# submit
conn.commit()
# close the cursor
cursor. close()
# close the connection
conn. close()
In this way, SQL operations are safer. If you need more detailed documentation, refer to the PyMySQL documentation. However, it seems that the implementation of these SQL databases is not the same. The parameter placeholders of PyMySQL use C formatters such as %s, while the placeholders of the sqlite3 module that comes with Python seem to be question marks (?). So read the documentation carefully when using other databases. Welcome to PyMySQL’s documentation

3. Database connection pool
There is a problem with the above method, which can be satisfied in the case of single thread, and the program needs to frequently create and release connections to complete the operation on the database. Then, what problems will our program/script cause in the case of multi-threading? At this time, We need to use the database connection pool to solve this problem!

3.1 DBUtils module
DBUtils is a Python module for implementing database connection pools.

This connection pool has two connection modes:

Create a connection for each thread. Even if the thread calls the close method, it will not be closed, but the connection will be put back into the connection pool for its own thread to use again. When the thread terminates, the connection is automatically closed

Create a batch of connections to the connection pool for shared use by all threads (recommended)

3.2 Mode 1
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = “shuke”
# Date: 2018/5/13

from DBUtils.PersistentDB import PersistentDB
import pymysql

POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:[“set datestyle to …”, “set time zone …”]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
closeable=False,
# 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host=’127.0.0.1′,
port=3306,
user=’zff’,
password=’zff123′,
database=’zff’,
charset=’utf8′,
)

def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute(‘select * from USER’)
result = cursor.fetchall()
cursor.close()
conn.close()
return result

result = func()
print(result)

3.2 Mode 2

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = “shuke”
# Date: 2018/5/13

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection

POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:[“set datestyle to …”, “set time zone …”]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host=’127.0.0.1′,
port=3306,
user=’zff’,
password=’zff123′,
database=’zff’,
charset=’utf8′
)

def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
# 否则
# 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection()

# print(‘连接被拿走了’, conn._con)
# print(‘池子里目前有’, POOL._idle_cache, ‘\r\n’)

cursor = conn.cursor()
cursor.execute(‘select * from USER’)
result = cursor.fetchall()
conn.close()
return result

result = func()
print(result)

Since the threadsafety value of pymysql, MySQLdb, etc. is 1, the threads in the connection pool of this mode will be shared by all threads, so it is thread safe. If there is no connection pool, when using pymysql to connect to the database, there is no problem with single-threaded applications, but if it involves multi-threaded applications, then it needs to be locked. Once locked, the connection will inevitably wait in line. When there are many requests, the performance will be slow. will be lowered.

3.3 Locking

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = “shuke”
# Date: 2018/5/13

import pymysql
import threading
from threading import RLock

LOCK = RLock()
CONN = pymysql.connect(host=’127.0.0.1′,
port=3306,
user=’zff’,
password=’zff123′,
database=’zff’,
charset=’utf8′)

def task(arg):
with LOCK:
cursor = CONN.cursor()
cursor.execute(‘select * from USER ‘)
result = cursor.fetchall()
cursor.close()

print(result)

for i in range(10):
t = threading.Thread(target=task, args=(i,))
t.start()

3.4 No lock (error reporting)

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = “shuke”
# Date: 2018/5/13

import pymysql
import threading

CONN = pymysql.connect(host=’127.0.0.1′,
port=3306,
user=’zff’,
password=’zff123′,
database=’zff’,
charset=’utf8′)

def task(arg):
cursor = CONN.cursor()
cursor.execute(‘select * from USER ‘)
# cursor.execute(‘select sleep(10)’)
result = cursor.fetchall()
cursor.close()
print(result)

for i in range(10):
t = threading.Thread(target=task, args=(i,))
t.start()

At this point, you can view the connection status in the database: show status like ‘Threads%’;

4. The database connection pool is used in combination with pymsql

# cat sql_helper.py

import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=20, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
#maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:[“set datestyle to …”, “set time zone …”]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host=’192.168.11.38′,
port=3306,
user=’root’,
passwd=’apNXgF6RDitFtDQx’,
db=’m2day03db’,
charset=’utf8′
)

def connect():
# 创建连接
# conn = pymysql.connect(host=’192.168.11.38′, port=3306, user=’root’, passwd=’apNXgF6RDitFtDQx’, db=’m2day03db’)
conn = POOL.connection()
# 创建游标
cursor = conn.cursor(pymysql.cursors.DictCursor)

return conn,cursor

def close(conn,cursor):
# 关闭游标
cursor.close()
# 关闭连接
conn.close()

def fetch_one(sql,args):
conn,cursor = connect()
# 执行SQL,并返回收影响行数
effect_row = cursor.execute(sql,args)
result = cursor.fetchone()
close(conn,cursor)

return result

def fetch_all(sql,args):
conn, cursor = connect()

# 执行SQL,并返回收影响行数
cursor.execute(sql,args)
result = cursor.fetchall()

close(conn, cursor)
return result

def insert(sql,args):
“””
创建数据
:param sql: 含有占位符的SQL
:return:
“””
conn, cursor = connect()

# 执行SQL,并返回收影响行数
effect_row = cursor.execute(sql,args)
conn.commit()

close(conn, cursor)

def delete(sql,args):
“””
创建数据
:param sql: 含有占位符的SQL
:return:
“””
conn, cursor = connect()

# 执行SQL,并返回收影响行数
effect_row = cursor.execute(sql,args)

conn.commit()

close(conn, cursor)

return effect_row

def update(sql,args):
conn, cursor = connect()

# 执行SQL,并返回收影响行数
effect_row = cursor.execute(sql, args)

conn.commit()

close(conn, cursor)

return effect_row

PS: It can be encapsulated into a class by static method, which is convenient to use