Error when syncing data to Elasticsearch with Flink SQL written in Dinky: org.apache.flink.table.types.logical.utils.LogicalTypeChe

Asked by: 帅平 Category: Big Data
I wrote Flink SQL in Dinky to sync data to Elasticsearch, and the job fails with this error:
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.users_sink'.

Table options are:

'connector'='elasticsearch-7'
'hosts'='http://192.168.31.254:9200'
'index'='users'
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:322)
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:454)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:231)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate(PlannerBase.scala:181)
	at scala.collection.TraversableLike.$anonfun$map(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1277)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:862)
	at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109)
	at org.dinky.executor.Executor.executeStatementSet(Executor.java:293)
	at org.dinky.job.builder.JobTransBuilder.processWithoutGateway(JobTransBuilder.java:116)
	at org.dinky.job.builder.JobTransBuilder.handleStatementSet(JobTransBuilder.java:84)
	at org.dinky.job.builder.JobTransBuilder.run(JobTransBuilder.java:70)
	at org.dinky.job.JobManager.executeSql(JobManager.java:341)
	... 137 more
Caused by: java.lang.NoSuchMethodError: org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot(Lorg/apache/flink/table/types/logical/LogicalType;Lorg/apache/flink/table/types/logical/LogicalTypeRoot;)Z
	at org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchValidationUtils.lambda$null(ElasticsearchValidationUtils.java:73)
	at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchValidationUtils.lambda$validatePrimaryKey(ElasticsearchValidationUtils.java:84)
	at java.util.Optional.ifPresent(Optional.java:159)
	at org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchValidationUtils.validatePrimaryKey(ElasticsearchValidationUtils.java:63)
	at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory.createDynamicTableSink(Elasticsearch7DynamicSinkFactory.java:84)
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:319)
	... 159 more

How can I fix this?
2 answers
走过的路
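This happens because the Flink version that the flink-connector-elasticsearch jar was built against differs from the Flink version your Dinky is running. The NoSuchMethodError on LogicalTypeChecks.hasRoot means the connector calls a method signature that does not exist in the Flink classes on the classpath. Get the connector source here: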
https://github.com/apache/flink-connector-elasticsearch

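Download the source, change the Flink version in pom.xml to the Flink version your Dinky uses, and rebuild to get a matching flink-connector-elasticsearch jar. Then place that jar with the dependencies of the corresponding Flink version in Dinky.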
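The pom.xml change described above can be scripted. The following is only a minimal sketch: it assumes the connector's pom.xml declares the Flink version as a `<flink.version>` Maven property (check the file in your checkout), and the sample fragment, the helper name `set_flink_version`, and the version numbers are hypothetical.

```python
import re

# Assumption: the Flink version is declared as a <flink.version> Maven
# property in the connector's pom.xml; verify this in your checkout.
FLINK_VERSION_TAG = re.compile(r"(<flink\.version>)[^<]*(</flink\.version>)")


def set_flink_version(pom_text: str, version: str) -> str:
    """Return pom_text with every <flink.version> value replaced by version."""
    new_text, count = FLINK_VERSION_TAG.subn(
        lambda m: m.group(1) + version + m.group(2), pom_text
    )
    if count == 0:
        # Fail loudly instead of silently building against the old version.
        raise ValueError("no <flink.version> property found")
    return new_text


# Hypothetical pom.xml fragment and version numbers, for illustration only.
sample_pom = """<properties>
  <flink.version>1.16.0</flink.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>"""

patched = set_flink_version(sample_pom, "1.17.2")
print(patched)
```

For a real checkout, read pom.xml from disk, pass its text through the function, and write the result back. Then rebuild with Maven (for example `mvn clean package -DskipTests`) and copy the resulting jar into Dinky as described above. If the connector source does not compile against your Flink version, a different connector branch or release may be needed.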
Posted: 1 month ago (03-20) IP location: unknown