zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

pymysql ︱mysql的基本操作与dbutils+PooledDB使用

2023-04-18 14:12:16 时间

文章目录

0 安装依赖

pip3 install --pre pymysql -i https://pypi.tuna.tsinghua.edu.cn/simple 
pip3 install --pre sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple

1 连接方式

1.1 pymysql.connect直接连

import pymysql
# 连接database

user = "xxx"
password = "xxx"
host = 'xx.xx.xx.xx'
port = 3306

conn = pymysql.connect(host=host, user=user,password=password,database="xxxxxxx")
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()

1.2 pandas连接

参考:利用pandas的to_sql将数据插入MySQL数据库和所踩过的坑

from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://{}:{}@{}/{}?charset={}".format('用户名', '登录密码', '127.0.0.1:3306', '数据库名','字符编码'))
con = engine.connect()#创建连接
df.to_sql(name='rumousdata', con=con, if_exists='append', index=False)

1.3 dbutils+PooledDB连接

文章:python使用dbutils的PooledDB连接池,操作数据库

使用优势:

  • 1、使用dbutils的PooledDB连接池,操作数据库。 这样就不需要每次执行sql后都关闭数据库连接,频繁的创建连接,消耗时间
  • 2、如果是使用一个连接一直不关闭,多线程下,插入超长字符串到数据库,运行一段时间后很容易出现OperationalError: (2006, ‘MySQL server has gone away’)这个错误。

使用PooledDB解决。

self.__class__.__pool = PooledDB(pymysql,
                                 mincached, maxcached,
                                 maxshared, maxconnections, blocking,
                                 maxusage, setsession, reset,
                                 host=host, port=port, db=db,
                                 user=user, passwd=passwd,
                                 charset=charset,
                                 cursorclass=pymysql.cursors.DictCursor
                                 )

2 读/写/改

python3.6 使用 pymysql 连接 Mysql 数据库及 简单的增删改查操作

2.1 常规查询

import pymysql  #导入 pymysql

#打开数据库连接
db= pymysql.connect(host="localhost",user="root",
 	password="123456",db="test",port=3307)

# 使用cursor()方法获取操作游标
cur = db.cursor()

#1.查询操作
# 编写sql 查询语句  user 对应我的表名
sql = "select * from user"
try:
	cur.execute(sql) 	#执行sql语句

	results = cur.fetchall()	#获取查询的所有记录
	print("id","name","password")
	#遍历结果
	for row in results :
		id = row[0]
		name = row[1]
		password = row[2]
		print(id,name,password)
except Exception as e:
	raise e
finally:
	db.close()	#关闭连接

2.2 常规-写入

import pymysql
#2.插入操作
db= pymysql.connect(host="localhost",user="root",
 	password="123456",db="test",port=3307)

# 使用cursor()方法获取操作游标
cur = db.cursor()

sql_insert ="""insert into user(id,username,password) values(4,'liu','1234')"""

try:
	cur.execute(sql_insert)
	#提交
	db.commit()
except Exception as e:
	#错误回滚
	db.rollback() 
finally:
	db.close()

2.3 常规-批量写入

# 单条插入    
coin_cnts_2 = ['46213000',
 '14000',
 '1952',
 '249',
 '62000',
 '10000',
 '48',
 '6446',
 '121000']
    
for n,cot in tqdm(enumerate(coin_cnts_2),total = len(coin_cnts_2)):
    sql = 'UPDATE douyin_data_1 SET coin_cnt = {} WHERE id = {} '.format(cot,n+1)
    mq.execute(sql)    

    
# 批量插入
conn = pymysql.connect(host=mq.host, user=mq.user,password=mq.password,database=mq.database)
cursor = conn.cursor()
sql = 'UPDATE douyin_data_1 SET coin_cnt = %s WHERE id = %s '
data_info = [(cot,n) for n,cot in enumerate(coin_cnts_2)]
%time cursor.executemany(sql, data_info)

插入的速度比较慢

2.4 常规-更新

