Caused by: java.lang.ClassCastException: LinkedMap in instance of org.apache.flink.cdc.debezium.DebeziumSourceFunction


I am trying to connect to MongoDB using Flink SQL. I created a Maven project, implemented the Java code below, built the JAR, and submitted it to a Flink cluster.

Flink version – 1.19.0

Please find the code below

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class mongoDB
{
    public static void main(String[] args)
    {
        try
        {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Source table backed by the MongoDB CDC connector
            String sqlToRun = "CREATE TABLE flinkSql1 ("
                    + "_id String,"
                    + "Name String,"
                    + "Age String,"
                    + "PRIMARY KEY(_id) NOT ENFORCED"
                    + ") WITH ("
                    + "'connector' = 'mongodb-cdc',"
                    + "'hosts' = 'njbnet.com:27009',"
                    + "'username' = 'user',"
                    + "'password' = 'bX2Y%3FVSz',"
                    + "'database' = 'eric',"
                    + "'collection' = 'flinkSql',"
                    + "'connection.options' = 'authSource=admin'"
                    + ")";

            tableEnv.executeSql(sqlToRun);

            // Read the table and print the change stream to stdout
            String selectQuery = "SELECT * FROM flinkSql1";
            TableResult result = tableEnv.executeSql(selectQuery);
            result.print();

            // Note: executeSql() above already submits the query as a job;
            // this call is not required for Table API queries.
            env.execute("Job Submission");
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

Running the above code on the Flink cluster throws the following exception:

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:417)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:169)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:730)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:712)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.cdc.debezium.DebeziumSourceFunction.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.cdc.debezium.DebeziumSourceFunction
    at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
    at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
    at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
    at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:481)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:401)
    ... 9 more
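
The message says that a LinkedMap cannot be assigned to a field of type LinkedMap. When both names are identical like this, the usual explanation is that the class was loaded twice by two different class loaders, for example once from the job JAR and once from the cluster's own lib directory. Whether that is what happens here is an assumption, not something the trace proves. A minimal diagnostic sketch for checking where a class comes from is shown below; the class name ClassOriginCheck and the method describe are made up for illustration and are not part of Flink.

```java
import java.security.CodeSource;

public class ClassOriginCheck {

    // Returns a one-line description of which class loader loaded the class
    // and which JAR or directory it was read from.
    static String describe(String className) throws ClassNotFoundException {
        Class<?> cls = Class.forName(
                className, false, Thread.currentThread().getContextClassLoader());

        // A null loader means the JDK bootstrap loader.
        ClassLoader loader = cls.getClassLoader();
        String loaderName = (loader == null) ? "<bootstrap>" : loader.toString();

        // JDK classes usually have no code source location.
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "<JDK runtime>"
                : source.getLocation().toString();

        return className + " loaded by " + loaderName + " from " + location;
    }

    public static void main(String[] args) {
        // Defaults to the class named in the exception above.
        String name = args.length > 0
                ? args[0]
                : "org.apache.commons.collections.map.LinkedMap";
        try {
            System.out.println(describe(name));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on this classpath");
        }
    }
}
```

Calling describe from inside the job (with the user-code class loader as the context class loader) and comparing the result with the JARs in the cluster's lib directory would show whether two copies of commons-collections are in play.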

These are the dependencies I am using:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.19.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.19.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>3.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>commons.collections</groupId>
                    <artifactId>commons-collections</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.19.0</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver-sync</artifactId>
            <version>4.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mongodb</artifactId>
            <version>1.2.0-1.19</version>
        </dependency>

I tried different versions of the dependencies but still get the same error.
