5

sqlalchemy模块介绍、单表操作、一对多表操作、多对多表操作、flask集成. - Deity_JGX

 2 years ago
source link: https://www.cnblogs.com/jgx0/p/16286445.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

今日内容概要

  • sqlalchemy介绍和快速使用
  • 单表操作增删查改
  • flask集成

1、sqlalchemy介绍和快速使用

# SQLAlchemy是一个基于 Python实现的ORM框架

# django的orm框架---》只能在django中用,不能单独用

# SQLAlchemy单独的,可以集成到任意框架中

# peewee:轻量级

# python的异步orm框架不多,  sanic, fastapi---》一旦用了异步,后续所有都需要用异步---》操作mysql,aiomysql--》操作redis,使用aioredis

# 公司选择
	-第一:peewee-async
	-第二:框架是异步---》没有使用异步orm框架---》SQLAlchemy---》生成和迁移表---》查询操作数据用原生操作
  
  
# 写django项目---》库和表已经有了
	-正常操作django中建表模型---》迁移---》表
	-反向生成models--》表---》models.py----》改表---》再反向生成
	python manage.py inspectdb > app/models.py

1.1 执行原生sql

# 执行原生sql快速使用
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

# 第一步:创建engine
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/db01?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)


# 第二步:使用
def task():
    conn = engine.raw_connection()  # 从连接池中取一个连接
    cursor = conn.cursor()
    sql = "select * from cmd"
    cursor.execute(sql)
    print(cursor.fetchall())


if __name__ == '__main__':
    for i in range(20):
        t = threading.Thread(target=task)
        t.start()

# 查询mysql的客户端连接数

2、单表操作增删查改

2.1 表迁移

# 不能创建数据库(django orm也不能)

# 只能做表的创建和删除,不能做表更改(django orm能)---》借助于第三方实现

###### 第一步:生成基类,所有表模型都要继承这个基类
	django 的orm继承一个父类,Base就是那个父类

###### 第二步:写表模型,继承父类,写字段   (注意区别于django 的orm)
	django的default--》可不可以传个函数内存地址---》插入的时候通过函数运算完得到的值

###### 第三步:迁移,通过表模型,生成表

创建models.py

import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# 第一步:生成基类,所有表模型都要继承这个基类
# django 的orm继承一个父类,Base就是那个父类
Base = declarative_base()


# 第二步:写表模型,继承父类,写字段   (注意区别于django 的orm)
# django的default--》可不可以传个函数内存地址---》插入的时候通过函数运算完得到的值
class Users(Base):
    id = Column(Integer, primary_key=True, autoincrement=True)  # id 主键
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    email = Column(String(32), unique=True)  # 唯一
    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now)  # 默认值
    extra = Column(Text, nullable=True)  # 大文本,可以为空

    __tablename__ = 'lqz_users'  # 数据库表名称,如果不写,就报错
    # __table_args__ = (
    #     UniqueConstraint('id', 'name', name='uix_id_name'),  # 联合唯一
    #     Index('ix_id_name', 'name', 'email'),  # 联合索引
    # )

# 聚簇索(mysql主键自动建索引,聚簇索引,mysql基于聚簇索引构建的B+树),一定会有,没有显示建主键,mysql会隐藏一个
# 辅助索引:手动建的叫辅助索引---》单独减了索引---》如果你的辅助索引过多,非常影响插入效率,适度建索引

创建演示文件:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from threading import Thread
from models import Base


# 第三步:迁移,通过表模型,生成表
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/db01?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)


def create_table():
    # 通过engine这个连接配置,创建出所有使用Base管理的表
    Base.metadata.create_all(engine)


def delete_table():
    # 通过engine这个连接配置,删除出所有使用Base管理的表
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    # create_table()  # 创建表
    delete_table()  # 删除表

2.2 简单的表操作

### 操作表,增加一条记录,以后都用conn/session(命名可以更改)操作

# 第一步:创建engin

# 第二步:通过session得到连接对象
	Session = sessionmaker(bind=engine)
	session = Session()

# # 第三步:实例化得到模型类的对象,增加到数据库中
	usr=Users(name='lqz001')
	session.add(usr)

# # 第四步:提交事务
	session.commit()

2.3 基于scoped_session实现线程安全

