When using Dinky to run a Flink SQL job that syncs data into Elasticsearch, the job fails with the following error:
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.users_sink'.
Table options are:
'connector'='elasticsearch-7'
'hosts'='http://192.168.31.254:9200'
'index'='users'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:322)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:454)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:231)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate(PlannerBase.scala:181)
at scala.collection.TraversableLike.$anonfun$map(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1277)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:862)
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109)
at org.dinky.executor.Executor.executeStatementSet(Executor.java:293)
at org.dinky.job.builder.JobTransBuilder.processWithoutGateway(JobTransBuilder.java:116)
at org.dinky.job.builder.JobTransBuilder.handleStatementSet(JobTransBuilder.java:84)
at org.dinky.job.builder.JobTransBuilder.run(JobTransBuilder.java:70)
at org.dinky.job.JobManager.executeSql(JobManager.java:341)
... 137 more
Caused by: java.lang.NoSuchMethodError: org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot(Lorg/apache/flink/table/types/logical/LogicalType;Lorg/apache/flink/table/types/logical/LogicalTypeRoot;)Z
at org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchValidationUtils.lambda$null$...(ElasticsearchValidationUtils.java:73)
at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchValidationUtils.lambda$validatePrimaryKey(ElasticsearchValidationUtils.java:84)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchValidationUtils.validatePrimaryKey(ElasticsearchValidationUtils.java:63)
at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory.createDynamicTableSink(Elasticsearch7DynamicSinkFactory.java:84)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:319)
... 159 more
What can I do to fix this?
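For context, the NoSuchMethodError is thrown while the connector validates the sink's primary key, so a DDL along the following lines is enough to hit it. This is a reconstruction from the table options printed in the error; the column names and the PRIMARY KEY clause are assumptions:

```sql
-- Hypothetical sink DDL matching the options shown in the error above.
-- Column names and the PRIMARY KEY clause are assumptions; the ES
-- connector validates the primary key types, which is where the
-- NoSuchMethodError surfaces.
CREATE TABLE users_sink (
  user_id BIGINT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://192.168.31.254:9200',
  'index' = 'users'
);
```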
Posted: 2024-03-20
2 Answers
This happens because the Flink version that the flink-connector-elasticsearch dependency was built against does not match the Flink version your Dinky deployment runs on. Download the source from:
https://github.com/apache/flink-connector-elasticsearch
then change the Flink version number in pom.xml to the Flink version Dinky uses, rebuild to produce a matching flink-connector-elasticsearch jar, and put that jar into the Flink distribution that Dinky runs on.
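The rebuild described above can be sketched as the following shell session. The Flink version 1.14.6 and the `flink.version` Maven property name are assumptions; substitute the version and paths your Dinky installation actually uses:

```shell
# Grab the connector source (assumption: building from the main repo;
# check out the tag matching your connector release if one exists).
git clone https://github.com/apache/flink-connector-elasticsearch.git
cd flink-connector-elasticsearch

# Rebuild against the Flink version Dinky uses. Overriding the
# flink.version property on the command line avoids editing pom.xml by
# hand (assumption: the pom wires its Flink deps through that property).
mvn clean package -DskipTests -Dflink.version=1.14.6

# Copy the resulting jar into the Flink lib directory Dinky points at
# (path is an assumption; adjust to your installation).
cp flink-connector-elasticsearch7/target/flink-connector-elasticsearch7-*.jar \
   /opt/flink/lib/
```

After replacing the jar, restart the Flink cluster (or session) so the new connector is picked up.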
Answered: 2024-03-20