2016-08-09 2 views
2

내 응용 프로그램은 MongoDB를 플랫폼으로 사용하여 만들어졌습니다. DB에있는 하나의 컬렉션은 방대한 양의 데이터를 가지고 있으며 계산을 통해 분석 데이터를 검색하고 생성하기 위해 apache 스파크를 선택했습니다. Spark Connector for MongoDB을 MongoDB와 통신하도록 구성했습니다. pyspark을 사용하여 MongoDB 컬렉션을 쿼리하고 mongodb 쿼리의 결과 집합으로 구성된 데이터 프레임을 빌드해야합니다. 나에게 적절한 해결책을 제안하십시오.MongoDB에서 필터링 된 레코드로 Spark 데이터 프레임을 만드는 방법은 무엇입니까?

+0

왜 당신이 [Stratio 커넥터] 시도하지 말라 (https://github.com/Stratio/ Spark-MongoDB)? 이 커넥터는 데이터 프레임을 직접 반환합니다. –

+0

@JohnZeng 다음은 stratio 커넥터를 사용하여 구현 한 코드 조각입니다. ds = sqlContext.read.format ('com.stratio.datasource.mongodb'). (호스트 = 'localhost : 27017', 데이터베이스 = 'mydb', 콜렉션 = 'mycoll') load() –

+0

pls를 편집 할 수 있습니까? 귀하의 질문에 귀하의 스 니펫을 붙여 넣으시겠습니까? 나는 당신이 이것을 부른 후에 이미 데이터 프레임을 얻었다 고 생각합니다. 귀하의 질문이 MongoDB의 커넥터에 연결되어 있기 때문에 지금 무엇을 원하는지 혼란 스럽습니다. –

답변

5

당신은 너무 같은 dataframe에 직접 데이터를로드 할 수

# Create the dataframe 
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/mydb.mycoll").load() 

# Filter the data via the api 
df.filter(people.age > 30) 

# Filter via sql 
df.registerTempTable("people") 
over_thirty = sqlContext.sql("SELECT name, age FROM people WHERE age > 30") 

자세한 내용은 몽고 스파크 커넥터 Python API 섹션 또는 introduction.py를 참조하십시오. SQL 쿼리가 변환되어 커넥터로 다시 전달되므로 spark 클러스터로 전송되기 전에 MongoDB에서 데이터를 쿼리 할 수 ​​있습니다.

또한 제공 할 수있는 당신의 불꽃에 결과를 반환하기 전에 컬렉션에 적용 할 aggregation pipeline을 자신의 :

dfr = sqlContext.read.option("pipeline", "[{ $match: { name: { $exists: true } } }]") 
df = dfr.option("uri", ...).format("com.mongodb.spark.sql.DefaultSource").load() 
+0

감사합니다. @ 로스. 그러나 데이터 프레임에 필터를 적용하는 대신 필터를 데이터베이스 쿼리에 직접 적용해야합니다. –

+0

그러면 콜렉션의 쿼리로 변환되어 커넥터가 반환합니다. 필터링 된 결과 – Ross

+0

코드 스 니펫을 통해 자세히 설명해 주시겠습니까 –

관련 문제