Python操作mysql

一、python对MySQL的基本操作

1.连接数据

import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='root123', charset='utf8')cursor = conn.cursor()

2.创建数据库

cursor.execute('create database db01 default charset utf8 collate utf8_general_ci')conn.commit()

3.进入数据库创建表

cursor.execute('use db01')sql_create_table_l01= """create table l01(id int not null primary key auto_increment,name varchar(32) not null )default charset=utf8;"""cursor.execute(sql_create_table_l01)conn.commit()

4. 查看数据库中的表

cursor.execute('show tables')result = cursor.fetchall()print(result)

5.新增数据

cursor.execute("insert into l01(id,name) values('1','流水')")conn.commit()

6.删除

cursor.execute("delete from l01 where id=1")conn.commit()

7.修改数据

cursor.execute("update tb1 set name='xx' where id=1")conn.commit()

8.查询数据

cursor.execute("select * from tb where id>10")data = cursor.fetchone() # cursor.fetchall()print(data)

9.关闭数据库

cursor.close()conn.close()

10.执行存储过程

import pymysqlconn = pymysql.connect(host='', port=3306, user='', password='', charset=utf8)cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)# 执行存储过程cursor.callproc('存储过程名称')result = cursor.fetchall()# 得到执行存储过中的结果集 # 执行带参数存储过程cursor.callproc('存储过程名称', args=('参数1','参数2', '参数3'))# 获取执行完存储的参数cursor.execute('select @_名称_0')result = cursor.fetchall()# {@_名称_0: 参数} cursor.close()conn.close()print(result)

其他略

二、python操作MySQL的应用

1.登录注册

import pymysqldef register():print("用户注册")user = input("请输入用户名:")password = input("请输入密码:")# 连接指定数据conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb")cursor = conn.cursor()sql = 'insert into users(name,password) values("{}","{}")'.format(user, password)cursor.execute(sql)conn.commit()# 关闭数据库连接cursor.close()conn.close()print("注册成功,用户名:{},密码:{}".format(user, password))def login():print("用户登录")user = input("请输入用户名:")password = input("请输入密码:")# 连接指定数据conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb")cursor = conn.cursor()sql = "select * from users where name='{}' and password='{}'".format(user, password)cursor.execute(sql)result = cursor.fetchone() # 去向mysql获取结果# 关闭数据库连接cursor.close()conn.close()if result:print("登录成功", result)else:print("登录失败")def run():choice = input("1.注册;2.登录")if choice == '1':register()elif choice == '2':login()else:print("输入错误")if __name__ == '__main__':run()

2.防止sql注入

1.注入示例

import pymysql# 输入用户名和密码user = input("请输入用户名:") # ' or 1=1 -- pwd = input("请输入密码:") # 123conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8",db='usersdb')cursor = conn.cursor()# 基于字符串格式化来 拼接SQL语句# sql = "select * from users where name='liu' and password='123'"# sql = "select * from users where name='' or 1=1 -- ' and password='123'"sql = "select * from users where name='{}' and password='{}'".format(user, pwd)cursor.execute(sql)result = cursor.fetchone()print(result) # None,不是Nonecursor.close()conn.close()

如果用户在输入user时,输入了: ’ or 1=1 – ,这样即使用户输入的密码不存在,也会可以通过验证。
原因:
在SQL拼接时,拼接后的结果是:

select * from users where name='' or 1=1 -- ' and password='123'

注意: 在mysql中 – 表示注释
切记, sql语句不要在使用Python的字符串格式化, 而是用pymysql的execute方法

2.防止sql注入示例

import pymysql# 输入用户名和密码user_name = input('请输入用户名: ')user_pwd = input('请输入密码: ')conn = pymysql.connect(host='', password=3306, user='', passwd='', charset='utf8',db='userdb')cursor = conn.cursor()cursor.execute('select * from users name=s% and password=s%',[user_name, user_pwd])# 或者cursor.execute('select * from users where name=%(n1)s and password=%(n2)s',{'n1':user_name, 'n2':user_pwd})result = cursor.fetchone()print(result)cursor.close()conn.close()

3.数据库连接池

在操作数控是需要使用数据库连接池

