Trying to connect to mongoDB using FlinkSQL. Created maven project and implemented the java code. after building the jar and submitted to flink cluster.
Flink version – 1.19.0
Please find the code below
public class mongoDB
{
public static void main(String[] args)
{
try
{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String sqlToRun = "CREATE TABLE flinkSql1 ("
+"_id String,"
+ "Name String,"
+ "Age String,"
+ "PRIMARY KEY(_id) NOT ENFORCED"
+ ") WITH ("
+ "'connector' = 'mongodb-cdc',"
+ "'hosts' = 'njbnet.com:27009',"
+ "'username' = 'user',"
+ "'password' = 'bX2Y%3FVSz',"
+ "'database' = 'eric',"
+ "'collection' = 'flinkSql',"
+ "'connection.options' = 'authSource=admin'"
+")";
tableEnv.executeSql(sqlToRun);
String insertQuery = "SELECT * FROM flinkSql1";
TableResult result = tableEnv.executeSql(insertQuery);
result.print();
env.execute("Job Submission");
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
Executing the above code in the flink cluster and getting the below exception
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:417)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:169)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:712)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.cdc.debezium.DebeziumSourceFunction.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.cdc.debezium.DebeziumSourceFunction
at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:481)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:401)
... 9 more
Please find the dependecies which I am using
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>3.1.0</version>
<exclusions>
<exclusion>
<groupId>commons.collections</groupId>
<artifactId>commons-collections</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.19.0</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>1.2.0-1.19</version>
</dependency>
Tried with diffrent version of the dependencies still getting the same error