← 返回首页
SQLAlchemy数据导出到CSV文件
发表时间:2023-03-29 23:19:47
SQLAlchemy数据导出到CSV文件

SQLAlchemy数据导出到CSV文件。

SQLAlchemy 是一款Python的ORM对象数据库映射框架。它包含整套企业级持久化模式,实现了高效和高性能的数据库访问。CSV文件是pandas数据分析中使用最广泛的文件格式,本案例实现如何把SQLAlchemy 查询出来的数据导入到CSV文件。

源码如下:

# -*- coding: utf-8 -*-
# @Time : 2023/3/29 21:12
# @File : csv_demo.py
# @Software : PyCharm

# MySQL引擎
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeMeta
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String, Double, TEXT

# json解析库
import json

# pandas数据分析
import pandas as pd

# SqlAlchemy对象类型转Json解析处理类
class AlchemyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                try:
                    json.dumps(data)
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            return fields
        return json.JSONEncoder.default(self, obj)


#List转CSV文件
def list_to_csv(list_data, csv_file):
    if len(list_data) > 0:
        columns = list(list_data[0].keys())
        df = pd.DataFrame(columns=columns, data=list_data)
        df.to_csv(csv_file, mode="w", encoding="utf_8_sig", header=1, index=0)



# 创建orm的会话池,orm和sql均可以管理对象关系型数据库,需要绑定引擎才可以使用会话,
# 创建连接
engine = create_engine("mysql+pymysql://root:root@127.0.0.1/spider",
                       # 需要安装mysql和pymysql的模块,用户名:密码@ip地址/某个数据库
                       # echo=True,         # 打印操作对应的SQL语句
                       pool_size=8,  # 连接个数
                       pool_recycle=60 * 30  # 不使用时断开
                       )

# 创建session
DbSession = sessionmaker(bind=engine)
# 会话工厂,与引擎绑定。
# 实例化
session = DbSession()

Base = declarative_base()

class Movie(Base):
    __tablename__ = 'movie250'
    id = Column(Integer, primary_key=True)
    info_link = Column(TEXT)  # 电影链接
    pic_link = Column(TEXT)  # 电影海报
    cname = Column(String(100))  # 电影名称
    ename = Column(String(255))  # 电影英文名称
    score = Column(Double)  # 豆瓣得分
    rated = Column(Double)  # 等级
    introduce = Column(TEXT)  # 简介
    info = Column(TEXT)  # 电影信息

#查询所有高分电影
movies = session.query(Movie).all()
#电影集合
movie_list = []
for m in movies:
    #转为json字符串
    json_movie = json.dumps(m,cls=AlchemyEncoder,indent=4)
    #json字符串转Dic
    dict_movie = json.loads(json_movie)
    #把dic对象追加到list
    movie_list.append(dict_movie)

session.commit()

# 把电影集合保存到csv文件
list_to_csv(movie_list,'movie250.csv')

# 关闭会话
session.close()