# # 以后操作数据,都用session对象---》定义在flask的函数外部还是内部?
# # 放内部没问题,每次都生成一个新的session,耗费资源
# # 如果定义在函数外部,会存在 多线程并发使用同一个变量session,要把session做成并发安全的
Session = sessionmaker(bind=engine)
session = scoped_session(Session)  # 也是基于local,给每一个线程自己创造一个session

# # 只需要记住,如果是多线程使用,或者在web框架中,使用scoped_session生成session就可以了
# # 集成到flask中,有flask-sqlalchemy第三方,内部已经处理了scoped_session
# # 全局用这个一个session,不用担心并发不安全
usr = Users(name='lqz002')
session.add(usr)  # 线程一用:取local中取线程1的那个session,如果就给,没有就重新创造一个

# # 第四步:提交事务
session.commit()

测试线程安全

# 线程一用:
	取local中取线程1的那个session,如果就给,没有就重新创造一个
# 线程二用:
	取local中取线程2的那个session,如果就给,没有就重新创造一个


# # 测试:开3个线程,如果定义全局的session,在3个线程中用,session对象应该是同一个
Session = sessionmaker(bind=engine)
session = Session()
# session = scoped_session(Session)


def task():
    # usr = Users(name='lqz003')
    # session.add(usr)
    # session.commit()
    # print(session.registry.registry.value) # <sqlalchemy.orm.scoping.scoped_session object at 0x7f8fbceeea60>
    print(session)  # <sqlalchemy.orm.scoping.scoped_session object at 0x7f8fbceeea60>


# 开3个线程,如果定义scoped_session,在3个线程中用,session对象应该是不是同一个,独有的
if __name__ == '__main__':
    for i in range(3):
        t = Thread(target=task)
        t.start()

2.4 基本增删查改

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from sqlalchemy.orm import scoped_session
from models import Base

# 第三步:迁移,通过表模型,生成表
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/db01?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)


def create_table():
    # 通过engine这个连接配置,创建出所有使用Base管理的表
    Base.metadata.create_all(engine)


def delete_table():
    # 通过engine这个连接配置,删除出所有使用Base管理的表
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    # create_table()
    # delete_table()
    Session = sessionmaker(bind=engine)
    session = scoped_session(Session)
    
    
    ### 1 增加操作
    # 增加一个
    obj1 = Users(name="lqz003")
    session.add(obj1)
    
    # 增加多个,不同对象
    session.add_all([
        Users(name="lqz009"),
        Users(name="lqz008"),
    ])
    session.commit()
    
    
    # 2 删除操作---》查出来再删---》
    session.query(Users).filter(Users.id > 2).delete()
    session.commit()
    
    
    # 3 修改操作--》查出来改
    # 传字典
    session.query(Users).filter(Users.id > 0).update({"name": "lqz"})
    # # 类似于django的F查询
    # # 字符串加
    # session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
    # # 数字加
    # session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
    session.commit()
    
    
    # 4 查询操作----》
    r1 = session.query(Users).all()  # 查询所有
    # 只取age列,把name重命名为xx
    # 原生sql:select name as xx,age from user;
    # r2 = session.query(Users.name.label('xx'), Users.age).all()

    # # filter传的是表达式,filter_by传的是参数
    # r3 = session.query(Users).filter(Users.name == "lqz").all()
    # # r3 = session.query(Users).filter(Users.id >= 1).all()
    # r4 = session.query(Users).filter_by(name='lqz').all()
    # r5 = session.query(Users).filter_by(name='lqz').first()

    # :value 和:name 相当于占位符,用params传参数
    # r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='lqz').order_by(
    #     Users.id).all()
    # 自定义查询sql
    # r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='lqz').all()

