zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Python开发-pandas导入数据库

2023-06-13 09:17:23 时间

本文介绍利用requests和pandas将API接口返回的数据分别导入Oracle和MySQL数据库以便使用。

导入Oracle代码如下:

import requests
import pandas as pd
import sqlalchemy as SAC
from sqlalchemy import create_engine
from datetime import datetime,timedelta
import retrying
import time
import cx_Oracle
import sqlalchemy.dialects.oracle as OType
import logging
import logging.handlers

# Oracle DSN built from host, port and service name (placeholders here).
conStr=cx_Oracle.makedsn('XXX','XXX',service_name='XXX')

# SQLAlchemy engine shared by every read/write function in this script.
engine = create_engine('oracle+cx_oracle://XXX:XXX@'+conStr)

# Loggers must be obtained through logging.getLogger(), never constructed
# directly, so that they are registered in the logging hierarchy.
logger=logging.getLogger(__name__)
# A directly constructed Logger has level NOTSET and no parent, i.e. it emits
# every record; set the level explicitly so the handler still sees everything.
logger.setLevel(logging.DEBUG)

# Write to the file 'log', rolling over on a daily interval.
handler=logging.handlers.TimedRotatingFileHandler('log',when='d')

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

logger.addHandler(handler)

# NOTE(review): retry() without arguments retries forever on any exception;
# consider bounding it (e.g. stop_max_attempt_number) once callers can cope
# with a final failure.
@retrying.retry()
def retryGet(url,timeout=30):
    """Fetch *url* with GET and return the decoded JSON body.

    Every attempt is preceded by a 10 second pause. Any exception raised
    here (network error, timeout, invalid JSON, integer payload) triggers
    another attempt through the ``retrying`` decorator.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before the attempt is
            abandoned. Without a timeout a stalled connection would block
            forever and the retry logic would never run.

    Returns:
        The decoded JSON payload (anything except an ``int``).

    Raises:
        Exception: When the payload decodes to a bare integer, which is
            presumably an error/status code rather than data (verify
            against the API).
    """
    time.sleep(10)
    res=requests.get(url,timeout=timeout)
    data=res.json()
    print(type(data))
    if isinstance(data,int):
        print(res.content)
        raise Exception('unexpected integer payload %r from %s'%(data,url))
    return data
def getPS():
    """Download the PS list and return it as a DataFrame.

    The payload is fetched through ``retryGet`` and the ``token_id`` and
    ``error_web`` columns are removed before returning.
    """
    ps_url='XXX'
    frame=pd.DataFrame(retryGet(ps_url))
    return frame.drop(columns=["token_id","error_web"])
def savePS_oracle(df_table):
    """Append *df_table* to the Oracle table ``T_GDI_INFO_PS_BASE``.

    The frame is modified in place: column names are upper-cased and every
    column declared as an Oracle DATE is converted with ``pd.to_datetime``.
    """
    varchar2=OType.VARCHAR2
    number=OType.NUMBER
    date=OType.DATE

    column_types={
        "PS_CODE":varchar2(80),
        "PS_NAME":varchar2(400),
        "REGION_CODE":varchar2(40),
        "REGION_NAME":varchar2(40),
        "FULLREGION_NAME":varchar2(80),
        "ADDRESS":varchar2(300),
        "LONGITUDE":number(9,6),
        "LATITUDE":number(9,6),
        "MANUFACTURE_DATE":date(),
        "ENVIRONMENT_PRINCIPAL":varchar2(80),
        "CREDIT_CODE":varchar2(100),
        "LINK_INFO":varchar2(80),
        "BOILER_NUM":number(1),
        "BURN_ABILITY":varchar2(300),
        "ELECTRIC_POWER":varchar2(80),
        "DATA_STATUS":number(1),
        "AFGROUP_CODE":varchar2(80),
        "CREATE_TIME":date(),
        "UPDATE_TIME":date(),
        "CORPORATION_NAME":varchar2(20),
    }

    # The type mapping uses upper-case keys, so align the frame first.
    df_table.rename(columns=str.upper,inplace=True)

    # DATE columns must hold real datetimes before they are written.
    for name,sql_type in column_types.items():
        if isinstance(sql_type,OType.DATE):
            df_table[name]=pd.to_datetime(df_table[name])

    df_table.to_sql(
        'T_GDI_INFO_PS_BASE',
        engine,
        if_exists='append',
        index=False,
        dtype=column_types,
    )

