2017-01-31 2 views
1

내 Mac에서 all-spark 노트북 도커 이미지를 사용하여 apree toree 및 scala (https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook)를 사용하고 있습니다. nc -lk 9999Scala Spark Streaming via Apache Toree

그래서 나는를 시작 : 1) netcast 프로그램) 포트 9999 2에서 수신하는 sparkstreaming 객체를 시작하려면 시작 :에

나는 불꽃의 문서의 기본 스트리밍 예제를 테스트하기 위해 노력하고, 느릅 나무 참여 컨테이너는 9999 포트 바인딩 : 나는 "포트가 이미 사용"가지고 그것을 연결을 시도, 다음

$ sudo docker run -it --rm -p 9999:9999 -p 8888:8888 -e GRANT_SUDO=yes --user root --pid=host -e TINI_SUBREAPER=true -v $HOME/Informatique/notebooks:/home/jovyan/work:rw jupyter/all-spark-notebook 

그러나이 오류 :

$ nc -lk 9999 
nc: Address already in use 
,

는 또한 컨테이너에 자신을 넣어 시도 :

[email protected]:~$ docker ps 
CONTAINER ID  IMAGE      COMMAND     CREATED    STATUS    PORTS           NAMES 
0bd6b70bacfa  jupyter/all-spark-notebook "tini -- start-not..." 23 seconds ago  Up 22 seconds  0.0.0.0:8888->8888/tcp, 0.0.0.0:9999->9999/tcp wonderful_brattain 
[email protected]:~$ docker exec -ti wonderful_brattain /bin/bash 
[email protected]:~/work# nc -lk 9999 
bash: nc: command not found 
[email protected]:~/work# sudo apt-get update 
[email protected]:~/work# sudo apt-get install netcat-traditional 
[email protected]:~/work# nc -lk 9999 
aaaa aaa aaa 
bb bbb bbb 
cc cc cc 

하지만 스칼라 노트북에 표시 아무것도 없다 :

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") 
val ssc = new StreamingContext(sc, Seconds(1)) 
val lines = ssc.socketTextStream("localhost", 9999) 
val words = lines.flatMap(_.split(" ")) 
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 
// Count each word in each batch 
val pairs = words.map(word => (word, 1)) 
val wordCounts = pairs.reduceByKey(_ + _) 
// Print the first ten elements of each RDD generated in this DStream to the console 
wordCounts.print() 

과 :

ssc.start()    // Start the computation 
ssc.awaitTermination() // Wait for the computation to terminate 

을 제공합니다

------------------------------------------- 
Time: 1485880101000 ms 
------------------------------------------- 

------------------------------------------- 
Time: 1485880102000 ms 
------------------------------------------- 

------------------------------------------- 
Time: 1485880103000 ms 
------------------------------------------- 

------------------------------------------- 
Time: 1485880104000 ms 
------------------------------------------- 

이러한 네트워크 문제를 어떻게 처리합니까?

답변

0

Spark 스트리밍 컨텍스트는 서버에서 데이터를 스트리밍하기 위해 nc 서버에 연결을 시도합니다. 포트 9999에서 수신 대기중인 서버는 nc이고 은 아니고 Spark 컨텍스트입니다.

먼저 노트북 컨테이너를 -p 9999:9999으로 시작하므로 포트가 이미 사용 중이므로 Docker가 호스트에 포트 9999를 예약합니다. 호스트에 nc -lk 9999을 실행하려고하면 충돌이 발생합니다.

노트북 컨테이너에서 실행되는 커널이 액세스 할 수 있도록 nc 서버를 설정해야합니다. 이를 수행하는 한 가지 방법은 nc 서버를 별도의 Docker 컨테이너에서 실행하고 두 컨테이너를 동일한 Docker 네트워크에 연결하는 것입니다.

첫째, 두 개의 컨테이너가 통신 할 수 있도록 호스트에 도커 네트워크를 만들 :

docker network create testnet 

지금 자신의 용기에 nc를 실행합니다.

docker run -it --rm --name nc --network testnet appropriate/nc -lk 9999 

--network testnet 옵션은 testnet 네트워크에 컨테이너를 연결합니다. --name nc 옵션을 사용하면 컨테이너가 호스트 이름 nc을 사용하여 같은 네트워크에있는 다른 컨테이너에 액세스 할 수 있습니다.

이제 노트북 컨테이너를 별도로 실행하십시오. 또한 --network testnet을 사용해야합니다.

docker run -it --rm --network testnet -p 8888:8888 \ 
-v $HOME/Informatique/notebooks:/home/jovyan/work:rw \ 
jupyter/all-spark-notebook 

마지막으로, 노트북 코드에서 스파크 컨텍스트가 호스트 이름 nc에 연결해야합니다.

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") 
val ssc = new StreamingContext(sc, Seconds(5)) 
val lines = ssc.socketTextStream("nc", 9999) 
val words = lines.flatMap(_.split(" ")) 
import org.apache.spark.streaming.StreamingContext._ 
// Count each word in each batch 
val pairs = words.map(word => (word, 1)) 
val wordCounts = pairs.reduceByKey(_ + _) 
// Print the first ten elements of each RDD generated in this DStream to the console 
wordCounts.print() 

ssc.start()    // Start the computation 
ssc.awaitTermination() // Wait for the computation to terminate 

당신은 nc 컨테이너 터미널에 입력하는 경우 :

hello, world 

당신은 노트북이 표시되어야합니다

------------------------------------------- 
Time: 1485987495000 ms 
------------------------------------------- 
(hello,,1) 
(world,1) 
관련 문제