如何在FlinkSQL中使用Elasticsearch连接器?

提问者:帅平 问题分类:面试刷题
如何在FlinkSQL中使用Elasticsearch连接器?
2 个回答
一闪一闪亮晶晶べ
一闪一闪亮晶晶べ
具体步骤如下:
1、引入依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch7</artifactId>
  <version>${flink.version}</version>
</dependency>
2、创建elasticsearch表对应的flink表
CREATE TABLE es_table (
  id STRING,
  name STRING,
  timestamp TIMESTAMP(3),
  price DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',  -- 指定Elasticsearch版本
  'hosts' = 'http://localhost:9200', -- Elasticsearch集群地址
  'index' = 'flink_index',          -- 目标索引名称
  'document-type' = '_doc',         -- 文档类型(Elasticsearch 7+ 默认为_doc)
  'schema.ignore' = 'true'          -- 忽略字段类型不匹配的警告
);
发布于:1周前 (05-28) IP属地:
与你无关
与你无关
3、写入数据到es
INSERT INTO es_table
VALUES 
  ('1', 'item1', TIMESTAMP '2023-10-01 12:00:00', 99.9),
  ('2', 'item2', TIMESTAMP '2023-10-01 12:05:00', 199.9);
4、查询数据
--直接查询
SELECT id, name, price
FROM es_table
WHERE price > 100;
-- 适用elasticsearch dsl查询
SELECT *
FROM es_table
WHERE SEARCH(
  '{
    "query": {
      "range": {
        "price": { "gte": 100 }
      }
    }
  }'
);
发布于:1周前 (05-28) IP属地:
我来回答