Python3:sqlalchemy对sybase数据库操作,非sql语句详解编程语言
2023-06-13 09:11:43 时间
from selenium import webdriver
from sqlalchemy import Column,Integer, String,DateTime,create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import and_,func
import configparser
import math
logger = logging.getLogger()
#set loghandler
file = logging.FileHandler(sys.path[0]+"/py_zgjs_log"+time.strftime("%Y%m%d")+".log")
logger.addHandler(file)
#set formater
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
file.setFormatter(formatter)
#set log level
logger.setLevel(logging.NOTSET)
# 创建对象的基类:
Base = declarative_base()
class Yztzzqktjb(Base):
# 表名
__tablename__ = t_wlpc_zgjs_yztzzqktjb
# 表结构
id = Column(Integer,primary_key=True,autoincrement=True)
mc = Column(String(200),nullable=False)
begindate = Column(String(45),nullable=False)
enddate = Column(String(45), nullable=False)
sjmc = Column(String(200))
ssjmc = Column(String(200))
sl = Column(String(45))
create_time = Column(DateTime,nullable=False)
update_time = Column(DateTime,nullable=False)
def __init__(self,mc,begindate,enddate,sjmc,ssjmc,sl,create_time,update_time):
self.mc = mc
self.begindate = begindate
self.enddate = enddate
self.sjmc = sjmc
self.ssjmc = ssjmc
self.sl = sl
self.create_time = create_time
self.update_time = update_time
class ZgjsEntry(object):
def __init__(self, v1, v2,v3,v4,v5,v6):
self.v1 = v1
self.v2 = v2
self.v3 = v3
self.v4 = v4
self.v5 = v5
self.v6 = v6
def __get__(self, instance, cls):
if instance is None:
return self
else:
return instance.__dict__[self.name]
def __set__(self, instance, value):
instance.__dict__[self.name] = value
def __delete__(self, instance):
del instance.__dict__[self.name]
def dbconfig():
#生成config对象
cfg = configparser.ConfigParser()
#用config对象读取配置文件
path_ = sys.path[0]
cfg.read(path_+"/dbconfig.ini")
dns = cfg.get("dbsybase", "dns")
user = cfg.get("dbsybase", "user")
password = cfg.get("dbsybase", "password")
endtime = cfg.get("dbtime", "endtime")
initdate = cfg.get("dbtime", "mzkbinitdate")
interval = cfg.get("dbtime", "interval")
return (dns,user,password,endtime,initdate,interval)
def savrData(tableName,zgjsList):
msgcode = 0
message = 数据保存成功
try:
dbcfg = dbconfig()
# 初始化数据库连接,
engine = create_engine(sybase+pyodbc://+dbcfg[1]+:+dbcfg[2]+@+dbcfg[0])
# 创建DBSession类型:
DBSession = sessionmaker(bind=engine)
session = DBSession()
try:
# 增操作
items = []
if tableName == Yztzzqktjb:
if len(zgjsList) 0:
for i in range(0,len(zgjsList)):
results = session.query(Yztzzqktjb).filter(and_(Yztzzqktjb.mc == zgjsList[i].v1,Yztzzqktjb.begindate == zgjsList[i].v2,Yztzzqktjb.enddate==zgjsList[i].v3,Yztzzqktjb.sjmc==zgjsList[i].v4,Yztzzqktjb.ssjmc==zgjsList[i].v5)).all()
if len(results) 0:
session.query(Yztzzqktjb).filter(and_(Yztzzqktjb.mc == zgjsList[i].v1,Yztzzqktjb.begindate == zgjsList[i].v2,Yztzzqktjb.enddate==zgjsList[i].v3,Yztzzqktjb.sjmc==zgjsList[i].v4,Yztzzqktjb.ssjmc==zgjsList[i].v5)).update({Yztzzqktjb.sl: zgjsList[i].v6,Yztzzqktjb.update_time:time.strftime(%Y-%m-%d %H:%M:%S)}, synchronize_session=False)
else:
item = Yztzzqktjb(mc=zgjsList[i].v1,begindate=zgjsList[i].v2,enddate=zgjsList[i].v3,sjmc=zgjsList[i].v4,ssjmc=zgjsList[i].v5,sl=zgjsList[i].v6,create_time=time.strftime(%Y-%m-%d %H:%M:%S),update_time=time.strftime(%Y-%m-%d %H:%M:%S))
items.append(item)
else:
pass
#print("len(items) %s" %len(items))
if len(items) 0:
for i in range(0,len(items)):
session.add(items[i])
#提交数据
session.commit()
except Exception as e:
msgcode = 1
message = 数据保存失败 + str(e)
session.rollback()
finally:
#关闭
session.close()
except Exception as e:
msgcode = 1
message = 数据库连接失败+str(e)
logger.info(message)
print(message)
return msgcode
# http://www.******.cn/cms-search/view.action?action=china url = "http://www.******.cn/cms-search/view.action?action=china" headerDict = {Host: www.******.cn, User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.31 Safari/537.36, Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8, Accept-Language: zh-CN,zh;q=0.8, Accept-Encoding: gzip, deflate, Referer: http://www.******.cn/cms-search/view.action?action=china, Connection: keep-alive} data = {dateType: , dateStr: dateStr, channelIdStr: channelIdStr} # psot 传递参数 res = requests.post(url, data=data, headers=headerDict) # 获取跳转后的页面源码 soup = BeautifulSoup(res.content, "html.parser") #获取周报的起始日期 SettlementTitle = soup.find(div,class_=SettlementTitle) if SettlementTitle is None: return zgjsList h2 = SettlementTitle.find(h2).text if h2 == 搜索结果: return zgjsList weekdate = h2.strip().split(()[1].split())[0] begindate = weekdate.split(-)[0].replace(.,-) enddate = weekdate.split(-)[1].replace(.,-) settlementList = soup.find(id=settlementList) # print(settlementList) if settlementList is None: return zgjsList if settlementList.find(table) is None: return zgjsList table_ = settlementList.find(table) tr_list = table_.find(table).find_all(tr) # 上级名称 sjmc_1 = sjmc_2 = sjmc_3 = sjmc_4 = sjmc_5 = sjmc_6 = # 上上级名称 ssjmc_1 = for n in range(1,len(tr_list)): td_list = tr_list[n].find_all(td) if tableName == Yztzzqktjb: if n == 1: sjmc_1 = td_list[0].get_text().replace(一、,).strip() if n == 4: sjmc_2 = td_list[0].get_text().replace(二、,).strip() ssjmc_1 = td_list[0].get_text().replace(二、,).strip() if n == 5: sjmc_3 = td_list[0].get_text().replace(1、,).strip() if n == 9: sjmc_4 = td_list[0].get_text().replace(2、,).strip() if n == 13: sjmc_5 = td_list[0].get_text().replace(三、,).strip() if n == 17: sjmc_6 = td_list[0].get_text().replace(四、,).strip() if tableName == Yztzzqktjb: if n in (6,10,14,18): continue zgjs = ZgjsEntry(,,,,,) zgjs.v2 = begindate zgjs.v3 = enddate if tableName == Yztzzqktjb: # 上级名称 if n in (2,3): zgjs.v4 = sjmc_1 if n in (5,9): zgjs.v4 = sjmc_2 if n in (7,8): zgjs.v4 = sjmc_3 zgjs.v5 = ssjmc_1 if n in (11,12): zgjs.v4 = sjmc_4 zgjs.v5 = ssjmc_1 if n in (15,16): zgjs.v4 = sjmc_5 if n in (19,20): zgjs.v4 = sjmc_6 for i in range(0,len(td_list)): if i == 0: zgjs.v1 =td_list[i].get_text().replace(一、,).replace(二、,).replace(三、,).replace(四、,).replace(1、,).replace(2、,).strip() if i == 1: zgjs.v6 =td_list[i].get_text().strip().replace(,,) if zgjs is not None: zgjsList.append(zgjs) return zgjsList # 获取开始日期: def getBeginDate(bgdate,tableName): r_date = bgdate try: dbcfg = dbconfig() # 初始化数据库连接, engine = create_engine(sybase+pyodbc://+dbcfg[1]+:+dbcfg[2]+@+dbcfg[0]) # 创建DBSession类型: DBSession = sessionmaker(bind=engine) session = DBSession() try: if tableName == Yztzzqktjb: results = session.query(func.max(Yztzzqktjb.enddate)).all() if len(results) != 0: r_date = results[0] else: pass except Exception as e: print(获取开始日期,查询异常;%s%str(e)) logger(获取开始日期,查询异常;%s%str(e)) session.rollback() finally: #关闭 session.close() except Exception as e: print(获取开始日期,数据库连接失败;%s%str(e)) logger(获取开始日期,数据库连接失败;%s%str(e)) if r_date[0] is None: r_date = bgdate else: begin = time.strptime(r_date[0], "%Y-%m-%d") y,m,d = begin[0:3] r_date = datetime.date(y,m,d) + datetime.timedelta(days=7) r_date = r_date .strftime(%Y-%m-%d) return r_date def isCheckData(date_): r_code = 0 try: dbcfg = dbconfig() # 初始化数据库连接, engine = create_engine(sybase+pyodbc://+dbcfg[1]+:+dbcfg[2]+@+dbcfg[0]) # 创建DBSession类型: DBSession = sessionmaker(bind=engine) session = DBSession() try: if len(results) == 0: r_code = 1 else: r_code = 0 except Exception as e: r_code = 1 print(判断是否有数据异常;%s%str(e)) logger(判断是否有数据异常;%s%str(e)) session.rollback() finally: #关闭 session.close() except Exception as e: r_code = 1 print(判断是否有数据,数据库连接异常;%s%str(e)) logger(判断是否有数据,数据库连接异常;%s%str(e)) #return r_code return 0 # 执行入口 def main(initdate_): req_list = [ {report:6ac54ce22db4474abc234d6edbe53ae7,table:Yztzzqktjb} for req in req_list: #字符转日期 begin = time.strptime(getBeginDate(initdate_,req[table]), "%Y-%m-%d") y,m,d = begin[0:3] #日期格式:2018-01-18 begin = datetime.date(y,m,d) #获取当前日期 end = datetime.date.today() if (end- begin).days 0: pass else: for i in range(math.ceil((end - begin).days/7)+1): list_szzj = [] # 日期转字符 date_ = (begin+datetime.timedelta(days=i*7)).strftime(%Y-%m-%d) list_mzkb = getData(date_,req[report],req[table]) if len(list_mzkb): savrData(req[table],list_mzkb) else: pass time.sleep(0.5) if i % 350 == 0: time.sleep(15)
vrg_endtime = dbcfg[3][0:2]+":"+dbcfg[3][2:4]+":"+dbcfg[3][4:6] var_initdate = dbcfg[4][0:4]+"-"+dbcfg[4][4:6]+"-"+dbcfg[4][6:8] var_interval = int(dbcfg[5]) if len(vrg_date) ==8: vrg_date = str(vrg_date[0:4]) + "-" + str(vrg_date[4:6]) + "-" + str(vrg_date[6:8]) end_time = time.strptime(vrg_endtime, "%H:%M:%S") y,m,d = end_time[3:6] end_time = datetime.time(y,m,d) logger.info("采集数据开始") print("采集数据开始") # 循环采集 while True: now_time = time.strftime("%H%M%S") main(var_initdate) if isCheckData(vrg_date,) == 0: logger.info("采集数据结束") print("采集数据结束") break # 时间到停止采集 if int(end_time.strftime(%H%M%S)) - int(now_time) = 0: logger.info("采集数据结束") print("采集数据结束") break # 间隔执行时间 logger.info("**********************(%s):没有采集到数据,任务继续执行**********************" %vrg_date) print("********************(%s):没有采集到数据,任务继续执行**********************" %vrg_date) time.sleep(var_interval) else: logger.info("日期参数格式不正确,请用格式:20180205") print("日期参数格式不正确,请用格式:20180205")
# http://www.******.cn/cms-search/view.action?action=china url = "http://www.******.cn/cms-search/view.action?action=china" headerDict = {Host: www.******.cn, User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.31 Safari/537.36, Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8, Accept-Language: zh-CN,zh;q=0.8, Accept-Encoding: gzip, deflate, Referer: http://www.******.cn/cms-search/view.action?action=china, Connection: keep-alive} data = {dateType: , dateStr: dateStr, channelIdStr: channelIdStr} # psot 传递参数 res = requests.post(url, data=data, headers=headerDict) # 获取跳转后的页面源码 soup = BeautifulSoup(res.content, "html.parser") #获取周报的起始日期 SettlementTitle = soup.find(div,class_=SettlementTitle) if SettlementTitle is None: return zgjsList h2 = SettlementTitle.find(h2).text if h2 == 搜索结果: return zgjsList weekdate = h2.strip().split(()[1].split())[0] begindate = weekdate.split(-)[0].replace(.,-) enddate = weekdate.split(-)[1].replace(.,-) settlementList = soup.find(id=settlementList) # print(settlementList) if settlementList is None: return zgjsList if settlementList.find(table) is None: return zgjsList table_ = settlementList.find(table) tr_list = table_.find(table).find_all(tr) # 上级名称 sjmc_1 = sjmc_2 = sjmc_3 = sjmc_4 = sjmc_5 = sjmc_6 = # 上上级名称 ssjmc_1 = for n in range(1,len(tr_list)): td_list = tr_list[n].find_all(td) if tableName == Yztzzqktjb: if n == 1: sjmc_1 = td_list[0].get_text().replace(一、,).strip() if n == 4: sjmc_2 = td_list[0].get_text().replace(二、,).strip() ssjmc_1 = td_list[0].get_text().replace(二、,).strip() if n == 5: sjmc_3 = td_list[0].get_text().replace(1、,).strip() if n == 9: sjmc_4 = td_list[0].get_text().replace(2、,).strip() if n == 13: sjmc_5 = td_list[0].get_text().replace(三、,).strip() if n == 17: sjmc_6 = td_list[0].get_text().replace(四、,).strip() if tableName == Yztzzqktjb: if n in (6,10,14,18): continue zgjs = ZgjsEntry(,,,,,) zgjs.v2 = begindate zgjs.v3 = enddate if tableName == Yztzzqktjb: # 上级名称 if n in (2,3): zgjs.v4 = sjmc_1 if n in (5,9): zgjs.v4 = sjmc_2 if n in (7,8): zgjs.v4 = sjmc_3 zgjs.v5 = ssjmc_1 if n in (11,12): zgjs.v4 = sjmc_4 zgjs.v5 = ssjmc_1 if n in (15,16): zgjs.v4 = sjmc_5 if n in (19,20): zgjs.v4 = sjmc_6 for i in range(0,len(td_list)): if i == 0: zgjs.v1 =td_list[i].get_text().replace(一、,).replace(二、,).replace(三、,).replace(四、,).replace(1、,).replace(2、,).strip() if i == 1: zgjs.v6 =td_list[i].get_text().strip().replace(,,) if zgjs is not None: zgjsList.append(zgjs) return zgjsList # 获取开始日期: def getBeginDate(bgdate,tableName): r_date = bgdate try: dbcfg = dbconfig() # 初始化数据库连接, engine = create_engine(sybase+pyodbc://+dbcfg[1]+:+dbcfg[2]+@+dbcfg[0]) # 创建DBSession类型: DBSession = sessionmaker(bind=engine) session = DBSession() try: if tableName == Yztzzqktjb: results = session.query(func.max(Yztzzqktjb.enddate)).all() if len(results) != 0: r_date = results[0] else: pass except Exception as e: print(获取开始日期,查询异常;%s%str(e)) logger(获取开始日期,查询异常;%s%str(e)) session.rollback() finally: #关闭 session.close() except Exception as e: print(获取开始日期,数据库连接失败;%s%str(e)) logger(获取开始日期,数据库连接失败;%s%str(e)) if r_date[0] is None: r_date = bgdate else: begin = time.strptime(r_date[0], "%Y-%m-%d") y,m,d = begin[0:3] r_date = datetime.date(y,m,d) + datetime.timedelta(days=7) r_date = r_date .strftime(%Y-%m-%d) return r_date def isCheckData(date_): r_code = 0 try: dbcfg = dbconfig() # 初始化数据库连接, engine = create_engine(sybase+pyodbc://+dbcfg[1]+:+dbcfg[2]+@+dbcfg[0]) # 创建DBSession类型: DBSession = sessionmaker(bind=engine) session = DBSession() try: if len(results) == 0: r_code = 1 else: r_code = 0 except Exception as e: r_code = 1 print(判断是否有数据异常;%s%str(e)) logger(判断是否有数据异常;%s%str(e)) session.rollback() finally: #关闭 session.close() except Exception as e: r_code = 1 print(判断是否有数据,数据库连接异常;%s%str(e)) logger(判断是否有数据,数据库连接异常;%s%str(e)) #return r_code return 0 # 执行入口 def main(initdate_): req_list = [ {report:6ac54ce22db4474abc234d6edbe53ae7,table:Yztzzqktjb} for req in req_list: #字符转日期 begin = time.strptime(getBeginDate(initdate_,req[table]), "%Y-%m-%d") y,m,d = begin[0:3] #日期格式:2018-01-18 begin = datetime.date(y,m,d) #获取当前日期 end = datetime.date.today() if (end- begin).days 0: pass else: for i in range(math.ceil((end - begin).days/7)+1): list_szzj = [] # 日期转字符 date_ = (begin+datetime.timedelta(days=i*7)).strftime(%Y-%m-%d) list_mzkb = getData(date_,req[report],req[table]) if len(list_mzkb): savrData(req[table],list_mzkb) else: pass time.sleep(0.5) if i % 350 == 0: time.sleep(15)
vrg_endtime = dbcfg[3][0:2]+":"+dbcfg[3][2:4]+":"+dbcfg[3][4:6] var_initdate = dbcfg[4][0:4]+"-"+dbcfg[4][4:6]+"-"+dbcfg[4][6:8] var_interval = int(dbcfg[5]) if len(vrg_date) ==8: vrg_date = str(vrg_date[0:4]) + "-" + str(vrg_date[4:6]) + "-" + str(vrg_date[6:8]) end_time = time.strptime(vrg_endtime, "%H:%M:%S") y,m,d = end_time[3:6] end_time = datetime.time(y,m,d) logger.info("采集数据开始") print("采集数据开始") # 循环采集 while True: now_time = time.strftime("%H%M%S") main(var_initdate) if isCheckData(vrg_date,) == 0: logger.info("采集数据结束") print("采集数据结束") break # 时间到停止采集 if int(end_time.strftime(%H%M%S)) - int(now_time) = 0: logger.info("采集数据结束") print("采集数据结束") break # 间隔执行时间 logger.info("**********************(%s):没有采集到数据,任务继续执行**********************" %vrg_date) print("********************(%s):没有采集到数据,任务继续执行**********************" %vrg_date) time.sleep(var_interval) else: logger.info("日期参数格式不正确,请用格式:20180205") print("日期参数格式不正确,请用格式:20180205")
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/16706.html
cjavapythonwindowsxml相关文章
- python3多行注释快捷键_geany注释快捷键
- Python3爬虫学习.md
- murmurhash2算法python3版本
- python3爆破服务器_dirsearch「建议收藏」
- python3_基础
- Python3 图片打水印详解编程语言
- 快速导出Oracle数据库表记录SQL语句大全(oracle导出表数据sql)
- python3 获取cookie解决方案详解编程语言
- Python3:Django连接Mysql数据库时出错,’Did you install mysqlclient or MySQL-python’详解编程语言
- Python3.x: pyodbc+FreeTDS+UinxODBC连接sybase数据库(Linux系统)详解编程语言
- Python3.x:定时获取页面数据存入数据库详解编程语言
- Python3.x:os.listdir和os.walk(获取路径方法)的区别详解编程语言
- Oracle数据库表连接SQL技术指南(oracle表连接sql)
- 模式进入Oracle数据库的SQL模式(oracle进入sql)
- SQL数据库迁移至MySQL:实现快速导入(sql数据库导入mysql)
- Python3.x:使用PyMysql连接Mysql数据库详解编程语言
- 如何在linux系统上安装python3?(linux安装python3)
- 优化Oracle耗时SQL,提升数据库性能(oracle耗时sql)
- 如何使用SQL快速导入MSSQL数据库(sql如何导入mssql)
- Oracle SQL跟踪:如何优化数据库性能?(oracle跟踪sql)
- 探索Oracle的SQL跟踪工具:优化数据库性能的重要利器(oracle跟踪sql工具)
- 脚本MSSQL自动生成SQL脚本:实现数据库快速保存(mssql生成sql)
- 深入浅出:精通Oracle数据库SQL语句(oracle数据库sql语句)
- 利用Oracle主键SQL优化数据库性能(oracle主键sql)
- 使用SQL语句查询Redis数据库的操作方法(sql语句查询redis)
- 使用Oracle动态SQL脚本提升数据库性能(oracle中动态sql)
- Oracle SQL实现数据库修改的技巧(oracle sql修改)
- python3生成随机数实例