3.2.4.1 手动写入被叫信息
类: HBaseConsumer2
package com.atguigu.dataconsumer;
import com.atguigu.dataconsumer.util.HBaseUtil;
import com.atguigu.dataconsumer.util.PropertyUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
public class HBaseConsumer2 {
public static void main(String[] args) throws InterruptedException {
final HBaseDao hBaseDao = new HBaseDao();
HBaseConsumer2 hBaseConsumer = new HBaseConsumer2();
hBaseConsumer.readDataFromKafka(value -> {
System.out.println(Bytes.toString(HBaseUtil.getRowKey(value)));
hBaseDao.put(value);
String[] split = value.split(",");
String temp = split[0];
split[0] = split[2];
split[2] = temp;
split[4] = "1";
hBaseDao.put(split[0] + "," + split[1] + "," + split[2] + "," + split[3] + "," + split[4]);
});
}
public void readDataFromKafka(CallBack<String> callback) throws InterruptedException {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(PropertyUtil.properties);
consumer.subscribe(Collections.singletonList(PropertyUtil.getProperty("kafka.topics")));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
if (callback != null) {
callback.call(record.value());
}
Thread.sleep(100);
}
}
}
public static interface CallBack<T> {
public abstract void call(T t);
}
}
类: QueryWithStartRowStopRow
package com.atguigu.dataconsumer.query;
import com.atguigu.dataconsumer.util.HBaseUtil;
import com.atguigu.dataconsumer.util.PropertyUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.text.DecimalFormat;
public class QueryWithStartRowStopRow {
public static void main(String[] args) throws IOException {
DecimalFormat formatter = new DecimalFormat("0000");
String startTime = "2018-01";
String endTime = "2018-12";
String phone = "16480981069";
int regions = Integer.parseInt(PropertyUtil.getProperty("hbase.regions"));
Table table = HBaseUtil.getTable(PropertyUtil.getProperty("hbase.table.name"));
for (int i = 0; i < regions; i++) {
String start = formatter.format(i) + "_" + phone + "_" + startTime;
String end = formatter.format(i) + "_" + phone + "_" + endTime;
Scan scan = new Scan(Bytes.toBytes(start), Bytes.toBytes(end));
ResultScanner results = table.getScanner(scan);
for (Result result : results) {
System.out.println(Bytes.toString(result.getRow()));
}
}
}
}
注意:
[startRowKew,stopRowKey)
是前闭后开的区间