def getBurn():
    """Fetch the burn list for every row of the PS base table.

    Reads ``t_gdi_info_ps_base`` from the database, requests the burn list
    for each row's ``ps_code`` through ``retryGet`` and concatenates all
    results into one DataFrame. Empty strings are replaced by ``None``.

    Returns:
        DataFrame built from the concatenated API payloads.
    """
    GetBurnListUrl='XXX'

    df_PS=pd.io.sql.read_sql_table('T_GDI_INFO_PS_BASE'.lower(),con=engine)

    dataList0=[]
    for _,ps in df_PS.iterrows():
        url=GetBurnListUrl.format(pscode=ps.ps_code)
        dataList0.extend(retryGet(url))

    # Fixed: the original built the frame from the undefined name
    # `burnList0`, which raised NameError; the accumulator is `dataList0`.
    df_Burn=pd.DataFrame(dataList0)
    df_Burn.replace({'':None},inplace=True)
    return df_Burn

def saveBurn_oracle(df_table):
    """Append *df_table* to the Oracle table ``T_GDI_INFO_MP_BURN``.

    The frame is modified in place: column names are upper-cased and every
    column declared as an Oracle DATE is converted with ``pd.to_datetime``.
    """
    varchar2=OType.VARCHAR2
    number=OType.NUMBER
    date=OType.DATE

    column_types={
        "PS_CODE":varchar2(80),
        "MP_CODE":varchar2(80),
        "MP_NAME":varchar2(80),
        "BOILER_NAME":varchar2(400),
        "BOILER_TYPE":varchar2(80),
        "CREATER_TIME":date(),
        "BURN_ABILITY":varchar2(80),
        "DATA_STATUS":number(1),
        "OPEN_TIME":date(),
        "CREATE_TIME":date(),
        "UPDATE_TIME":date(),
        "STOPOPEN_TIME":varchar2(80),
        "STOPOPEN_STATUS":varchar2(80),
        "STOPOPEN_REASON":varchar2(80),
    }

    # The type mapping uses upper-case keys, so align the frame first.
    df_table.rename(columns=str.upper,inplace=True)

    # DATE columns must hold real datetimes before they are written.
    for name,sql_type in column_types.items():
        if isinstance(sql_type,OType.DATE):
            df_table[name]=pd.to_datetime(df_table[name])

    df_table.to_sql(
        'T_GDI_INFO_MP_BURN',
        engine,
        if_exists='append',
        index=False,
        dtype=column_types,
    )

def getMonitor(date):
    """Collect monitor data for *date* for every row of the burn table.

    Reads ``t_gdi_info_mp_burn`` and, per row, requests the monitor data
    for that row's ``ps_code``/``mp_code`` through ``retryGet``. *date* is
    formatted as ``YYYYMMDD`` in the URL. Each payload is printed.

    Returns:
        DataFrame built from all payloads concatenated.
    """
    url_template='XXX'
    burn_rows=pd.io.sql.read_sql_table('T_GDI_INFO_MP_BURN'.lower(),con=engine)

    collected=[]
    for _,row in burn_rows.iterrows():
        request_url=url_template.format(
            pscode=row.ps_code,
            outputcode=row.mp_code,
            date=date.strftime('%Y%m%d'),
        )
        payload=retryGet(request_url)
        print(payload)
        collected.extend(payload)

    return pd.DataFrame(collected)

