Python 大批量写入数据 百万级别
- 背景
- 方案
- 代码
背景
现有一个百万行数据的csv格式文件,需要在两分钟之内存入数据库。
方案
方案一:多线程 + 协程 + 异步 MySQL;方案二:多线程 + MySQL 批量插入(executemany)。
代码
1. 先通过 pandas 读取所有 csv 数据存入列表。 2. 设置 N 个线程,将一百万数据均分为 N 份,以 start、end 传递给线程,线程以切片的方式读取各自区间的数据(建议为 16 个线程)。 3. 方案二:线程内以 executemany 方法批量插入所有数据。 4. 方案一:线程内使用异步事件循环遍历所有数据并异步插入。 5. 方案一纯属没事找事型。
方案二
import threading
import pandas as pd
import asyncio
import time
import aiomysql
import pymysql

# Connection settings. The original post redacted every value with "*";
# replace each placeholder with the real value before running.
DB_HOST = "*"
DB_PORT = 3306  # placeholder (MySQL's default port); the real value was redacted
DB_USERNAME = "*"
DB_PASSWORD = "*"
DB_DATABASE = "*"

# Number of worker threads; each one opens its own database connection.
THREAD_COUNT = 20

# One "%s" placeholder per table column; the table name is redacted.
INSERT_SQL = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""

# Full row set loaded by main(); worker threads only read slices of it.
data = []
error_data = []


def run(start, end):
    """Bulk-insert the rows ``data[start:end]`` over a dedicated connection.

    Args:
        start: Index of the first row handled by this worker (inclusive).
        end: Index one past the last row handled by this worker (exclusive).

    Raises:
        Whatever pymysql raises while connecting, inserting or committing.
        The connection is closed in every case.
    """
    thread_name = threading.current_thread().name
    print("start" + thread_name)
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    mysdb = getDb(DB_HOST, DB_PORT, DB_USERNAME, DB_PASSWORD, DB_DATABASE)
    try:
        with mysdb.cursor() as cursor:
            cursor.executemany(INSERT_SQL, data[start:end])
        mysdb.commit()
    finally:
        # Close even when the insert fails so the connection is not leaked.
        mysdb.close()
    print("end" + thread_name)
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


def csv_file_read_use_pd(csvFile):
    """Read a tab-separated, UTF-16 encoded CSV file into a list of rows.

    Missing cells are replaced with the string "None" (not SQL NULL).

    Args:
        csvFile: Path of the file to read.

    Returns:
        A list with one list of cell values per CSV row.
    """
    frame = pd.read_csv(csvFile, encoding="utf-16", sep='\t')
    frame = frame.fillna(value="None")
    return frame.values.tolist()


class MyDataBase:
    """Thin holder for a pymysql connection, exposed as ``self.db``."""

    def __init__(self, host=None, port=None, username=None, password=None,
                 database=None):
        """Open the connection with the given settings."""
        self.db = pymysql.connect(host=host, port=port, user=username,
                                  password=password, database=database)

    def close(self):
        """Close the underlying connection."""
        self.db.close()


def getDb(host, port, username, password, database):
    """Open a new pymysql connection and return it."""
    return MyDataBase(host, port, username, password, database).db


def _split_ranges(total, parts):
    """Split ``range(total)`` into ``parts`` contiguous ``[start, end]`` pairs.

    Every pair covers ``total // parts`` rows; the last pair additionally
    takes the remainder, e.g. ``[[0, 27517], [27517, 55034], ...]``.
    """
    size, remainder = divmod(total, parts)
    ranges = [[index * size, (index + 1) * size] for index in range(parts)]
    ranges[-1][1] += remainder
    return ranges


def main(csvFile):
    """Load ``csvFile`` and insert all of its rows using worker threads.

    Each row gets the load timestamp prepended as its first column. The rows
    are then split into THREAD_COUNT contiguous ranges, one per thread.

    Args:
        csvFile: Path of the CSV file to import.
    """
    global data
    csv_result = csv_file_read_use_pd(csvFile)
    day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    for item in csv_result:
        item.insert(0, day)
    data = csv_result

    workers = []
    for start, end in _split_ranges(len(csv_result), THREAD_COUNT):
        if start == end:
            # Fewer rows than threads: do not open a connection for nothing.
            continue
        worker = threading.Thread(target=run, args=(start, end))
        worker.start()
        workers.append(worker)

    # Join only after every worker has been started. Joining right after
    # start() would run the workers one at a time and defeat the threading.
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main("分公司箱型箱量.csv")
方案一
import threading
import pandas as pd
import asyncio
import time
import aiomysql

