接上篇《构建一个完整的数据仓库(十二)doris进阶之导入数据的方式(上)》,我们介绍了把数据导入到doris的几种方法,这一篇我们继续接着讲。
前提提示:
之前我们安装的doris环境是在本地使用的服务器安装,但是本地服务器由于是内部的开发服务器,存在的冲突比较多,这里我们在腾讯云上临时用了一台服务器,安装了一个单机的doris。做后期介绍的演示。
一、kafka导入到doris
上一篇文章我们介绍了kafka导入到doris,但是我们在生产的消息里面是基于text的,在生产环境中我们一般肯定不是直接使用text文本的方式来发送一条条的消息的,而是使用json的方式,所以这里我们在演示一遍,使用json格式,从kafka导入数据到doris。
1.1、创建一张student的表
CREATE TABLE `student` ( `id` bigint(20) NULL COMMENT "id", `name` varchar(20) NULL COMMENT "姓名", `age` TINYINT NULL COMMENT "年龄", `class` varchar(20) NULL COMMENT "班级" ) ENGINE=OLAP DUPLICATE KEY(`id`,`name`,`age`,`class`) COMMENT "OLAP" DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "in_memory" = "false", "storage_format" = "V2" )
1.2、然后我们去kafka手动创建一个topic
在创建routine load之前,由于kafka里面没有对应的topic队列,所以我们需要手动创建一下。不然会报错。
kafka-topics.sh --create --bootstrap-server 10.206.0.14:9092 --replication-factor 1 --partitions 1 --topic stu
1.3、创建一个routine load,指定使用json格式
CREATE ROUTINE LOAD test1.stujob ON student COLUMNS(id,name,age,class) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json" )FROM KAFKA ( "kafka_broker_list"= "10.206.0.14:9092", "kafka_topic" = "stu", "property.group.id"="stu", "property.kafka_default_offsets" = "OFFSET_BEGINNING", "property.enable.auto.commit"="true" );
然后我们看下创建的routine engine状态是什么
show routine load;
状态没有问题,然后我们插入一条json数据:
kafka-console-producer.sh --broker-list 10.206.0.14:9092 --topic stu {"id":1,"name":"张三","age":12,"class":"6.1班"}
然后我们去doris里面查询下
select * from student;
大功告成。
在这里额外说下,doris的routine load导入数据的效率非常高,我们生产环境20多个字段,一次性导入3000W+的数据,只需要1分钟左右的时间。大家有机会可以多试验下。
二、通过挂载外部表的方式,查询数据并插入doris
这种方式对于熟悉大数据开发的同学来说不陌生,这里的意思是,比如doris我们可以直接在创建表的时候,使用外部挂载,例如:mysql,oracle,postgresql,数据是保存在外部的数据库上的,但是我们可以直接使用doris进行查询,同时可以使用insert int table select 。。。的方式,把数据存入doris中。下面做个测试案例:
1)先使用docker部署一个简易的mysql
docker pull mysql:8.0 docker run -itd --name mysql-8 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 mysql:8.0
2)在mysql中创建一个库test,然后导入一点数据。
create database test1;
导入的数据: 数据下载
3)在所有安装be节点的服务器上安装mysql的odbc驱动。
这里我们是centos7,所以我们要下载centos7版本的驱动
wget yum localinstall mysql-connector-odbc-8.0.28-1.el7.x86_64.rpm -y
执行完毕后,我们可以看到有一个配置文件:/etc/odbcinst.ini,这个配置文件里面都是配置好的,我们不需要做其他操作。
接下来我们要创建一个配置文件:vim /etc/odbc.ini
[mysql] Description = Data source MySQL Driver = MySQL ODBC 8.0 Driver Server = 10.206.0.14 Host = 10.206.0.14 Database = test Port = 3306 User = root Password = 123456
上面的信息根据自己的服务器修改。
4)接着我们创建外部表
CREATE EXTERNAL TABLE doris_organization ( id bigint COMMENT 'id', name varchar(255) COMMENT '名称', parent_id bigint COMMENT '父级节点', path varchar(255) COMMENT '路径', level int COMMENT '层级', address varchar(200) COMMENT '详细地址', sort int COMMENT '排序', code varchar(50) COMMENT '地址编码code', created_user bigint COMMENT '创建人', created_at datetime COMMENT '创建时间', del_flag tinyint COMMENT '删除标志', remarks varchar DEFAULT '' COMMENT '备注', update_user bigint COMMENT '更新人', updated_at datetime COMMENT '更新时间', customer_service_tel varchar(255) DEFAULT '' COMMENT '客服电话' ) ENGINE = ODBC PROPERTIES ( "host" = "10.206.0.14", "port" = "3306", "user" = "root", "password" = "123456", "database" = "test", "table" = "org_organization", "driver" = "MySQL ODBC 8.0 Unicode Driver", --注意这里的名称和odbcinst.ini里的mysql[]里的名称一致 "odbc_type" = "mysql" );
然后我们查询一下:
select count(1) from doris_organization;
这里我们就可以查到外部的挂载表了,然后如果需要导入数据的话,就创建新的表,然后通过insert into table select ...的方式导入数据。
在使用外部表的时候,我们演示的是mysql,所以这里需要注意下:
1、mysql的版本要是8.0及以上,因为下载的驱动是8.0的,5.x的我没找到,没有试验过。 2、mysql数据库及表的字符集一定要是用UTF8,不要使用UTF8mb4,目前doris ODBC外表只支持UTF8编码 3、当前ODBC支持ANSI 与 Unicode 两种Driver形式,当前Doris只支持Unicode Driver。如果强行使用ANSI Driver可能会导致查询结果出错。
好了,我们今天就暂时介绍到这里,应该在不久我们还会再出一个下的版本,说明其他的导入方式。敬请期待。
还没有评论,来说两句吧...