import pymysqlimport threadingfrom dbutils.pooled_db import PooledDBMYSQL_DB_POOL = PooledDB(creator=pymysql,# 使用链接数据库的模块maxconnections=5,# 连接池允许的最大连接数,0和None表示不限制连接数mincached=2,# 初始化时,链接池中至少创建的空闲的链接,0表示不创建maxcached=3,# 链接池中最多闲置的链接,0和None不限制blocking=True,# 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错setsession=[],# 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]ping=0,# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, # 2 = when a cursor is created, 4 = when a query is executed, 7 = alwayshost='',port=3306,user='',password='',database='',charset='')def task():# 去连接池获取一个连接conn = MYSQL_DB_POOL()cursor = conn.cursor(pymysql.cursors.DictCursor)cursor.execut('select sleep(2)')result = cursor.fetchall()print(result)cursor.close()conn.close()def run():for i in range(10):t = threading.Thread(target=task)t.start()if __name__ == '__main__':run()

4.sql工具类

基于数据库连接池开发一个功能sql操作类,方便以后操作数据库

  • 单列和方法
import pymysqlfrom dbutils.pooled_db import PooledDBclass DBHelper(object):def __init__(self):# TODO 此处配置,可以去配置文件中读取。self.pool = PooledDB(creator=pymysql,# 使用链接数据库的模块maxconnections=5,# 连接池允许的最大连接数,0和None表示不限制连接数mincached=2,# 初始化时,链接池中至少创建的空闲的链接,0表示不创建maxcached=3,# 链接池中最多闲置的链接,0和None不限制blocking=True,# 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错setsession=[],# 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]ping=0,# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = alwayshost='127.0.0.1',port=3306,user='root',password='root123',database='userdb',charset='utf8')def get_conn_cursor(self):conn = self.pool.connection()cursor = conn.cursor(pymysql.cursors.DictCursor)return conn, cursordef close_conn_cursor(self, *args):for item in args:item.close()def exec(self, sql, **kwargs):conn, cursor = self.get_conn_cursor()cursor.execute(sql, kwargs)conn.commit()self.close_conn_cursor(conn, cursor)def fetch_one(self, sql, **kwargs):conn, cursor = self.get_conn_cursor()cursor.execute(sql, kwargs)result = cursor.fetchone()self.close_conn_cursor(conn, cursor)return resultdef fetch_all(self, sql, **kwargs):conn, cursor = self.get_conn_cursor()cursor.execute(sql, kwargs)result = cursor.fetchall()self.close_conn_cursor(conn, cursor)return resultdb = DBHelper()
from db import dbdb.exec("insert into d1(name) values(%(name)s)", name="Kevin")ret = db.fetch_one("select * from d1")print(ret)ret = db.fetch_one("select * from d1 where id=%(nid)s", nid=3)print(ret)ret = db.fetch_all("select * from d1")print(ret)ret = db.fetch_all("select * from d1 where id>%(nid)s", nid=2)print(ret)
  • 上下文管理
import threadingimport pymysqlfrom dbutils.pooled_db import PooledDBPOOL = PooledDB(creator=pymysql,# 使用链接数据库的模块maxconnections=5,# 连接池允许的最大连接数,0和None表示不限制连接数mincached=2,# 初始化时,链接池中至少创建的空闲的链接,0表示不创建maxcached=3,# 链接池中最多闲置的链接,0和None不限制blocking=True,# 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错setsession=[],# 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]ping=0,host='127.0.0.1',port=3306,user='root',password='root123',database='userdb',charset='utf8')class Connect(object):def __init__(self):self.conn = conn = POOL.connection()self.cursor = conn.cursor(pymysql.cursors.DictCursor)def __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.cursor.close()self.conn.close()def exec(self, sql, **kwargs):self.cursor.execute(sql, kwargs)self.conn.commit()def fetch_one(self, sql, **kwargs):self.cursor.execute(sql, kwargs)result = self.cursor.fetchone()return resultdef fetch_all(self, sql, **kwargs):self.cursor.execute(sql, kwargs)result = self.cursor.fetchall()return result
from db_context import Connectwith Connect() as obj:# print(obj.conn)# print(obj.cursor)ret = obj.fetch_one("select * from d1")print(ret)ret = obj.fetch_one("select * from d1 where id=%(id)s", id=3)print(ret)