Flink开发-Hive数据导入HBase中
时间：2023-06-13 09:16:49
正文
依赖
<!--JSON工具-->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.22</version>
</dependency>
<!--操作Hbase-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.10</version>
</dependency>
<!--Hive JDBC-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>2.1.0</version>
</dependency>
主类
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Entry point of the Flink job that copies rows from the Hive table {@code t_user}
 * into the HBase table {@code zdb:tuser}.
 *
 * <p>Pipeline: {@link HiveReader} (source) -> {@link HbaseWriter} (sink), with the
 * same stream also printed to stdout for inspection.
 */
public class Hive2Hbase {

    /** Name under which the job is submitted. */
    private static final String JOB_NAME = "Flink write data to hbase";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment executionEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // One JSON record per Hive row; fan the stream out to HBase and to stdout.
        DataStreamSource<JSONObject> userRows = executionEnv.addSource(new HiveReader());
        userRows.addSink(new HbaseWriter());
        userRows.print();

        executionEnv.execute(JOB_NAME);
    }
}
读取Hive
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
/**
 * Flink source that reads every row of the Hive table {@code t_user} through the
 * HiveServer2 JDBC driver and emits one {@link JSONObject} per row.
 *
 * <p>Each emitted object has two keys: {@code rowKey} (the {@code id} column as an
 * integer) and {@code name} (the {@code name} column, possibly null). Rows whose
 * {@code id} is SQL NULL are skipped, since they cannot be addressed as an HBase row.
 *
 * <p>The source is bounded: {@link #run} returns once the result set is exhausted or
 * {@link #cancel} has been called. JDBC resources opened in {@link #open} are released
 * in {@link #close}.
 */
class HiveReader extends RichSourceFunction<JSONObject> {

    private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
    private static final String JDBC_URL = "jdbc:hive2://192.168.7.101:10000/default";
    private static final String USER = "hive";
    private static final String PASSWORD = "hive";
    private static final String QUERY = "select * from t_user";

    /** Kept as a field (previously a local) so it can be closed in {@link #close}. */
    private transient Connection con = null;
    private transient Statement st = null;

    /** Cleared by {@link #cancel} from another thread; volatile for visibility. */
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(DRIVER_CLASS);
        con = DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
        st = con.createStatement();
    }

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
        // try-with-resources replaces the previously commented-out rs.close().
        try (ResultSet rs = st.executeQuery(QUERY)) {
            while (running && rs.next()) {
                int id = rs.getInt("id");
                if (rs.wasNull()) {
                    // getInt() maps SQL NULL to 0; emitting it would overwrite HBase row "0".
                    continue;
                }
                String name = rs.getString("name");

                JSONObject json = new JSONObject();
                json.put("rowKey", id);
                json.put("name", name);
                ctx.collect(json);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    /** Closes the statement and connection, even if one of the closes fails. */
    @Override
    public void close() throws Exception {
        try {
            if (st != null) {
                st.close();
            }
        } finally {
            try {
                if (con != null) {
                    con.close();
                }
            } finally {
                st = null;
                con = null;
                super.close();
            }
        }
    }
}
写入HBase
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Random;
/**
 * Flink sink that writes each incoming record into the HBase table {@code zdb:tuser}.
 *
 * <p>Expects records with string-convertible keys {@code rowKey} and {@code name}.
 * The {@code name} value is stored in column family {@code name} under the empty
 * qualifier (the same cell the shell command {@code put 'zdb:tuser','100','name','...'}
 * addresses). Records lacking a non-empty {@code rowKey} or a {@code name} are skipped
 * with a message on stderr, because HBase cannot store an empty row key or an empty Put.
 *
 * <p>Write failures propagate so Flink's restart strategy can handle them instead of
 * silently dropping rows. Puts keyed by {@code rowKey} are idempotent, so replaying
 * records after a restart does not duplicate data.
 */
class HbaseWriter extends RichSinkFunction<JSONObject> {

    private static final byte[] FAMILY = Bytes.toBytes("name");
    private static final byte[] QUALIFIER = Bytes.toBytes("");

    /** Heavyweight HBase connection; created once per sink instance and closed in close(). */
    private transient org.apache.hadoop.hbase.client.Connection connection = null;
    private transient Table queryListTable = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // Kept in a field (previously a local) so it can be released in close().
        connection = ConnectionFactory.createConnection(conf);
        queryListTable = connection.getTable(TableName.valueOf("zdb", "tuser"));
    }

    /** Releases the table and then the connection, even if closing the table fails. */
    @Override
    public void close() throws Exception {
        try {
            if (queryListTable != null) {
                queryListTable.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                queryListTable = null;
                connection = null;
                super.close();
            }
        }
    }

    @Override
    public void invoke(JSONObject json, Context context) throws Exception {
        String rowKey = json.getString("rowKey");
        String name = json.getString("name");
        if (rowKey == null || rowKey.isEmpty() || name == null) {
            // Nothing writable: HBase rejects empty row keys and Puts without columns.
            System.err.println("Skipping record without rowKey/name: " + json);
            return;
        }
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(name));
        // Not caught: a failed write should fail the task rather than lose data silently.
        queryListTable.put(put);
    }
}
HBase Shell 命令
删除命名空间下的表
disable 'zdb:tuser'
drop 'zdb:tuser'
创建命名空间（若命名空间 zdb 尚不存在，需先创建，否则建表会报 NamespaceNotFoundException）
create_namespace 'zdb'
创建表
create 'zdb:tuser','name'
查看表
describe 'zdb:tuser'
插入数据
put 'zdb:tuser','100','name','LiYing'
查询数据
get 'zdb:tuser','100'
get 'zdb:tuser','100','name'
scan 'zdb:tuser'
scan 'zdb:tuser', {FORMATTER => 'toString'}
相关文章
- HBase常见面试题[通俗易懂]
- C#用什么开发_hbase写数据流程
- HBase 快速入门(安装和命令操作)
- NoSql数据库之Hbase详解数据库
- Hbase(二)hbase建表详解大数据
- Hbase(六) hbase Java API详解大数据
- HBase中的HMaster、HRegionServer、Zookeeper详解大数据
- HBase笔记详解大数据
- 剖析HBase负载均衡和性能指标详解大数据
- kafka+storm+hbase详解编程语言
- MySQL数据导入HBase:构建NoSQL数据库(mysql导入hbase)
- 比较两者:HBase vs MySQL(hbase和mysql)
- 比较:HBase与MongoDB的优劣(hbase和mongodb)
- HBase与Oracle 比较两款数据库的优缺点(hbase和oracle)
- HBase和MySQL:如何选择适合你的数据库?(hbasemysql)
- 比较:MYSQL与HBASE 数据库管理系统的异同(mysql与hbase)
- 运用Redis与HBase大幅度提升MongoDB性能(redishbase)