def saveMonitor_oracle(df_table):
    """Append *df_table* to the Oracle table ``T_GDI_DATA_DAY``.

    The frame is modified in place: column names are upper-cased and every
    column declared as an Oracle DATE is converted with ``pd.to_datetime``.
    """
    varchar2=OType.VARCHAR2
    number=OType.NUMBER
    date=OType.DATE

    column_types={
        "PS_CODE":varchar2(80),
        "MP_CODE":varchar2(80),
        "MONITOR_TIME":date(),
        "POLLUTANT_CODE":varchar2(20),
        "POLLUTANT_NAME":varchar2(30),
        "DAY":varchar2(20),
        "STANDARD_VALUE":varchar2(20),
        "STRENGTH":number(20,6),
        "STATUS":number(1),
        "DATA_STATUS":number(1),
        "CREATE_TIME":date(),
        "UPDATE_TIME":date(),
        "REMARK":varchar2(300),
    }

    # The type mapping uses upper-case keys, so align the frame first.
    df_table.rename(columns=str.upper,inplace=True)

    # DATE columns must hold real datetimes before they are written.
    for name,sql_type in column_types.items():
        if isinstance(sql_type,OType.DATE):
            df_table[name]=pd.to_datetime(df_table[name])

    df_table.to_sql(
        'T_GDI_DATA_DAY',
        engine,
        if_exists='append',
        index=False,
        dtype=column_types,
    )
    
def getGK(date):
    """Collect GK data for *date* for every row of the burn table.

    Reads ``t_gdi_info_mp_burn`` and, per row, requests the GK data for
    that row's ``ps_code``/``mp_code`` through ``retryGet``. *date* is
    formatted as ``YYYYMMDD`` in the URL. Each payload is printed.

    Returns:
        DataFrame built from all payloads concatenated.
    """
    GetGkDataListUrl='XXX'

    df_Burn=pd.io.sql.read_sql_table('T_GDI_INFO_MP_BURN'.lower(),con=engine)
    dataList0=[]
    for _,ps in df_Burn.iterrows():
        # Extra pause on top of the 10 s that retryGet already waits per
        # attempt; kept so the request rate does not increase.
        time.sleep(10)
        url=GetGkDataListUrl.format(pscode=ps.ps_code,outputcode=ps.mp_code,date=date.strftime('%Y%m%d'))

        dataList=retryGet(url)
        print(dataList)
        dataList0+=dataList
        # Fixed: a leftover `break` here stopped after the first row, so
        # only one monitoring point was ever fetched; the sibling
        # getMonitor/getLW functions iterate over every row.
    df_Gk=pd.DataFrame(dataList0)
    return df_Gk

def saveGk_oracle(df_table):
    """Append *df_table* to the Oracle table ``T_GDI_DATA_GK``.

    The frame is modified in place: column names are upper-cased and every
    column declared as an Oracle DATE is converted with ``pd.to_datetime``.
    """
    varchar2=OType.VARCHAR2
    number=OType.NUMBER
    date=OType.DATE

    column_types={
        "PS_CODE":varchar2(80),
        "MP_CODE":varchar2(80),
        "MONITOR_TIME":date(),
        "GK_NAME":varchar2(20),
        "DAY":varchar2(20),
        "STATUS":number(1),
        "DATA_STATUS":number(1),
        "CREATE_TIME":date(),
        "UPDATE_TIME":date(),
        "REMARK":varchar2(300),
    }

    # The type mapping uses upper-case keys, so align the frame first.
    df_table.rename(columns=str.upper,inplace=True)

    # DATE columns must hold real datetimes before they are written.
    for name,sql_type in column_types.items():
        if isinstance(sql_type,OType.DATE):
            df_table[name]=pd.to_datetime(df_table[name])

    df_table.to_sql(
        'T_GDI_DATA_GK',
        engine,
        if_exists='append',
        index=False,
        dtype=column_types,
    )


def getLW(date):
    """Collect LW data for *date* for every row of the burn table.

    Reads ``t_gdi_info_mp_burn`` and, per row, requests the data for that
    row's ``ps_code``/``mp_code`` through ``retryGet``. *date* is formatted
    as ``YYYYMMDD`` in the URL. Each URL and the final frame are printed.

    Returns:
        DataFrame built from all payloads concatenated.
    """
    url_template='XXX'

    burn_rows=pd.io.sql.read_sql_table('T_GDI_INFO_MP_BURN'.lower(),con=engine)

    collected=[]
    for _,row in burn_rows.iterrows():
        request_url=url_template.format(
            pscode=row.ps_code,
            outputcode=row.mp_code,
            date=date.strftime('%Y%m%d'),
        )
        print(request_url)
        collected.extend(retryGet(request_url))

    result=pd.DataFrame(collected)
    print(result)
    return result