import pymysql
#3.更新操作
db= pymysql.connect(host="localhost",user="root",
 	password="123456",db="test",port=3307)

# 使用cursor()方法获取操作游标
cur = db.cursor()

sql_update ="update user set username = '%s' where id = %d"

try:
	cur.execute(sql_update % ("xiongda",3))  #像sql语句传递参数
	#提交
	db.commit()
except Exception as e:
	#错误回滚
	db.rollback() 
finally:
	db.close()

2.5 常规-删除

import pymysql
#4.删除操作
db= pymysql.connect(host="localhost",user="root",
 	password="123456",db="test",port=3307)

# 使用cursor()方法获取操作游标
cur = db.cursor()

sql_delete ="delete from user where id = %d"

try:
	cur.execute(sql_delete % (3))  #像sql语句传递参数
	#提交
	db.commit()
except Exception as e:
	#错误回滚
	db.rollback() 
finally:
	db.close()

2.6 pandas写回——to_sql

pd.to_sql()知道这些就够用了

DataFrame.to_sql(name, con, schema=None, 
if_exists='fail', index=True, index_label=None, 
chunksize=None, dtype=None, method=None)

参见pandas.to_sql函数,主要有以下几个参数:

  • name: 输出的表名
  • con: 与read_sql中相同,数据库链接
  • if_exits: 三个模式:fail,若表存在,则不输出;replace:若表存在,覆盖原来表里的数据;append:若表存在,将数据写到原表的后面。默认为fail
  • index:是否将df的index单独写到一列中
  • index_label:指定列作为df的index输出,此时index为True
  • chunksize: 同read_sql
  • dtype: 指定列的输出到数据库中的数据类型。字典形式储存:{column_name: sql_dtype}。常见的数据类型有sqlalchemy.types.INTEGER(), sqlalchemy.types.NVARCHAR(),sqlalchemy.Datetime()等,具体数据类型可以参考这里

使用to_sql的方式写回数据库之中。

import pandas as pd
from sqlalchemy import create_engine
yconnect = create_engine('mysql+pymysql://user:password@111.111.111.111:3306/your_database_name?charset=utf8')
pd.io.sql.to_sql(dataframe_name,'form_name', yconnect, schema='your_database_name', if_exists='append')

将数据写入mysql的数据库,但需要先通过sqlalchemy.create_engine建立连接,且字符编码设置为utf8,否则有些latin字符不能处理

  • 第二个参数tablename,form_name,是将导入的数据库中的表名
  • 第四个参数your_database_name是将导入的数据库名字
  • if_exists='append’的意思是,如果表tablename存在,则将数据添加到这个表的后面
    • fail的意思如果表存在,啥也不做
    • replace的意思,如果表存在,删了表,再建立一个新表,把数据插入
    • append的意思,如果表存在,把数据插入,如果表不存在创建一个表!!

字段字符格式的初始化:

df.to_sql(name='table', 
          con=con, 
          if_exists='append', 
          index=False,
          dtype={'col1':sqlalchemy.types.INTEGER(),
                 'col2':sqlalchemy.types.NVARCHAR(length=255),
                 'col_time':sqlalchemy.DateTime(),
                 'col_bool':sqlalchemy.types.Boolean
          })

如果不提供dtype,to_sql会自动根据df列的dtype选择默认的数据类型输出,比如字符型会以sqlalchemy.types.TEXT类型输出,相比NVARCHAR,TEXT类型的数据所占的空间更大,所以一般会指定输出为NVARCHAR; 而如果df的列的类型为np.int64时,将会导致无法识别并转换成INTEGER型,需要事先转换成int类型(用map,apply函数可以方便的转换)。

2.6.0 sqlalchemy的格式

自建格式的一些格式要求:

