2 个回答
具体步骤如下:
1、引入依赖
1、引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7</artifactId>
<version>${flink.version}</version>
</dependency>
2、创建elasticsearch表对应的flink表CREATE TABLE es_table (
id STRING,
name STRING,
timestamp TIMESTAMP(3),
price DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7', -- 指定Elasticsearch版本
'hosts' = 'http://localhost:9200', -- Elasticsearch集群地址
'index' = 'flink_index', -- 目标索引名称
'document-type' = '_doc', -- 文档类型(Elasticsearch 7+ 默认为_doc)
'schema.ignore' = 'true' -- 忽略字段类型不匹配的警告
);
发布于:1周前 (05-28) IP属地:
3、写入数据到es
INSERT INTO es_table
VALUES
('1', 'item1', TIMESTAMP '2023-10-01 12:00:00', 99.9),
('2', 'item2', TIMESTAMP '2023-10-01 12:05:00', 199.9);
4、查询数据--直接查询
SELECT id, name, price
FROM es_table
WHERE price > 100;
-- 适用elasticsearch dsl查询
SELECT *
FROM es_table
WHERE SEARCH(
'{
"query": {
"range": {
"price": { "gte": 100 }
}
}
}'
);
发布于:1周前 (05-28) IP属地:
我来回答
您需要 登录 后回答此问题!