2.5 更多查询操作

    # 更多查询
    #  条件
    # select * form user where name =lqz
    # ret = session.query(Users).filter_by(name='lqz').all()

    # 表达式,and条件连接
    # select * from user where id >1 and name = lqz
    # ret = session.query(Users).filter(Users.id > 1, Users.name == 'lqz').all()
    # select * from user where id between 1,3  and name = lqz
    # ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'lqz').all()

    # 注意下划线
    # select * from user where id in (1,3,4)
    # ret = session.query(Users).filter(Users.id.in_([1, 3, 4])).all()

    # # ~非,除。。外
    # select * from user where id not in (1,3,4)
    # ret = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all()

    # # # 二次筛选
    # # ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='lqz'))).all()
    # from sqlalchemy import and_, or_
    #
    # # # or_包裹的都是or条件,and_包裹的都是and条件
    # ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
    # ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
    # ret = session.query(Users).filter(
    #     or_(
    #         Users.id < 2,
    #         and_(Users.name == 'eric', Users.id > 3),
    #         Users.extra != ""
    #     )).all()

    # # 通配符,以e开头,不以e开头
    # ret = session.query(Users).filter(Users.name.like('e%')).all()
    # ret = session.query(Users).filter(~Users.name.like('e%')).all()

    # # 限制,用于分页,区间
    # ret = session.query(Users)[1:2]

    # # 排序,根据name降序排列(从大到小)
    # ret = session.query(Users).order_by(Users.id.desc()).all()

    # # 第一个条件重复后,再按第二个条件升序排
    # ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

    # # 分组
    # from sqlalchemy.sql import func
    # select * from user group by user.extra;
    # ret = session.query(Users).group_by(Users.extra).all()

    # # 分组之后取最大id,id之和,最小id
    # select max(id),sum(id),min(id) from user group by name ;
    # ret = session.query(
    #     func.max(Users.id),
    #     func.sum(Users.id),
    #     func.min(Users.id)).group_by(Users.name).all()

    # haviing筛选
    # select max(id),sum(id),min(id) from user group by name  having min(id)>2;
    # ret = session.query(
    #     func.max(Users.id),
    #     func.sum(Users.id),
    #     func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2).all()

    # select max(id),sum(id),min(id) from user where id >=1 group by name  having min(id)>2;
    # ret = session.query(
    #     func.max(Users.id),
    #     func.sum(Users.id),
    #     func.min(Users.id)).filter(Users.id>=1).group_by(Users.name).having(func.min(Users.id) > 2).all()

    # 连表(默认用forinkey关联)
    # select * from user,favor where user.id=favor.id
    # ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

    # join表,默认是inner join
    # select * from Person inner join favor on person.favor=favor.id;
    # ret = session.query(Person).join(Favor).all()
    
    # isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
    # ret = session.query(Person).join(Favor, isouter=True).all()
    # ret = session.query(Favor).join(Person, isouter=True).all()

    # 打印原生sql
    # aa = session.query(Person).join(Favor, isouter=True)
    # print(aa)

    # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
    # select * from person left join favor on person.id=favor.id;
    # ret = session.query(Person).join(Favor, Person.id == Favor.id, isouter=True).all()

    # 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
    # union和union all的区别?
    # q1 = session.query(Users.name).filter(Users.id > 2)
    # q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    # ret = q1.union(q2).all()

    # q1 = session.query(Users.name).filter(Users.id > 2)
    # q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    # ret = q1.union_all(q2).all()

2.6 执行原生sql

    # 执行原生sql
    # 查询
    cursor = session.execute('select * from users')
    result = cursor.fetchall()

    # 添加
    cursor = session.execute('insert into users(name) values(:value)', params={"value": 'lqz'})
    session.commit()
    print(cursor.lastrowid)

3、一对多表操作

3.1 表模型创建

class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名
    hobby_id = Column(Integer, ForeignKey("hobby.id"))  # 外键
    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 类名,backref用于反向查询   # 正向查询按字段,反向查询按 pers
    hobby = relationship('Hobby', backref='pers')

3.2 操作表

# 一对多
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker
from models import Users
from sqlalchemy.orm import scoped_session
from models import Base


# 第三步:迁移,通过表模型,生成表
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/db01?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)


def create_table():
    # 通过engine这个连接配置,创建出所有使用Base管理的表
    Base.metadata.create_all(engine)