df.to_sql('emp_backup', engine, if_exists='replace', index=False,
          dtype={'EMP_ID': sqlalchemy.types.BigInteger(),
                 'GENDER': sqlalchemy.types.String(length=20),
                 'AGE': sqlalchemy.types.BigInteger(),
                 'EMAIL':  sqlalchemy.types.String(length=50),
                 'PHONE_NR':  sqlalchemy.types.String(length=50),
                 'EDUCATION':  sqlalchemy.types.String(length=50),
                 'MARITAL_STAT':  sqlalchemy.types.String(length=50),
                 'NR_OF_CHILDREN': sqlalchemy.types.BigInteger()
                 })

其他包括:

from sqlalchemy import exc
from sqlalchemy import schema as sa_schema
from sqlalchemy import types as sqltypes
from sqlalchemy import util
from sqlalchemy.engine import default
from sqlalchemy.engine import reflection
from sqlalchemy.sql import compiler
from sqlalchemy.sql import text
from sqlalchemy.types import BIGINT
from sqlalchemy.types import BINARY
from sqlalchemy.types import CHAR
from sqlalchemy.types import DATE
from sqlalchemy.types import DATETIME
from sqlalchemy.types import DECIMAL
from sqlalchemy.types import FLOAT
from sqlalchemy.types import INT  # noqa
from sqlalchemy.types import INTEGER
from sqlalchemy.types import NCHAR
from sqlalchemy.types import NUMERIC
from sqlalchemy.types import NVARCHAR
from sqlalchemy.types import REAL
from sqlalchemy.types import SMALLINT
from sqlalchemy.types import TEXT
from sqlalchemy.types import TIME
from sqlalchemy.types import TIMESTAMP
from sqlalchemy.types import Unicode
from sqlalchemy.types import VARBINARY
from sqlalchemy.types import VARCHAR

2.7 pandas 读出——read_sql

参考:https://blog.csdn.net/ydyang1126/article/details/78222125

# -*- coding: utf-8 -*-
import pandas as pd
import pymysql

conn = pymysql.connect(host='127.0.0.1', 
               user='root',password='123456', 
               db='TESTDB',charset='utf8', 
               use_unicode=True)

sql = 'select GroupName from group limit 20'
df = pd.read_sql(sql, con=conn)
print(df.head())

df.to_csv("data.csv")
conn.close()

2.8 SQL + pandas 来创建表结构

如果数据源本身是来自数据库,通过脚本操作是比较方便的。如果数据源是来自 CSV 之类的文本文件,可以手写 SQL 语句或者利用 pandas get_schema() 方法,如下例:

import sqlalchemy

print(pd.io.sql.get_schema(df, 'emp_backup', keys='EMP_ID', 
   dtype={'EMP_ID': sqlalchemy.types.BigInteger(),
       'GENDER': sqlalchemy.types.String(length=20),
       'AGE': sqlalchemy.types.BigInteger(),
       'EMAIL':  sqlalchemy.types.String(length=50),
       'PHONE_NR':  sqlalchemy.types.String(length=50),
       'EDUCATION':  sqlalchemy.types.String(length=50),
       'MARITAL_STAT':  sqlalchemy.types.String(length=50),
       'NR_OF_CHILDREN': sqlalchemy.types.BigInteger()
       }, con=engine))

get_schema()并不是一个公开的方法,没有文档可以查看。生成的 SQL 语句如下:

CREATE TABLE emp_backup (
        `EMP_ID` BIGINT NOT NULL AUTO_INCREMENT,
        `GENDER` VARCHAR(20),
        `AGE` BIGINT,
        `EMAIL` VARCHAR(50),
        `PHONE_NR` VARCHAR(50),
        `EDUCATION` VARCHAR(50),
        `MARITAL_STAT` VARCHAR(50),
        `NR_OF_CHILDREN` BIGINT,
        CONSTRAINT emp_pk PRIMARY KEY (`EMP_ID`)
)

当然,如果数据源本身就是 mysql,当然不用大费周章来创建数据表的结构,直接使用 create table like xxx 就行。以下代码展示了这种用法:

import pandas as pd 
from sqlalchemy import create_engine

engine = create_engine('mysql+pymysql://user:password@localhost/stonetest?charset=utf8')
df = pd.read_sql('emp_master', engine)