def saveLw_oracle(df_table):
    """Append *df_table* to the Oracle table ``T_GDI_DATA_LW2``.

    The frame is modified in place: column names are upper-cased and every
    column declared as an Oracle DATE is converted with ``pd.to_datetime``.
    """
    varchar2=OType.VARCHAR2
    number=OType.NUMBER
    date=OType.DATE

    column_types={
        "PS_CODE":varchar2(80),
        "MP_CODE":varchar2(80),
        "MONITOR_TIME":date(),
        "STRENGTH":number(20,6),
        "STANDARD_VALUE":number(20,6),
        "STATUS":number(1),
        "DATA_STATUS":number(1),
        "CREATE_TIME":date(),
        "UPDATE_TIME":date(),
        "TX_INFO":varchar2(300),
        "ABNORMAL_REASON":varchar2(300),
        "FAULT_INFO":varchar2(300),
        "CEMS_INFO":varchar2(300),
        "DAY":varchar2(20),
        "H_NAME":number(20,6),
    }

    # The type mapping uses upper-case keys, so align the frame first.
    df_table.rename(columns=str.upper,inplace=True)

    # DATE columns must hold real datetimes before they are written.
    for name,sql_type in column_types.items():
        if isinstance(sql_type,OType.DATE):
            df_table[name]=pd.to_datetime(df_table[name])

    df_table.to_sql(
        'T_GDI_DATA_LW2',
        engine,
        if_exists='append',
        index=False,
        dtype=column_types,
    )



导入MySQL代码如下:

import requests
import pandas as pd
import sqlalchemy as SAC
from sqlalchemy import create_engine
from datetime import datetime,timedelta
  
import retrying
import time

             
# SQLAlchemy engine for the MySQL database `garbage_power` (pymysql driver,
# utf8mb4 charset); used by every read/write function in this script.
engine = create_engine('mysql+pymysql://XXX:XXX@XXX/garbage_power?charset=utf8mb4')
 
 
    
 

def getPS(timeout=30):
    """Download the PS list and return it as a DataFrame.

    Args:
        timeout: Seconds to wait for the server. Without a timeout a
            stalled connection would block this call indefinitely.

    Returns:
        DataFrame of the JSON payload with the ``token_id`` and
        ``error_web`` columns removed.

    Raises:
        requests.exceptions.RequestException: On network failure, timeout
            or a 4xx/5xx response.
    """
    GetPSListUrl='XXX'
    res=requests.get(GetPSListUrl,timeout=timeout)
    # Surface HTTP errors here instead of failing later on a missing column.
    res.raise_for_status()
    psList=res.json()
    df_PS=pd.DataFrame(psList)
    df_PS.drop(columns=["token_id","error_web"],inplace=True)
    return df_PS

def savePS_mysql(df_table=None):
    """Write the PS frame to the MySQL table ``psinfo``.

    All columns default to ``CHAR(200)``; numeric and date columns are
    overridden with explicit types.

    Args:
        df_table: Frame to store. The original function took no argument
            and relied on a module-level ``df_PS``; when omitted, that
            global is still used so existing callers keep working.

    Raises:
        NameError: If *df_table* is omitted and no global ``df_PS`` exists.
    """
    if df_table is None:
        df_table=df_PS

    dtype={col:SAC.CHAR(200) for col in df_table.columns}
    dtype0={'longitude':SAC.FLOAT(),'latitude':SAC.FLOAT(),"data_status":SAC.INTEGER(),
           "create_time":SAC.DATETIME(),"boiler_num":SAC.INTEGER(),
           "manufacture_date":SAC.DATE(),"update_time":SAC.DATETIME()}
    dtype.update(dtype0)

    # No if_exists argument: pandas' default ('fail') raises if the table
    # already exists.
    df_table.to_sql('psinfo',engine,index=False,dtype=dtype)
     