# Connection settings. The original post redacted every value with "*";
# replace each placeholder with the real value before running.
DB_HOST = "*"
DB_PORT = 3306  # placeholder (MySQL's default port); the real value was redacted
DB_USERNAME = "*"
DB_PASSWORD = "*"
DB_DATABASE = "*"

# Number of worker threads; each one runs its own event loop and connection.
THREAD_COUNT = 20

# One "%s" placeholder per table column; the table name is redacted.
INSERT_SQL = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""

# Full row set loaded by main(); worker threads only read slices of it.
data = []
# Rows whose insert raised or reported zero affected rows.
error_data = []


async def async_basic(loop, start, end):
    """Insert the rows ``data[start:end]`` one by one over an aiomysql connection.

    The load timestamp is prepended to every row as its first column. Rows
    whose insert raises, or affects zero rows, are appended to ``error_data``.

    Args:
        loop: Event loop the connection is bound to.
        start: Index of the first row handled by this worker (inclusive).
        end: Index one past the last row handled by this worker (exclusive).
    """
    thread_name = threading.current_thread().name
    print("start" + thread_name)
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    conn = await aiomysql.connect(host=DB_HOST, port=DB_PORT, user=DB_USERNAME,
                                  password=DB_PASSWORD, db=DB_DATABASE,
                                  loop=loop)
    day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    try:
        async with conn.cursor() as cursor:
            for item in data[start:end]:
                params = [day, *item]
                try:
                    x = await cursor.execute(INSERT_SQL, params)
                except Exception as e:
                    # Best effort per row: remember the failure and back off.
                    print(e)
                    error_data.append(item)
                    # Non-blocking sleep; time.sleep() would stall the loop.
                    await asyncio.sleep(10)
                else:
                    if x == 0:
                        error_data.append(item)
                    # NOTE(review): one print per row is costly at a million
                    # rows; consider logging failures only.
                    print(thread_name + " result " + str(x))
        # aiomysql connections default to autocommit=False, so without this
        # commit the inserts are discarded when the connection closes.
        await conn.commit()
    finally:
        # Connection.close() is a plain method, not a coroutine; awaiting its
        # None result raises TypeError.
        conn.close()
    print("end" + thread_name)
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


def csv_file_read_use_pd(csvFile):
    """Read a tab-separated, UTF-16 encoded CSV file into a list of rows.

    Missing cells are replaced with the string "None" (not SQL NULL).

    Args:
        csvFile: Path of the file to read.

    Returns:
        A list with one list of cell values per CSV row.
    """
    frame = pd.read_csv(csvFile, encoding="utf-16", sep='\t')
    frame = frame.fillna(value="None")
    return frame.values.tolist()


def th(start, end):
    """Thread entry point: run ``async_basic`` on a private event loop."""
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(async_basic(loop, start, end))
    finally:
        # Release the loop's resources once this worker is done.
        loop.close()


def _split_ranges(total, parts):
    """Split ``range(total)`` into ``parts`` contiguous ``[start, end]`` pairs.

    Every pair covers ``total // parts`` rows; the last pair additionally
    takes the remainder, e.g. ``[[0, 27517], [27517, 55034], ...]``.
    """
    size, remainder = divmod(total, parts)
    ranges = [[index * size, (index + 1) * size] for index in range(parts)]
    ranges[-1][1] += remainder
    return ranges


def main(csvFile):
    """Load ``csvFile`` and insert all of its rows using worker threads.

    The rows are split into THREAD_COUNT contiguous ranges; each range is
    handled by one thread running its own asyncio event loop.

    Args:
        csvFile: Path of the CSV file to import.
    """
    global data
    csv_result = csv_file_read_use_pd(csvFile)
    data = csv_result

    thread_exe_count_list = _split_ranges(len(csv_result), THREAD_COUNT)
    print(thread_exe_count_list)

    workers = []
    for start, end in thread_exe_count_list:
        if start == end:
            # Fewer rows than threads: do not open a connection for nothing.
            continue
        worker = threading.Thread(target=th, args=(start, end))
        worker.start()
        workers.append(worker)

    # Wait for every worker so main() returns only after all inserts finished.
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main("分公司箱型箱量.csv")