# Copy table structure
with engine.connect() as con:
    con.execute('DROP TABLE if exists emp_backup')
    con.execute('CREATE TABLE emp_backup LIKE emp_master;')

df.to_sql('emp_backup', engine, index=False, if_exists='append')

2.9 更新时间格式

update djk_corpus set create_time='2019-09-29 14:09:45'

2.10 to_sql 和常规insert的优劣势

python的to_sql那点儿事

to_sql结论

  • 可以对齐字段(dataframe的columns和数据库字段一一对齐)
  • 可以缺少字段(dataframe的columns可以比数据库字段少)
  • 不可以多出字段,会报错 -if_exists='append’进行新增(bug:如果设置了PK,ignore 和 replace会报错)
  • 一定要先创建好数据库,设置好格式,
  • 否则使用if_exists='append’自动创建的字段格式乱七八糟
  • 不能replace!!!!

to_sql代码

#构建数据库连接
engine=create_engine(f'mysql+pymysql://{user}:{passwd}@{host}:3306/{db}')

#可以对齐字段,以及缺少字段;不可以增加字段
data.to_sql(sql_name,engine,index=False,if_exists='append')

自定义w_sql (迭代后版本)

# 定义写入数据库函数
def w_sql(sql_name,data,db_name,host=host,user=user,passwd=passwd):
    zd=""
    for j in data.columns:
        zd=zd+j+","
    
    connent = pymysql.connect(host=host, user=user, passwd=passwd, db=db_name, charset='utf8mb4') #连接数据库 
    cursor = connent.cursor()#创建游标
    for i in data.values:
        va=""
        for j in i:
            if pd.isnull(j):
                va=va+","+'null' #缺失值判断和转换
            else:
                va=va+","+'"'+str(j)+'"'
#         sql=u"""insert ignore into %s (%s) values(%s)"""%(sql_name,zd[:-1],va[1:])
        sql=u"""replace into %s (%s) values(%s)"""%(sql_name,zd[:-1],va[1:])
        cursor.execute(sql)
        
    connent.commit() #提交事务
    cursor.close()#关闭游标
    connent.close()#断开连接

3 其他基础设置

3.1 更新注释

sql = "show full columns from douyin_data_1; " 
mq.query(sql)

3.2 批量修改字符串类型

ALTER TABLE `数据集` CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

3.3 查看表名 + 列名

SELECT column_name FROM information_schema.columns
WHERE table_name='xxxx表名';

某个数据库下有多少张表

import pymysql
# 连接database

user = "xxx"
password = "xxx"
host = '1.1.1.1'
port = 3306

conn = pymysql.connect(host=host, user=user,password=password,database="xxx")
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()


database  = 'd-lab'
sql = "SELECT table_name FROM information_schema.tables WHERE table_schema='{}' AND table_type='base table';".format(database)
cursor.execute(sql)    # 执行sql语句
cursor.fetchall()    # 获取查询的所有记录

不同表的列名:

'show columns from {}; '.format(form_name)

3.4 指定唯一KEY

import pandas as pd
from sqlalchemy import create_engine
import sqlalchemy

engine = create_engine('mysql+pymysql://user:password@localhost/stonetest?charset=utf8')
df = pd.read_sql('emp_master', engine)
# make sure emp_master_backup table has been created
# so the table schema is what we want
df.to_sql('emp_backup', engine, index=False, if_exists='append')

也可以在 to_sql() 方法中,通过 dtype 参数指定字段的类型,然后在 mysql 中 通过 alter table 命令将字段 EMP_ID 变成 primary key。

df.to_sql('emp_backup', engine, if_exists='replace', index=False,
          dtype={'EMP_ID': sqlalchemy.types.BigInteger(),
                 'GENDER': sqlalchemy.types.String(length=20),
                 'AGE': sqlalchemy.types.BigInteger(),
                 'EMAIL':  sqlalchemy.types.String(length=50),
                 'PHONE_NR':  sqlalchemy.types.String(length=50),
                 'EDUCATION':  sqlalchemy.types.String(length=50),
                 'MARITAL_STAT':  sqlalchemy.types.String(length=50),
                 'NR_OF_CHILDREN': sqlalchemy.types.BigInteger()
                 })

