2014-04-30 2 views
1

저는 Apache Hadoop, MapReduce 및 Cassandra를 사용하여 Cassandra 테이블에서 읽고 다른 Cassandra 테이블로 출력하는 MapReduce 작업을 실행하고 있습니다.hadoop의 복합 기본 키를 사용하여 cassandra 테이블에 삽입하기

단일 기본 키가있는 테이블로 출력되는 작업이 몇 가지 있습니다. 예를 들어, 각 단어 유형의 수를 계산하기위한이 테이블에는 단일 키가 있습니다.

CREATE TABLE word_count(
     word text, 
     count int, 
     PRIMARY KEY(text) 
    ) WITH COMPACT STORAGE; 

클래스를 줄이기 보이는 관련된이 같은 비트 :

public static class ReducerToCassandra 
    extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> 
{ 
    public void reduce(Text word, Iterable<IntWritable> values, Context context) 
     throws IOException, InterruptedException 
    { 
     int sum = 0; 
     for (IntWritable val : values){ 
      sum += val.get(); 
     } 

     org.apache.cassandra.thrift.Column c 
       = new org.apache.cassandra.thrift.Column(); 
     c.setName(ByteBufferUtil.bytes("count"); 
     c.setValue(ByteBufferUtil.bytes(sum)); 
     c.setTimestamp(System.currentTimeMillis()); 

     Mutation mutation = new Mutation(); 
     mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn()); 
     mutation.column_or_supercolumn.setColumn(c); 

     ByteBuffer keyByteBuffer = ByteBufferUtil.bytes(word.toString()); 
     context.write(keyByteBuffer, Collections.singletonList(mutation)); 
    } 
} 

내가 여분의 열을 추가하려면, 그럼 난 그냥 reduce하여 List<Mutation> 이미 출력되는 또 다른 돌연변이를 추가해야하지만, 복합 기본 키의 새 열이있는 테이블로 출력하는 방법을 알아낼 수 없습니다. 예를 들어이 표는 위의 표와 동일하지만 단어를 발행 시간과 함께 색인화합니다.

CREATE TABLE word_count(
     word text, 
     publication_hour bigint, 
     count int, 
     PRIMARY KEY(word, publication_hour) 
    ) WITH COMPACT STORAGE; 

나는 출력하려고처럼, 몇 가지 다른 방법을 시도했습니다 사용자 정의 WritableComparableclassmethod 서명하고 이에 따라 job 구성을 갱신 (즉 단어와 시간을 모두 보유),하지만 그 reduce 던져합니다 a ClassCastExceptionWritableComparableByteBuffer으로 캐스팅하려고 시도 할 때

적절한 열 이름을 Builder과 함께 구축하려고 시도했습니다.

public static class ReducerToCassandra 
    //    MappedKey  MappedValue ReducedKey ReducedValues 
    extends Reducer<WordHourPair, IntWritable, ByteBuffer, List<Mutation>> 
{ 
    //     MappedKey     Values with the key wordHourPair 
    public void reduce(WordHourPair wordHourPair, Iterable<IntWritable> values, 
    Context context) 
     throws IOException, InterruptedException 
    { 
     int sum = 0; 
     for (IntWritable val : values){ 
     sum += val.get(); 
     } 
     long hour = wordHourPair.getHourLong(); 

     org.apache.cassandra.thrift.Column c 
      = new org.apache.cassandra.thrift.Column(); 
     c.setName(ByteBufferUtil.bytes("count"); 
     c.setValue(ByteBufferUtil.bytes(sum)); 
     c.setTimestamp(System.currentTimeMillis()); 

     Mutation mutation = new Mutation(); 
     mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn()); 
     mutation.column_or_supercolumn.setColumn(c); 

     //New Code 
     List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>(); 
     keyTypes.add(UTF8Type.instance); 
     keyTypes.add(LongType.instance); 
     CompositeType compositeKey = CompositeType.getInstance(keyTypes); 

     Builder builder = new Builder(compositeKey); 
     builder.add(ByteBufferUtil.bytes(word.toString()); 
     builder.add(ByteBufferUtil.bytes(hour)); 

     ByteBuffer keyByteBuffer = builder.build(); 
     context.write(keyByteBuffer, Collections.singletonList(mutation)); 
    } 
} 

하지만 그를 슬로우 IOException

java.io.IOException: InvalidRequestException(why:String didn't validate.) 
    at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:204) 
Caused by: InvalidRequestException(why:String didn't validate.) 
    at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28232) 
    at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28218) 
    at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:28152) 
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78) 
    at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:1069) 
    at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:1055) 
    at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:196) 

이 질문 : Cassandra CQL3 composite key not written by Hadoop reducer 내가 찾고 있어요 코드의 종류를 전시 보이지만 유형 HashMap, ByteBuffer의 파라미터를 가지는 context.write를 호출하고 아니에요 context.write에 해당 매개 변수를 허용하는 방법을 알려주세요.

내 테이블에 원하는 데이터 (워드 시간 키, int 값)를 얻으려면 어떻게해야합니까?

답변

1

해답은 Thrift API가 아닌 Cassandra의 CQL 인터페이스를 사용하는 것이 었습니다.

이제 reduce 클래스의 출력 키/값 클래스를 "Map, List"로 선언 한 다음 복합 키에 대한 Map을 작성하여 복합 키를 사용하여 테이블에 쓸 수 있습니다.)은 열 이름이고 ByteBuffer 유형의 Value는 ByteBufferUtil을 사용하여 ByteBuffer로 변환 된 열 값입니다.

예를 들어, 같은 정의 된 테이블에 쓰기 :

CREATE TABLE foo (
    customer_id uuid, 
    time timestamp, 
    my_value int, 
    PRIMARY KEY (customer_id, time) 
) 

내가 쓸 수 있습니다 :

String customerID = "the customer's id"; 
long time = DateTime.now().getMillis(); 
int myValue = 1; 

Map<String, ByteBuffer> key = new Map<String, ByteBuffer>(); 
key.put("customer_id",ByteBufferUtil.bytes(customerID)); 
key.put("time",ByteBufferUtil.bytes(time)); 

List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(myValue)); 

context.write(key, values); 
관련 문제