def getBurn():
    """Fetch the burn list for every row of the ``psinfo`` table.

    Reads ``psinfo`` from the database, requests the burn list for each
    row's ``ps_code`` through ``retryGet`` and concatenates all results
    into one DataFrame. Empty strings are replaced by ``None``.

    Returns:
        DataFrame built from the concatenated API payloads.
    """
    GetBurnListUrl='XXX'

    df_PS=pd.io.sql.read_sql_table('psinfo',con=engine)

    dataList0=[]
    for _,ps in df_PS.iterrows():
        url=GetBurnListUrl.format(pscode=ps.ps_code)
        dataList0.extend(retryGet(url))

    # Fixed: the original built the frame from the undefined name
    # `burnList0`, which raised NameError; the accumulator is `dataList0`.
    df_Burn=pd.DataFrame(dataList0)
    df_Burn.replace({'':None},inplace=True)
    return df_Burn

def saveBurn_mysql(df_table):
    """Write *df_table* to the MySQL table ``burninfo``.

    Every column defaults to ``CHAR(200)``; the date/time and status
    columns listed below get explicit types instead.
    """
    overrides={
        "open_time":SAC.DATETIME(),
        "create_time":SAC.DATETIME(),
        "data_status":SAC.INTEGER(),
        "creater_time":SAC.DATE(),
        "update_time":SAC.DATETIME(),
    }
    # Start from the CHAR(200) default, then let the overrides win.
    column_types={
        **{name:SAC.CHAR(200) for name in df_table.columns},
        **overrides,
    }
    df_table.to_sql('burninfo',engine,index=False,dtype=column_types)


  

# NOTE(review): retry() without arguments retries forever on any exception;
# consider bounding it (e.g. stop_max_attempt_number) once callers can cope
# with a final failure.
@retrying.retry()
def retryGet(url,timeout=30):
    """Fetch *url* with GET and return the decoded JSON body.

    Every attempt is preceded by a 10 second pause. Any exception raised
    here (network error, timeout, invalid JSON, integer payload) triggers
    another attempt through the ``retrying`` decorator.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before the attempt is
            abandoned. Without a timeout a stalled connection would block
            forever and the retry logic would never run.

    Returns:
        The decoded JSON payload (anything except an ``int``).

    Raises:
        Exception: When the payload decodes to a bare integer, which is
            presumably an error/status code rather than data (verify
            against the API).
    """
    time.sleep(10)
    res=requests.get(url,timeout=timeout)
    data=res.json()
    print(type(data))
    if isinstance(data,int):
        print(res.content)
        raise Exception('unexpected integer payload %r from %s'%(data,url))
    return data

def getMonitor(date):
    """Collect monitor data for *date* for every row of ``burninfo``.

    Reads ``burninfo`` and, per row, requests the monitor data for that
    row's ``ps_code``/``mp_code`` through ``retryGet``. *date* is formatted
    as ``YYYYMMDD`` in the URL. Each payload is printed.

    Returns:
        DataFrame built from all payloads concatenated.
    """
    url_template='XXX'

    burn_rows=pd.io.sql.read_sql_table('burninfo'.lower(),con=engine)

    collected=[]
    for _,row in burn_rows.iterrows():
        request_url=url_template.format(
            pscode=row.ps_code,
            outputcode=row.mp_code,
            date=date.strftime('%Y%m%d'),
        )
        payload=retryGet(request_url)
        print(payload)
        collected.extend(payload)

    return pd.DataFrame(collected)


def saveMonitor_mysql(df_table):
    """Append *df_table* to the MySQL table ``monitordata``.

    Every column defaults to ``CHAR(200)``; the numeric and date/time
    columns listed below get explicit types instead. The frame is printed
    before it is written.
    """
    overrides={
        "create_time":SAC.DATETIME(),
        "status":SAC.INTEGER(),
        "strength":SAC.FLOAT(),
        "standard_value":SAC.FLOAT(),
        "monitor_time":SAC.DATETIME(),
        "data_status":SAC.INTEGER(),
        "day":SAC.DATE(),
        "update_time":SAC.DATETIME(),
    }
    # Start from the CHAR(200) default, then let the overrides win.
    column_types={
        **{name:SAC.CHAR(200) for name in df_table.columns},
        **overrides,
    }
    print(df_table)
    df_table.to_sql(
        'monitordata',
        engine,
        index=False,
        if_exists='append',
        dtype=column_types,
    )
    
 
    