def delete_table():
    # 通过engine这个连接配置,删除出所有使用Base管理的表
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    # create_table()
    # delete_table()

    Session = sessionmaker(bind=engine)
    session = scoped_session(Session)

    from models import Hobby, Person

    # 1 增加数据
    # 方式一
    session.add_all([
        Hobby(caption='乒乓球'),
        Hobby(caption='羽毛球'),
        Person(name='张三', hobby_id=1),
        Person(name='李四', hobby_id=1),
    ])
    session.commit()
    
    # 方式二
    person = Person(name='张九', hobby=Hobby(caption='姑娘'))
    session.add(person)
    
    # 方式三
    hb = Hobby(caption='保龄球')
    # 反向字段
    hb.pers = [Person(name='lqz01'), Person(name='lqz02')]
    session.add(hb)
    session.commit()
    
    
    # 2 查询
    # 正向查询
    person = session.query(Person).first()
    print(person.name)
    # 基于对象的跨表查询
    print(person.hobby.caption)
    # 反向查询
    v = session.query(Hobby).first()
    print(v.caption)
    print(v.pers)  # 多条

    # 链表查询
    # select person.name ,hobby.caption from person left join bobby on person.hobby_id=hobby.id;
    person_list = session.query(Person.name, Hobby.caption).join(Hobby, isouter=True).all()
    # person_list = session.query(Person,Hobby).join(Hobby, isouter=True).all()
    for row in person_list:
        # print(row.name,row.caption)
        print(row[0].name, row[1].caption)

    person_list = session.query(Person).all()
    for row in person_list:
        print(row.name, row.hobby.caption)

    obj = session.query(Hobby).filter(Hobby.id == 1).first()
    persons = obj.pers
    print(persons)
    session.close()

4、多对多表操作

4.1 表模型创建

# boy girl 相亲,一个boy可以约多个女生,一个女生可以相多个男生
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)


class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(64), unique=True, nullable=False)

    # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
    girls = relationship('Girl', secondary='boy2girl', backref='boys')

4.2 操作表

# 多对多
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models import Base

# 第三步:迁移,通过表模型,生成表
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/db01?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)


def create_table():
    # 通过engine这个连接配置,创建出所有使用Base管理的表
    Base.metadata.create_all(engine)


def delete_table():
    # 通过engine这个连接配置,删除出所有使用Base管理的表
    Base.metadata.drop_all(engine)


from models import Boy, Girl, Boy2Girl

if __name__ == '__main__':
    # create_table()
    # delete_table()
    Session = sessionmaker(bind=engine)
    session = scoped_session(Session)

    # 1 增加数据
    #  方式一
    session.add_all([
        Boy(name='彭于晏'),
        Boy(name='刘德华'),
        Girl(name='刘亦菲'),
        Girl(name='迪丽热巴'),
    ])
    session.commit()
    s2g = Boy2Girl(boy_id=1, girl_id=1)
    session.add(s2g)
    session.commit()

    # 方式二
    boy = Boy(name='lqz')
    boy.girls = [Girl(name='小红'), Girl(name='校花')]
    session.add(boy)
    session.commit()

    # 方式三
    girl = Girl(name='小梅')
    girl.boys = [Boy(name='lqz001'), Boy(name='lqz002')]
    session.add(girl)
    session.commit()

    # 基于对象的跨表查
    # 使用relationship正向查询
    v = session.query(Boy).first()
    print(v.name)
    print(v.girls)

    # 使用relationship反向查询
    v = session.query(Girl).first()
    print(v.name)
    print(v.boys)

5、flask集成

# Flask_SQLAlchemy 操作数据库

# flask_migrate  模拟django的表迁移
	pip3 install flask_migrate


# flask_migrate使用步骤
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()  # 全局SQLAlchemy
app = Flask(__name__)
app.config.from_object('settings.DevelopmentConfig')

# 将db注册到app中,加载配置文件,flask-session,用一个类包裹一下app
db.init_app(app)

# flask_script创建命令 runserver命令 ,自定义名字
# 下面三句会创建出两个命令:runserver  db 命令(flask_migrate)
manager=Manager(app)
Migrate(app, db)
manager.add_command('db',MigrateCommand )  # 添加一个db命令,原来有了runserver命令了



# 直接使用命令迁移表即可
# 1 初始化
python3 manage.py db init  # 刚开始干,生成一个migrate文件夹

# 2 创建表,修改表
python3 manage.py db migrate   # 等同于 makemigartions
python3 manage.py db upgrade   # 等同于 migrate
# Flask_SQLAlchemy给你包装了基类,和session,以后拿到db

db = SQLAlchemy()  # 全局 SQLAlchemy

# 增删查改数据-->并发安全
db.session.query()

# 表模型要继承基表
class Users(db.Model):

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK