SQLAlchemy 查询

SQLAlchemy 查询

sqlalchemy多表联合查询(join)

有Users表和trans_details表,trans_details有外键user_id与Users.id关联.

#按用户名摸糊查询
trans_details.query.join(Uses).filter(Users.username.like('%xx%'))
#select xxx from trans_details inner join trans_details on users.id=trans_details.user_id where users.username like '%xx%'

#左外联接(left join)
trans_details.query.outerjoin(Uses).filter(Users.username.like('%xx%'))
#select xxx from trans_details left outer join trans_details on users.id=trans_details.user_id where users.username like '%xx%'

#以上是已经设置好外键,它自动找到关联的字段.也可以自己指定:
trans_details.query.join(Uses,trans_details.user_id==Users.id).filter(Users.username.like('%xx%'))
#select xxx from trans_details inner join trans_details on users.id=trans_details.user_id where users.username like '%xx%'

#另外一个更复杂的例子:
q=db.session.query(Credit_bills_details.no,Credit_bills_details.amount,Cards.no).outerjoin(Card_trans_details,
Credit_bills_details.no==Card_trans_details.trans_no).join(Cards,Card_trans_details.to_card_id==Cards.id)\
.filter(Credit_bills_details.credit_bill_id==3)
print(q.all())

#SELECT credit_bills_details.no AS credit_bills_details_no, credit_bills_details.amount AS credit_bills_details_amount, cards.no AS cards_no 
# FROM credit_bills_details LEFT OUTER JOIN card_trans_details ON credit_bills_details.no = card_trans_details.trans_no INNER JOIN cards
# ON card_trans_details.to_card_id = cards.id  WHERE credit_bills_details.credit_bill_id = %s

 #简单查询
 print(session.query(User).all())
 print(session.query(User.name, User.fullname).all())
 print(session.query(User, User.name).all())
 
 #带条件查询
 print(session.query(User).filter_by(name='user1').all())
 print(session.query(User).filter(User.name == "user").all())
 print(session.query(User).filter(User.name.like("user%")).all())
 
 #多条件查询
 print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all())
 print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all())
 
 #sql过滤
 print(session.query(User).filter("id>:id").params(id=1).all())
 
 #关联查询 
 print(session.query(User, Address).filter(User.id == Address.user_id).all())
 print(session.query(User).join(User.addresses).all())
 print(session.query(User).outerjoin(User.addresses).all())
 
 #聚合查询
 print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all())
 print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all())
 
 #子查询
 stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()
 print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all())
 
 #exists
 print(session.query(User).filter(exists().where(Address.user_id == User.id)))
 print(session.query(User).filter(User.addresses.any()))
限制返回字段查询


person = session.query(Person.name, Person.created_at, 
 Person.updated_at).filter_by(name="zhongwei").order_by( 
 Person.created_at).first()
记录总数查询:


from sqlalchemy import func
 
# count User records, without
# using a subquery.
session.query(func.count(User.id))
 
# return count of user "id" grouped
# by "name"
session.query(func.count(User.id)).\
 group_by(User.name)
 
from sqlalchemy import distinct
 
# count distinct "name" values
session.query(func.count(distinct(User.name)))
Comments are closed.