def getGK(date):
    """Collect GK data for *date* for every row of ``burninfo``.

    Reads ``burninfo`` and, per row, waits 10 seconds and then requests
    the GK data for that row's ``ps_code``/``mp_code`` through
    ``retryGet``. *date* is formatted as ``YYYYMMDD`` in the URL. Each
    payload is printed.

    Returns:
        DataFrame built from all payloads concatenated.
    """
    url_template='XXX'

    burn_rows=pd.io.sql.read_sql_table('burninfo'.lower(),con=engine)

    collected=[]
    for _,row in burn_rows.iterrows():
        # Pause before each request, in addition to retryGet's own wait.
        time.sleep(10)
        request_url=url_template.format(
            pscode=row.ps_code,
            outputcode=row.mp_code,
            date=date.strftime('%Y%m%d'),
        )
        payload=retryGet(request_url)
        print(payload)
        collected.extend(payload)

    return pd.DataFrame(collected)
    
def saveGk_mysql(df_table):
    """Append *df_table* to the MySQL table ``gkdata``.

    Every column defaults to ``CHAR(200)``; the date/time and status
    columns listed below get explicit types instead. The frame is printed
    before it is written.

    Args:
        df_table: Frame to store.
    """
    # Fixed: the default types were derived from the undefined global
    # `df_Gk` instead of the frame that was passed in.
    dtype={col:SAC.CHAR(200) for col in df_table.columns}
    dtype0={"create_time":SAC.DATETIME(),"monitor_time":SAC.DATETIME(),"data_status":SAC.INTEGER(),
           "day":SAC.DATE(),"update_time":SAC.DATETIME()}
    dtype.update(dtype0)

    print(df_table)
    df_table.to_sql('gkdata',engine,index=False,if_exists='append',dtype=dtype)
    
def getLW(date):
    """Collect LW data for *date* for every row of ``burninfo``.

    Reads ``burninfo`` and, per row, waits 10 seconds and then requests
    the data for that row's ``ps_code``/``mp_code`` through ``retryGet``.
    *date* is formatted as ``YYYYMMDD`` in the URL. Each URL and the final
    frame are printed.

    Returns:
        DataFrame built from all payloads concatenated.
    """
    GetGkDataListUrl='XXX'

    df_Burn=pd.io.sql.read_sql_table('burninfo'.lower(),con=engine)

    dataList0=[]
    for _,ps in df_Burn.iterrows():
        # Extra pause on top of the 10 s that retryGet already waits per
        # attempt; kept so the request rate does not increase.
        time.sleep(10)
        url=GetGkDataListUrl.format(pscode=ps.ps_code,outputcode=ps.mp_code,date=date.strftime('%Y%m%d'))
        print(url)
        dataList=retryGet(url)
        dataList0+=dataList
        # Fixed: a leftover `break` here stopped after the first row, so
        # only one monitoring point was ever fetched; the sibling
        # getMonitor/getGK functions iterate over every row.

    df_Lw=pd.DataFrame(dataList0)

    print(df_Lw)
    return df_Lw
    

def saveLw_mysql(df_table):
    """Append *df_table* to the MySQL table ``lwdata``.

    Every column defaults to ``CHAR(200)``; the numeric and date/time
    columns listed below get explicit types instead.

    Args:
        df_table: Frame to store.
    """
    # Fixed: both the column list and the write used the undefined global
    # `df_Lw`, so the frame passed by the caller was ignored.
    dtype={col:SAC.CHAR(200) for col in df_table.columns}
    dtype0={"create_time":SAC.DATETIME(),"monitor_time":SAC.DATETIME(),
            "data_status":SAC.INTEGER(),"status":SAC.INTEGER(),"h_name":SAC.FLOAT(),
           "day":SAC.DATE(),"standard_value":SAC.FLOAT(),"strength":SAC.FLOAT()}
    dtype.update(dtype0)

    df_table.to_sql('lwdata',engine,index=False,if_exists='append',dtype=dtype)