with engine.connect() as con:
    con.execute('ALTER TABLE emp_backup ADD PRIMARY KEY (`EMP_ID`);')

3.5 left / right/inner Join 连接

其中包括:

  • left join(左联接) 返回包括左表中的所有记录和右表中联结字段相等的记录
  • right join(右联接) 返回包括右表中的所有记录和左表中联结字段相等的记录
  • inner join(等值连接) 只返回两个表中联结字段相等的行
select * from A
innerjoin B
on A.aID = B.bID

4 mysql文字查询

最简单的文字匹配

select * FROM xiaohongshu_article_3 WHERE content REGEXP "家居"

4.1 通配符查询 like

通配符查询: MySql的like语句中的通配符:百分号、下划线和escape

FROM [user] WHERE u_name REGEXP ‘^三’;     # 以三开头的句子
FROM [user] WHERE u_name REGEXP ‘三$’;     # 以三结尾的句子

其中: %:表示任意个或多个字符。可匹配任意类型和长度的字符。

select * from user where username like '%huxiao';   
select * from user where username like 'huxiao%';   
select * from user where username like '%huxiao%';   
SELECT * FROM [user] WHERE u_name LIKE ‘%三%猫%’  

其中,%huxiao,代表句尾 ‘%三%猫%’虽然能搜索出“三脚猫”,但不能搜索出符合条件的“张猫三”。

**_:表示任意单个字符。**匹配单个任意字符,它常用来限制表达式的字符长度语句:(可以代表一个中文字符)

select * from user where username like '_';   
select * from user where username like 'huxia_';   
select * from user where username like 'h_xiao';   

如果我就真的要查%或者_,怎么办呢?使用escape,转义字符后面的%或_就不作为通配符了,注意前面没有转义字符的%和_仍然起通配符作用 Sql代码

select username from gg_user where username like '%xiao/_%' escape '/';   
select username from gg_user where username like '%xiao/%%' escape '/';

4.2 多字段模糊匹配:

SELECT * FROM `magazine` WHERE CONCAT(`title`,`tag`,`description`) LIKE ‘%关键字%’

单字段多like模糊匹配:

SELECT * FROM [user] WHERE u_name LIKE '%三%' AND u_name LIKE '%猫%' 

