2014-04-11 1 views
1
내가 SQLAlchemy의와과 같이 쿼리를 작성하려는

:SQLAlchemy가 Postgres 용 UPDATE ..... FROM 구문을 지원합니까?

UPDATE mytable 
SET 
    mytext = myvalues.mytext, 
    myint = myvalues.myint 
FROM (
    VALUES 
    (1, 'textA', 99), 
    (2, 'textB', 88), 
    ... 
) AS myvalues (mykey, mytext, myint) 
WHERE mytable.mykey = myvalues.mykey 

는 ORM에서 기본적으로 지원되는 이런 종류의 일입니까? 또는 원시 SQL을 실행하기 위해 session.execute()를 사용해야합니까? 메일 링리스트에 @zzzeek 통해

답변

4

:

우리는 UPDATE..FROM을하지만 우리는 지금에 내장하는 구조를 가지고 있지 않는 한 거기에 값이 일을 얻는 것은 몇 가지 추가 조리법을 필요로한다. 또한 AS 부분의 열을 이름 짓는 별칭 부분도 기본적으로 내장되어 있지 않습니다. 조리법은 여기에 있습니다 : https://bitbucket.org/zzzeek/sqlalchemy/wiki/UsageRecipes/PGValues. 어떤 시점에서 나는 누군가에게 AS 부분을 보여 주었지만 거기에는 없다 .... 나는 단지 그것을 업데이트했고, UPDATE 문맥에서 작업하는 데 도움이 필요했다. 따라서 :

from sqlalchemy.ext.compiler import compiles 
from sqlalchemy.sql.expression import FromClause 

class values(FromClause): 
    named_with_column = True 

    def __init__(self, columns, *args, **kw): 
     self._column_args = columns 
     self.list = args 
     self.alias_name = self.name = kw.pop('alias_name', None) 

    def _populate_column_collection(self): 
     for c in self._column_args: 
      c._make_proxy(self) 


@compiles(values) 
def compile_values(element, compiler, asfrom=False, **kw): 
    columns = element.columns 
    v = "VALUES %s" % ", ".join(
     "(%s)" % ", ".join(
       compiler.render_literal_value(elem, column.type) 
       for elem, column in zip(tup, columns)) 
     for tup in element.list 
    ) 
    if asfrom: 
     if element.alias_name: 
      v = "(%s) AS %s (%s)" % (v, element.alias_name, (", ".join(c.name for c in element.columns))) 
     else: 
      v = "(%s)" % v 
    return v 

if __name__ == '__main__': 
    from sqlalchemy import MetaData, create_engine, String, Integer, Table, Column 
    from sqlalchemy.sql import column 
    from sqlalchemy.orm import Session, mapper 
    m1 = MetaData() 
    class T(object): 
     pass 
    t1 = Table('mytable', m1, Column('mykey', Integer, primary_key=True), 
        Column('mytext', String), 
        Column('myint', Integer)) 
    mapper(T, t1) 
    t2 = values(
      [ 
       column('mykey', Integer), 
       column('mytext', String), 
       column('myint', Integer) 
      ], 


      (1, 'textA', 99), 
      (2, 'textB', 88), 

      alias_name='myvalues' 
     ) 
    e = create_engine("postgresql://scott:[email protected]/test", echo=True) 
    m1.create_all(e) 
    sess = Session(e) 
    sess.query(T).filter(T.mykey==t2.c.mykey).\ 
      update(dict(mytext=t2.c.mytext, myint=t2.c.myint))