SQLAlchemy数据导出到CSV文件。
SQLAlchemy 是一款Python的ORM对象数据库映射框架。它包含整套企业级持久化模式,实现了高效和高性能的数据库访问。CSV文件是pandas数据分析中使用最广泛的文件格式,本案例实现如何把SQLAlchemy 查询出来的数据导入到CSV文件。
源码如下:
# -*- coding: utf-8 -*-
# @Time : 2023/3/29 21:12
# @File : csv_demo.py
# @Software : PyCharm
# MySQL引擎
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeMeta
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String, Double, TEXT
# json解析库
import json
# pandas数据分析
import pandas as pd
# SqlAlchemy对象类型转Json解析处理类
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
data = obj.__getattribute__(field)
try:
json.dumps(data)
fields[field] = data
except TypeError:
fields[field] = None
return fields
return json.JSONEncoder.default(self, obj)
#List转CSV文件
def list_to_csv(list_data, csv_file):
if len(list_data) > 0:
columns = list(list_data[0].keys())
df = pd.DataFrame(columns=columns, data=list_data)
df.to_csv(csv_file, mode="w", encoding="utf_8_sig", header=1, index=0)
# 创建orm的会话池,orm和sql均可以管理对象关系型数据库,需要绑定引擎才可以使用会话,
# 创建连接
engine = create_engine("mysql+pymysql://root:root@127.0.0.1/spider",
# 需要安装mysql和pymysql的模块,用户名:密码@ip地址/某个数据库
# echo=True, # 打印操作对应的SQL语句
pool_size=8, # 连接个数
pool_recycle=60 * 30 # 不使用时断开
)
# 创建session
DbSession = sessionmaker(bind=engine)
# 会话工厂,与引擎绑定。
# 实例化
session = DbSession()
Base = declarative_base()
class Movie(Base):
__tablename__ = 'movie250'
id = Column(Integer, primary_key=True)
info_link = Column(TEXT) # 电影链接
pic_link = Column(TEXT) # 电影海报
cname = Column(String(100)) # 电影名称
ename = Column(String(255)) # 电影英文名称
score = Column(Double) # 豆瓣得分
rated = Column(Double) # 等级
introduce = Column(TEXT) # 简介
info = Column(TEXT) # 电影信息
#查询所有高分电影
movies = session.query(Movie).all()
#电影集合
movie_list = []
for m in movies:
#转为json字符串
json_movie = json.dumps(m,cls=AlchemyEncoder,indent=4)
#json字符串转Dic
dict_movie = json.loads(json_movie)
#把dic对象追加到list
movie_list.append(dict_movie)
session.commit()
# 把电影集合保存到csv文件
list_to_csv(movie_list,'movie250.csv')
# 关闭会话
session.close()