多字段 + 多like联合匹配:(参考:https://www.cnblogs.com/chywx/p/10154990.html)

select * from djk_corpus where concat(TestSubject,AnswerA,AnswerB,AnswerC,AnswerD,AnswerE) NOT REGEXP "png|jpg|jpeg|gif"

TestSubject,AnswerA等这几个字段,都不包含,png,jpg等,这些字段 匹配 p1 或 p2 或 p3。例如,‘z|food’ 能匹配 “z” 或 “food”。’(z|f)ood’ 则匹配 “zood” 或 “food”。 但是不能写成’p1&p2’,只能用"|"来写

还可以使用其他,但是需要注意顺序关系: *(星号)和+(加号)都可以匹配多个该符号之前的字符。但是,+至少表示一个字符,而*可以表示0个字符。

SELECT * FROM baike369 WHERE name REGEXP 'a*c';
SELECT * FROM baike369 WHERE name REGEXP 'a+c';

其他方式:

select * from weibo_posts_21092h where 
(
content like '%益生菌%' 
OR content like '%益生元%' 
OR content like '%微生态%' 
OR content like '%后生元%'
OR content like '%微生物%'
)
AND 
(
content like '%皮肤%'
OR content like '%肌肤%'
OR content like '%口腔%'
OR content like '%肠道%'
OR content like '%身体%'
OR content like '%人体%'
);

4.3 正则模糊匹配

来自:MySQL匹配指定字符串的查询

从baike369表的name字段中查询包含“a”到“w”字母和数字以外的字符的记录。SQL代码如下:

SELECT * FROM baike369 WHERE name REGEXP '[^a-w0-9]';

查看name字段中查询包含“a”到“w”字母和数字以外的字符的记录的操作效果。

使用方括号([])可以将需要查询的字符组成一个字符集;通过“[abc]”可以查询包含a、b和c等3个字母中任何一个的记录。

SELECT * FROM baike369 WHERE name REGEXP '[ceo]';   

name字段中查询出包含数字的记录。

SELECT * FROM baike369 WHERE name REGEXP '[0-9]';

查询包含数字或者字母a、b和c的记录

SELECT * FROM baike369 WHERE name REGEXP '[0-9a-c]';

4.4 多个关键词匹配,并集关系(不是 | )

可行的是下面的这种:

data[data['col'].str.contains('(?=.*组合)(?=.*衣物)^.*$' , regex=True )]****

不可行的有:

data[data['col'].str.contains('组合(.*?)衣物' , regex=True )]
data[data['col'].str.contains('组合*衣物' , regex=True )]
data[data['col'].str.contains('组合+衣物' , regex=True )]
data[data['col'].str.contains('.*[(组合)(衣物)].*' , regex=True )]

5 报错类型

5.1 报错1:ProgrammingError

ProgrammingError: (1064, 'You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '\\"]", "1", "", "", "", "", "", "", "", "", "", "水", "7732-18-5", "WATER' at line 2')

有人说是因为转义问题,所以: 可以使用:pymysql.escape_string(cdv)对文字进行一定编码(有点像,as.numbic() )

import pymysql
pymysql.escape_string('大花田菁(SESBANIAGRANDIFLORA)花')

5.2 报错二:xF0x9Fx92x9CxF0x9F 字符编码报错

(1366, "Incorrect string value: 'xF0x9Fx92x9CxF0x9F...' for column 'text' at row,xF0x9Fx92x9CxF0x9F,

这种原因因为MySQL不识别有些字符的utf8编码(如表情字符),这时你需要指定连接字符编码为utf8mb4。数据表对应字段编码也改成utf8mb4

5.3 报错三: Incorrect string value

[ERR] 1366 - Incorrect string value: ‘xE4xB8xADxE5x86xB6…’ for column ‘楼盘名称’ at row 1

5.4 报错四:not all arguments converted during string formatting

文章:python3 pandas to_sql填坑

Execution failed on sql ‘SELECT name FROM sqlite_master WHERE type=‘table’ AND name=?;’: not all arguments converted during string formatting 数据库链接不再使用pymysql,而改用sqlalchemy,con=engine 而不是con=db 官方文档

但是,如果按照如上写法,在python3.6(我的python版本)环境下会出现找不到mysqldb模块错误! 正确的写法如下,因为python3将mysqldb改为pymysql了!!! mysql+pymysql://root:root@localhost:3306/tushare?charset=utf8

5.5 报错五:DB-Lib error message 20018

当我用pymmsql执行,

cursor.execute('SELECT * FROM persons WHERE salesrep=John ')

会报错:

DatabaseError: Execution failed on sql 'select * from fresh_ax_product where itemid = "d"': (207, b"Invalid column name 'd'.DB-Lib error message 20018, severity 16:
General SQL Server error: Check messages from the SQL Server
")

那么这里用pymmsql 来进行一些where语句的时候,就需要一些特殊的写入方式:

cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')

参考:Python连接SQL Server数据库 - pymssql使用基础

5.6 报错六:合并表的时候,编码报错

Cannot resolve the collation conflict between "Chinese_PRC_CI_AS" and "Chinese_PRC_Stroke_90_CI_AS_WS" in the equal to operation. (468)

解决:

select a1.*,a2.* from
(select *,(left(txno,4)) ShopNumber from xxx1) a1
left join xxx2 a2
on a1.fet1 = a2.fet2
 COLLATE Chinese_PRC_CI_AS

6 一些笔者的自建函数

6.1 打包查询函数

import pymysql

class mysql_query():
    def __init__(self,user = "root",password = "123456",
                 host = '47.103.114.113',port = 3306,
                 database="d-lab"):
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.database = database
    
    def query(self,sql):
        conn = pymysql.connect(host=self.host, user=self.user,password=self.password,database=self.database)
        # 得到一个可以执行SQL语句的光标对象
        cursor = conn.cursor()
        try:
            #sql = "SELECT * FROM xiaohongshu_article_2 limit 1000"
            cursor.execute(sql)    # 执行sql语句
            query_out = cursor.fetchall()    # 获取查询的所有记录
            
        except Exception as e:
            print(e)
        finally:
            conn.close()
        return query_out
        
    def execute(self,sql):
        conn = pymysql.connect(host=self.host, user=self.user,password=self.password,database=self.database)
        # 得到一个可以执行SQL语句的光标对象
        cursor = conn.cursor()
        try:
            #sql = "SELECT * FROM xiaohongshu_article_2 limit 1000"
            cursor.execute(sql)    # 执行sql语句
            conn.commit() #提交到数据库执行
            
        except Exception as e:
            print(e)
            conn.rollback() #发生错误后回滚
        finally:
            conn.close()
        return 'success'

mq = mysql_query(user = "xxx",password = "xxx",
                 host = 'xxx',port = 3306,
                 database="xxx")
                 
sql = "select count(*) from data "
mq.query(sql)
  • query,针对查
  • execute,专门针对更新

6.2 DButils的使用

python使用dbutils的PooledDB连接池,操作数据库

"""
使用DBUtils数据库连接池中的连接,操作数据库
OperationalError: (2006, ‘MySQL server has gone away’)
"""
import json
import pymysql
import datetime
from DBUtils.PooledDB import PooledDB
import pymysql


class MysqlClient(object):
    __pool = None;

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True,
                 host='127.0.0.1', port=3306, db='test',
                 user='root', passwd='123456', charset='utf8mb4'):
        """

        :param mincached:连接池中空闲连接的初始数量
        :param maxcached:连接池中空闲连接的最大数量
        :param maxshared:共享连接的最大数量
        :param maxconnections:创建连接池的最大数量
        :param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
        :param maxusage:单个连接的最大重复使用次数
        :param setsession:optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset:how connections should be reset when returned to the pool
            (False or None to rollback transcations started with begin(),
            True to always issue a rollback for safety's sake)
        :param host:数据库ip地址
        :param port:数据库端口
        :param db:库名
        :param user:用户名
        :param passwd:密码
        :param charset:字符编码
        """

        if not self.__pool:
            self.__class__.__pool = PooledDB(pymysql,
                                             mincached, maxcached,
                                             maxshared, maxconnections, blocking,
                                             maxusage, setsession, reset,
                                             host=host, port=port, db=db,
                                             user=user, passwd=passwd,
                                             charset=charset,
                                             cursorclass=pymysql.cursors.DictCursor
                                             )
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __get_conn(self):
        self._conn = self.__pool.connection();
        self._cursor = self._conn.cursor();

    def close(self):
        try:
            self._cursor.close()
            self._conn.close()
        except Exception as e:
            print e

    def __execute(self, sql, param=()):
        count = self._cursor.execute(sql, param)
        print count
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """把字典里面的datatime对象转成字符串,使json转换不出错"""
        if result_dict:
            result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
            result_dict.update(result_replace)
        return result_dict

    def select_one(self, sql, param=()):
        """查询单个结果"""
        count = self.__execute(sql, param)
        result = self._cursor.fetchone()
        """:type result:dict"""
        result = self.__dict_datetime_obj_to_str(result)
        return count, result

    def select_many(self, sql, param=()):
        """
        查询多个结果
        :param sql: qsl语句
        :param param: sql参数
        :return: 结果数量和查询结果集
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        """:type result:list"""
        [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
        return count, result

    def execute(self, sql, param=()):
        count = self.__execute(sql, param)
        return count

    def begin(self):
        """开启事务"""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """结束事务"""
        if option == 'commit':
            self._conn.autocommit()
        else:
            self._conn.rollback()


if __name__ == "__main__":
    mc = MysqlClient()
    sql1 = 'SELECT * FROM shiji  WHERE  id = 1'
    result1 = mc.select_one(sql1)
    print json.dumps(result1[1], ensure_ascii=False)

    sql2 = 'SELECT * FROM shiji  WHERE  id IN (%s,%s,%s)'
    param = (2, 3, 4)
    print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)

7 一些应用

7.1 时间创建与写入

利用to_sql导入数据

import pandas as pd
import datetime
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine

yconnect = create_engine('mysql+pymysql://user:password@111.111.111.111:3306/your_database_name?charset=utf8mb4')

df = pd.DataFrame({'序号':[2],'id':[2],'name2':["user 10"],'time':[datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")]})
# 导入
df.to_sql(name='test_8', con=yconnect, if_exists='append', index=False,dtype = {'time':sqlalchemy.DateTime()})

导入的定义不同字段的数据格式 如果,表格里面该字段已经是时间格式了,那么就可以直接插入:

# sql语句:
table_name = 'test_8'
sql = "update {} set time='{}' where id= {}".format(table_name,datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),2)
out = mc._cursor.execute(sql,)

>>> update test_8 set time='2020-12-15 09:39:05' where id= 2

反正几种py时间生成的方式,都是符合规定:

#第一种
update_time=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#第二种
update_time=time.strftime('%Y-%m-%d %X',time.localtime(time.time()))

7.2 利用Pandas快速读入mysql / mmsql

简单写了一个可以连接mysql / mmsql的小函数,通过Pandas直接调用

import pymssql
import pymysql
import pandas as pd
import numpy as np
from collections import Counter

def connect_db(host= '0.0.0.0' , user='xxx',
               password= 'xxx', database = 'xxx'):
    conn = pymssql.connect(host , user , password, database)
    return conn

def fetch_data_from_db(sql_query, host, user, password, database):
    """
    Fetch data from SQL server
    """
    con = connect_db(host , user , password, database)
    
    cursor = con.cursor()
    cursor.execute(sql_query)
    result = cursor.fetchall()
    col_result = cursor.description
    columns = []
    for i in range(len(col_result)):
        columns.append(col_result[i][0])
    df = pd.DataFrame(list(result), columns = columns)
    con.close()
    return df

class pandas_read():
    def __init__(self,sql_config,types = 'mmsql'):
        self.sql_config = sql_config
        if types == 'mmsql':
            self.conn = self.get_conn_mmsql(sql_config)
        if types == 'mysql':
            self.conn = self.get_conn_mysql(sql_config)
    
    def get_conn_mmsql(self,mmsql_config):
        conn = pymssql.connect(mmsql_config['host'] , mmsql_config['user'] ,
                               mmsql_config['password'], mmsql_config['db'])
        return conn
    
    def get_conn_mysql(self,mysql_config):
        conn = pymysql.connect(host=mysql_config['host'], 
                       user=mysql_config['user'],password=mysql_config['password'], 
                       db=mysql_config['db'],charset='utf8', 
                       use_unicode=True)
        return conn

    def read_sql(self,sql):
        return pd.read_sql(sql, con=self.conn)



if __name__ == "__main__":
    user = 'xxx'
    password = 'xxx'
    host = '0.0.0.0'
    database = 'xxx'
    
    # 查询
    sql_query = 'select top 5000  * from database'
    data = fetch_data_from_db(sql_query, host, user, password, database)


    # pandas 读入
    sql_config = {
    'user':user,
    'password':password,
    'host':host,
    'db':'xxxdb'
    }

    pr = pandas_read(sql_config,types = 'mmsql')
    
    # 查询
    sql = 'select * from database'
    data = pr.read_sql(sql)