How to perform CRUD operations with SQLAlchemy in a Python project
The project is built on the Flask framework and uses SQLAlchemy for database access.
```
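from contextlib import contextmanager
from sqlalchemy import Column, Integer, String, SmallInteger
from sqlalchemy import create_engine, func
from sqlalchemy.orm import scoped_session, sessionmaker

# Connection URL; the xxx parts are placeholders for credentials and database name
__mysql = 'mysql+pymysql://xxx@127.0.0.1:3306/xxx?charset=utf8mb4'
__engine = create_engine(__mysql, pool_size=5, max_overflow=0)
__session = sessionmaker(bind=__engine, expire_on_commit=False)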
```
#For querying data
```
@contextmanager
def session_query_maker():
    db_session = scoped_session(__session)
    try:
        yield db_session
    except BaseException:
        # Roll back, then re-raise so the caller still sees the error
        db_session.rollback()
        raise
    finally:
        db_session.close()
```
#For updating data
```
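@contextmanager
def session_commit_maker():
    db_session = scoped_session(__session)
    try:
        yield db_session
        # Commit only if the with-block finished without an exception
        db_session.commit()
    except BaseException:
        # Roll back, then re-raise so the caller still sees the error
        db_session.rollback()
        raise
    finally:
        db_session.close()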
```
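To make the behaviour of such a helper concrete, here is a minimal self-contained sketch. It assumes SQLAlchemy 1.4 or newer, an in-memory SQLite database in place of MySQL, and a cut-down `User` model; none of this setup is taken from the original project. This version re-raises after rolling back, so a clean exit commits and a failure both discards the pending rows and reaches the caller.

```python
from contextlib import contextmanager

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    name = Column(String(64), nullable=False, default="")


# Assumption: in-memory SQLite stands in for the MySQL URL
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine, expire_on_commit=False)


@contextmanager
def session_commit_maker():
    session = Session()
    try:
        yield session
        session.commit()
    except BaseException:
        session.rollback()
        raise  # let the caller see the failure
    finally:
        session.close()


# Clean exit: the row is committed
with session_commit_maker() as s:
    s.add(User(name="alice"))

# Exception inside the block: the pending row is rolled back
try:
    with session_commit_maker() as s:
        s.add(User(name="bob"))
        raise ValueError("simulated failure")
except ValueError:
    pass

with session_commit_maker() as s:
    names = [u.name for u in s.query(User).all()]
```

Only `alice` ends up in the table, because the second block was rolled back before anything was committed.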
The examples below operate on a User object.
The User object:
```
from extensions import BaseModel, Column, Integer, SmallInteger, String


class User(BaseModel):
    __tablename__ = "user"

    id = Column(Integer, primary_key=True, nullable=False)
    # Note: MySQL needs an explicit length, e.g. String(64), to create this table
    name = Column(String, nullable=False, default='')
    email = Column(String, nullable=False, default='')
```
Adding new data:
```
with session_commit_maker() as db_session:
    # new_user is an instance of User
    db_session.add(new_user)
    # Add several users at once, e.g. user_list = [user1, user2, user3]
    db_session.add_all(user_list)
```
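As a rough end-to-end sketch of the insert path, the block below stages one object with `add` and several with `add_all`, then commits. It assumes SQLAlchemy 1.4 or newer, an in-memory SQLite database instead of MySQL, and a simplified `User` model with explicit string lengths; the sample names and e-mail address are made up. It also illustrates one effect of `expire_on_commit=False`: attributes stay readable after the session is closed.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    name = Column(String(64), nullable=False, default="")
    email = Column(String(128), nullable=False, default="")


# Assumption: in-memory SQLite replaces the MySQL URL for illustration
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine, expire_on_commit=False)

new_user = User(name="alice", email="alice@example.com")
user_list = [User(name="bob"), User(name="carol")]

session = Session()
try:
    session.add(new_user)        # stage a single object
    session.add_all(user_list)   # stage several objects at once
    session.commit()             # INSERTs are flushed and committed here
except Exception:
    session.rollback()
    raise
finally:
    session.close()

# Primary keys were filled in during the flush; with
# expire_on_commit=False they are still readable after close()
assigned_ids = [u.id for u in [new_user, *user_list]]
```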
Querying data:
#Fetch 20 rows with uid > 100, sorted by uid in descending order
```
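with session_query_maker() as db_session:
    # Parentheses let the method chain span several lines
    users = (
        db_session.query(User)
        .filter(User.id > 100)
        .order_by(User.id.desc())
        .limit(20)
        .offset(0)
        .all()
    )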
```
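The same `limit`/`offset` pattern extends naturally to pagination. The sketch below is one possible way to wrap it; `fetch_page` is a hypothetical helper, and the in-memory SQLite database, simplified `User` model, and sample rows are assumptions for illustration (SQLAlchemy 1.4 or newer).

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    name = Column(String(64), nullable=False, default="")


engine = create_engine("sqlite://")  # assumption: SQLite instead of MySQL
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine, expire_on_commit=False)


def fetch_page(session, min_id, page, page_size=20):
    """Return one page of users with id > min_id, highest id first.

    Hypothetical helper, not part of the original project.
    """
    return (
        session.query(User)
        .filter(User.id > min_id)
        .order_by(User.id.desc())
        .limit(page_size)
        .offset(page * page_size)
        .all()
    )


session = Session()
# Sample data with explicit ids 1..150
session.add_all([User(id=i, name=f"user{i}") for i in range(1, 151)])
session.commit()

first_page = [u.id for u in fetch_page(session, 100, page=0)]
second_page = [u.id for u in fetch_page(session, 100, page=1)]
session.close()
```

With ids 1 to 150 in the table, the first page covers ids 150 down to 131 and the second page covers 130 down to 111.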
Updating data:
#Set the name of the user whose uid is 100 to ABC
```
with session_commit_maker() as db_session:
    db_session.query(User).filter_by(id=100).update({"name": "ABC"})
```
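`Query.update()` issues a single `UPDATE ... WHERE` statement and returns the number of matched rows, which is handy for detecting a missing id. A minimal sketch, assuming SQLAlchemy 1.4 or newer, an in-memory SQLite database, and a simplified `User` model with made-up sample rows:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    name = Column(String(64), nullable=False, default="")


engine = create_engine("sqlite://")  # assumption: SQLite instead of MySQL
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine, expire_on_commit=False)

session = Session()
session.add_all([User(id=100, name="old"), User(id=101, name="other")])
session.commit()

# The return value is the number of rows matched by the WHERE clause
updated = session.query(User).filter_by(id=100).update({"name": "ABC"})
missing = session.query(User).filter_by(id=999).update({"name": "ABC"})
session.commit()  # without a commit the UPDATE is discarded on close

name_after = session.query(User.name).filter(User.id == 100).scalar()
other_after = session.query(User.name).filter(User.id == 101).scalar()
session.close()
```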
Deleting data:
```
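with session_commit_maker() as db_session:
    # filter_by() only accepts keyword arguments; use filter() for expressions
    db_session.query(User).filter(User.id == 2).delete()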
```
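There are two common ways to delete rows: a bulk `Query.delete()`, which emits one `DELETE ... WHERE` statement, and `Session.delete()`, which removes an already loaded object. The sketch below shows both side by side; the in-memory SQLite database, simplified `User` model, and sample rows are assumptions for illustration (SQLAlchemy 1.4 or newer).

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    name = Column(String(64), nullable=False, default="")


engine = create_engine("sqlite://")  # assumption: SQLite instead of MySQL
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine, expire_on_commit=False)

session = Session()
session.add_all([User(id=i, name=f"user{i}") for i in (1, 2, 3)])
session.commit()

# Bulk delete: one DELETE statement, returns the number of rows removed
deleted = session.query(User).filter(User.id == 2).delete()

# Per-object delete: load the instance, then mark it for deletion
user = session.query(User).filter(User.id == 3).one()
session.delete(user)
session.commit()

remaining = [u.id for u in session.query(User).order_by(User.id).all()]
session.close()
```

The bulk form avoids loading rows first, while the per-object form goes through the ORM unit of work, so relationship cascades configured on the model are applied.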