目录
1.在HBase中创建表
2.写入API
2.1普通模式写入hbase(逐条写入)
2.2普通模式写入hbase(buffer写入)
2.3设计模式写入hbase(buffer写入)
3.HBase表映射至Hive中
1.在HBase中创建表
hbase(main):003:0> create_namespace 'events_db'
hbase(main):004:0> create 'events_db:users','profile','region','registration'
hbase(main):005:0> create 'events_db:user_friend','uf'
hbase(main):006:0> create 'events_db:events','schedule','location','creator','remark'
hbase(main):007:0> create 'events_db:event_attendee','euat'
hbase(main):008:0> create 'events_db:train','eu'
hbase(main):011:0> list_namespace_tables 'events_db'
TABLE
event_attendee
events
train
user_friend
users
5 row(s)
2.写入API
2.1普通模式写入hbase(逐条写入)
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HConstants;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Table;import org.apache.hadoop.hbase.util.Bytes;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;import java.io.IOException;import java.time.Duration;import java.util.ArrayList;import java.util.Collections;import java.util.Properties;/** * 将Kafka中的topic为userfriends中的数据消费到hbase中 * hbase中的表为events_db:user_friend */public class UserFriendToHB {static int num = 0; //计数器public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kb129:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");properties.put(ConsumerConfig.GROUP_ID_CONFIG, "user_friend_group1");KafkaConsumer consumer = new KafkaConsumer(properties);consumer.subscribe(Collections.singleton("userfriends"));//配置hbase信息,连接hbase数据库Configuration conf = HBaseConfiguration.create();conf.set(HConstants.HBASE_DIR, "hdfs://kb129:9000/hbase");conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129");conf.set(HConstants.CLIENT_PORT_STR, "2181");Connection connection = null;try {connection = ConnectionFactory.createConnection(conf);Table ufTable = 
connection.getTable(TableName.valueOf("events_db:user_friend"));ArrayList datas = new ArrayList();while (true){ConsumerRecords poll = consumer.poll(Duration.ofMillis(100));//每次for循环前清空datasdatas.clear();for (ConsumerRecord record : poll) {//System.out.println(record.value());String[] split = record.value().split(",");int i = (split[0] + split[1]).hashCode();Put put = new Put(Bytes.toBytes(i));put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("userid"), split[0].getBytes());put.addColumn("uf".getBytes(), "friend".getBytes(),split[1].getBytes());datas.add(put);}num = num + datas.size();System.out.println("---------num:" + num);if (datas.size() > 0){ufTable.put(datas);}try {Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}}} catch (IOException e) {throw new RuntimeException(e);}}}
2.2普通模式写入hbase(buffer写入)
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HConstants;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.util.Bytes;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;import java.io.IOException;import java.time.Duration;import java.util.ArrayList;import java.util.Collections;import java.util.Properties;/** * 将Kafka中的topic为userfriends中的数据消费到hbase中 * hbase中的表为events_db:user_friend */public class UserFriendToHB2 {static int num = 0; //计数器public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kb129:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");properties.put(ConsumerConfig.GROUP_ID_CONFIG, "user_friend_group1");KafkaConsumer consumer = new KafkaConsumer(properties);consumer.subscribe(Collections.singleton("userfriends"));//配置hbase信息,连接hbase数据库Configuration conf = HBaseConfiguration.create();conf.set(HConstants.HBASE_DIR, "hdfs://kb129:9000/hbase");conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129");conf.set(HConstants.CLIENT_PORT_STR, "2181");Connection connection = null;try {connection = ConnectionFactory.createConnection(conf);BufferedMutatorParams bufferedMutatorParams = new 
BufferedMutatorParams(TableName.valueOf("events_db:user_friend"));bufferedMutatorParams.setWriteBufferPeriodicFlushTimeoutMs(10000);//设置超时flush时间最大值bufferedMutatorParams.writeBufferSize(10*1024*1024);//设置缓存大小flushBufferedMutator bufferedMutator = connection.getBufferedMutator(bufferedMutatorParams) ;ArrayList datas = new ArrayList();while (true){ConsumerRecords poll = consumer.poll(Duration.ofMillis(100));datas.clear();//每次for循环前清空datasfor (ConsumerRecord record : poll) {//System.out.println(record.value());String[] split = record.value().split(",");int i = (split[0] + split[1]).hashCode();Put put = new Put(Bytes.toBytes(i));put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("userid"), split[0].getBytes());put.addColumn("uf".getBytes(), "friend".getBytes(),split[1].getBytes());datas.add(put);}num = num + datas.size();System.out.println("---------num:" + num);if (datas.size() > 0){bufferedMutator.mutate(datas);}}} catch (IOException e) {throw new RuntimeException(e);}}}
2.3设计模式写入hbase(buffer写入)
(1)IWorker接口
/**
 * A unit of work that loads data into a named target.
 */
public interface IWorker {

    /**
     * Loads data into the target identified by {@code targetName}.
     *
     * @param targetName name of the destination to fill
     */
    void fillData(String targetName);
}
(2)worker实现类
import nj.zb.kb23.kafkatohbase.oop.writer.IWriter;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class Worker implements IWorker {private KafkaConsumer consumer = null;private IWriter writer = null;public Worker(String topicName, String consumerGroupId, IWriter writer) {this.writer = writer;Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kb129:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);consumer = new KafkaConsumer(properties);consumer.subscribe(Collections.singleton(topicName));}@Overridepublic void fillData(String targetName) {int num = 0;while (true) {ConsumerRecords records = consumer.poll(Duration.ofMillis(100));int returnNum = writer.write(targetName, records);num += returnNum;System.out.println("---------num:" + num);}}}
(3)IWriter接口
import org.apache.kafka.clients.consumer.ConsumerRecords;

/**
 * Assembles the records consumed from Kafka and writes them to a given table of some
 * target store.
 */
public interface IWriter {

    /**
     * Writes one polled batch to {@code targetTableName}.
     *
     * @param targetTableName name of the destination table
     * @param records         batch polled from Kafka, with String keys and values
     * @return a count for this batch, which callers may add to a running total
     */
    int write(String targetTableName, ConsumerRecords<String, String> records);
}
(4)writer实现类
import nj.zb.kb23.kafkatohbase.oop.handler.IParseRecord;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HConstants;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.*;import org.apache.kafka.clients.consumer.ConsumerRecords;import java.io.IOException;import java.util.List;public class HBaseWriter implements IWriter{private Connection connection = null;private BufferedMutator bufferedMutator = null;private IParseRecord handler = null;/** * 初始化HBaseWriter对象 */public HBaseWriter(IParseRecord handler) {this.handler = handler;Configuration conf = HBaseConfiguration.create();conf.set(HConstants.HBASE_DIR, "hdfs://kb129:9000/hbase");conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129");conf.set(HConstants.CLIENT_PORT_STR, "2181");try {connection = ConnectionFactory.createConnection(conf);} catch (IOException e) {throw new RuntimeException(e);}}private void getBufferedMutator(String targetTableName){BufferedMutatorParams bufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf(targetTableName));bufferedMutatorParams.setWriteBufferPeriodicFlushTimeoutMs(10000);//设置超时flush时间最大值bufferedMutatorParams.writeBufferSize(10*1024*1024);//设置缓存大小flushif (bufferedMutator == null){try {bufferedMutator = connection.getBufferedMutator(bufferedMutatorParams);} catch (IOException e) {throw new RuntimeException(e);}}}@Overridepublic int write(String targetTableName, ConsumerRecords records) {if (records.count() > 0) {this.getBufferedMutator(targetTableName);List datas = handler.parse(records);try {bufferedMutator.mutate(datas);} catch (IOException e) {throw new RuntimeException(e);}return datas.size();}else {return 0;}}}
(5)IParseRecord接口
import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.List;

/**
 * Turns Kafka records into HBase {@link Put}s.
 */
public interface IParseRecord {

    /**
     * Converts one polled batch into Puts.
     *
     * @param records batch polled from Kafka, with String keys and values
     * @return the Puts built from {@code records}
     */
    List<Put> parse(ConsumerRecords<String, String> records);
}
(6)具体表对应的handler类(包装Put)
UsersHandler
import org.apache.hadoop.hbase.client.Put;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import java.util.ArrayList;import java.util.List;public class UsersHandler implements IParseRecord{List datas = new ArrayList();@Overridepublic List parse(ConsumerRecords records) {datas.clear();for (ConsumerRecord record : records) {String[] users = record.value().split(",");Put put = new Put(users[0].getBytes());put.addColumn("profile".getBytes(), "birthyear".getBytes(), users[2].getBytes());put.addColumn("profile".getBytes(), "gender".getBytes(), users[3].getBytes());put.addColumn("region".getBytes(), "locale".getBytes(), users[1].getBytes());if (users.length > 4){put.addColumn("registration".getBytes(), "joinedAt".getBytes(), users[4].getBytes());}if (users.length > 5){put.addColumn("region".getBytes(), "location".getBytes(), users[5].getBytes());}if (users.length > 6){put.addColumn("region".getBytes(), "timezone".getBytes(), users[6].getBytes());}datas.add(put);}return datas;}}
TrainHandler
import org.apache.hadoop.hbase.client.Put;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import java.util.ArrayList;import java.util.List;public class TrainHandler implements IParseRecord{List datas = new ArrayList();@Overridepublic List parse(ConsumerRecords records) {datas.clear();for (ConsumerRecord record : records) {String[] trains = record.value().split(",");double random = Math.random();Put put = new Put((trains[0]+trains[1]+random).getBytes());put.addColumn("eu".getBytes(), "user".getBytes(), trains[0].getBytes());put.addColumn("eu".getBytes(), "event".getBytes(), trains[1].getBytes());put.addColumn("eu".getBytes(), "invited".getBytes(), trains[2].getBytes());put.addColumn("eu".getBytes(), "timestamp".getBytes(), trains[3].getBytes());put.addColumn("eu".getBytes(), "interested".getBytes(), trains[4].getBytes());put.addColumn("eu".getBytes(), "not_interested".getBytes(), trains[5].getBytes());datas.add(put);}return datas;}}
EventsHandler
import org.apache.hadoop.hbase.client.Put;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import java.util.ArrayList;import java.util.List;public class EventsHandler implements IParseRecord {List datas = new ArrayList();@Overridepublic List parse(ConsumerRecords records) {datas.clear();for (ConsumerRecord record : records) {String[] events = record.value().split(",");Put put = new Put(events[0].getBytes());put.addColumn("creator".getBytes(), "user_id".getBytes(),events[1].getBytes());put.addColumn("schedule".getBytes(), "start_time".getBytes(),events[2].getBytes());put.addColumn("location".getBytes(), "city".getBytes(),events[3].getBytes());put.addColumn("location".getBytes(), "state".getBytes(),events[4].getBytes());put.addColumn("location".getBytes(), "zip".getBytes(),events[5].getBytes());put.addColumn("location".getBytes(), "country".getBytes(),events[6].getBytes());put.addColumn("location".getBytes(), "lat".getBytes(),events[7].getBytes());put.addColumn("location".getBytes(), "lng".getBytes(),events[8].getBytes());put.addColumn("remark".getBytes(), "common_words".getBytes(),events[9].getBytes());datas.add(put);}return datas;}}
EventAttendHandler
import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.util.Bytes;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import java.util.ArrayList;import java.util.List;public class EventAttendHandler implements IParseRecord{@Overridepublic List parse(ConsumerRecords records) {List datas = new ArrayList();for (ConsumerRecord record : records) {String[] splits = record.value().split(",");Put put = new Put((splits[0] + splits[1] + splits[2]).getBytes());put.addColumn(Bytes.toBytes("euat"), Bytes.toBytes("eventid"), splits[0].getBytes());put.addColumn("euat".getBytes(), "friendid".getBytes(),splits[1].getBytes());put.addColumn("euat".getBytes(), "state".getBytes(),splits[2].getBytes());datas.add(put);}return datas;}}
(7)主程序
import nj.zb.kb23.kafkatohbase.oop.handler.*;import nj.zb.kb23.kafkatohbase.oop.worker.Worker;import nj.zb.kb23.kafkatohbase.oop.writer.HBaseWriter;import nj.zb.kb23.kafkatohbase.oop.writer.IWriter;/** * 将Kafka中的topic为...中的数据消费到hbase中 * hbase中的表为events_db:... */public class KfkToHbTest {static int num = 0; //计数器public static void main(String[] args) {//IParseRecord handler = new EventAttendHandler();//IWriter writer = new HBaseWriter(handler);//String topic = "eventattendees";//String consumerGroupId = "eventattendees_group1";//String targetName = "events_db:event_attendee";//Worker worker = new Worker(topic, consumerGroupId, writer);//worker.fillData(targetName);/*EventsHandler eventsHandler = new EventsHandler();IWriter writer = new HBaseWriter(eventsHandler);Worker worker = new Worker("events", "events_group1", writer);worker.fillData("events_db:eventsb");*//*UsersHandler usersHandler = new UsersHandler();IWriter writer = new HBaseWriter(usersHandler);Worker worker = new Worker("users_raw", "users_group1", writer);worker.fillData("events_db:users");*/TrainHandler trainHandler = new TrainHandler();IWriter writer = new HBaseWriter(trainHandler);Worker worker = new Worker("train", "train_group1", writer);worker.fillData("events_db:train2");}}
3.HBase表映射至Hive中
-- Maps the HBase tables in namespace events_db into Hive, copies each one into an
-- ORC-backed managed table, then drops the HBase-backed external table.

create database if not exists events;
use events;

-- ---------------------------------------------------------------- users
create external table hb_users(
    userId    string,
    birthyear int,
    gender    string,
    locale    string,
    location  string,
    timezone  string,
    joinedAt  string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with SERDEPROPERTIES ('hbase.columns.mapping'=':key,profile:birthyear,profile:gender,region:locale,region:location,region:timezone,registration:joinedAt')
tblproperties ('hbase.table.name'='events_db:users');

select * from hb_users limit 3;
select count(1) from hb_users;

-- Copy the mapped external table into a managed ORC table so the data is kept safely;
-- once this exists the HBase table can be deleted.
create table users stored as orc as select * from hb_users;
select * from users limit 3;
select count(1) from users;
drop table hb_users;
--382091494

select count(*) from users where birthyear is null;
select round(avg(birthyear), 0) from users;
select `floor`(avg(birthyear)) from users;

-- Fill empty birthyear values with the average and overwrite the table.
-- The filled column is cast back to int and aliased so it lines up with users.birthyear.
with
tb as (
    select `floor`(avg(birthyear)) avgAge from users
),
tb2 as (
    select userId,
           cast(nvl(birthyear, tb.avgAge) as int) as birthyear,
           gender,
           locale,
           location,
           timezone,
           joinedAt
    from users cross join tb
)
insert overwrite table users
select * from tb2;

-- Found 109 empty-string genders.
-- NOTE(review): count(gender) ignores NULLs, so only empty strings are counted here
-- even though the filter also matches NULL.
select count(gender) count from users where gender is null or gender = "";

-- ---------------------------------------------------------------- events
create external table hb_events(
    event_id     string,
    user_id      string,
    start_time   string,
    city         string,
    state        string,
    zip          string,
    country      string,
    lat          float,
    lng          float,
    common_words string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with SERDEPROPERTIES ('hbase.columns.mapping'=':key,creator:user_id,schedule:start_time,location:city,location:state,location:zip,location:country,location:lat,location:lng,remark:common_words')
tblproperties ('hbase.table.name'='events_db:events');

select * from hb_events limit 10;
create table events stored as orc as select * from hb_events;
select count(*) from hb_events;
select count(*) from events;
drop table hb_events;

-- Two ways to look for duplicated event ids.
select event_id from events group by event_id having count(event_id) > 1;

with
tb as (
    select event_id, row_number() over (partition by event_id) rn from events
)
select event_id from tb where rn > 1;

select user_id, count(event_id) num from events group by user_id order by num desc;

-- ---------------------------------------------------------------- user_friend
create external table if not exists hb_user_friend(
    row_key  string,
    userid   string,
    friendid string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with SERDEPROPERTIES ('hbase.columns.mapping'=':key,uf:userid,uf:friend')
tblproperties ('hbase.table.name'='events_db:user_friend');

select * from hb_user_friend limit 3;
create table user_friend stored as orc as select * from hb_user_friend;
select count(*) from hb_user_friend;
select count(*) from user_friend;
drop table hb_user_friend;

-- ---------------------------------------------------------------- event_attendee
create external table if not exists hb_event_attendee(
    row_key    string,
    eventid    string,
    friendid   string,
    attendtype string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with SERDEPROPERTIES ('hbase.columns.mapping'=':key,euat:eventid,euat:friendid,euat:state')
tblproperties ('hbase.table.name'='events_db:event_attendee');

select * from hb_event_attendee limit 3;
select count(*) from hb_event_attendee;
create table event_attendee stored as orc as select * from hb_event_attendee;
select * from event_attendee limit 3;
select count(*) from event_attendee;
drop table hb_event_attendee;

-- ---------------------------------------------------------------- train
-- NOTE(review): the HBase column eu:not_interested is not mapped here, so it is not
-- carried over into the ORC copy; confirm that is intended.
create external table if not exists hb_train(
    row_key     string,
    userid      string,
    eventid     string,
    invited     string,
    `timestamp` string,
    interested  string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with SERDEPROPERTIES ('hbase.columns.mapping'=':key,eu:user,eu:event,eu:invited,eu:timestamp,eu:interested')
tblproperties ('hbase.table.name'='events_db:train');

select * from hb_train limit 3;
select count(*) from hb_train;
create table train stored as orc as select * from hb_train;
select * from train limit 3;
select count(*) from train;
drop table hb_train;

-- ---------------------------------------------------------------- lookup tables
create external table locale(
    locale_id int,
    locale    string
)
row format delimited fields terminated by '\t'
location '/events/data/locale';

select * from locale;

create external table time_zone(
    time_zone_id int,
    time_zone    string
)
row format delimited fields terminated by ','
location '/events/data/timezone';

select * from time_zone;