NullPointerException with Atomikos and Spring Batch JpaPagingItemReader

I use Atomikos with Spring Boot, Spring Batch, Spring Data JPA, Hibernate and PostgreSQL. I have two data sources configured as XA data sources with the Atomikos pool.

I have run into two problems:

The first is that I can't disable autocommit on the Atomikos pool. I've searched everywhere and can't find an autocommit-like property that I can set to false.

The second is more complex. My distributed transaction manager works, since both of my databases are updated.

But when I use Spring Batch to read data from my second database with a JpaPagingItemReaderBuilder, I get a NullPointerException. Below is the configuration of the second data source and the second entity manager factory used to connect to that database.

@Bean
@ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.secondary")
public DataSource dataSourceSecondary(){
    return new AtomikosDataSourceBean();
}


@Bean
@PersistenceContext(unitName = "entityManagerSecondaryFactory")
public LocalContainerEntityManagerFactoryBean entityManagerSecondaryFactory(){

    Properties properties;
    HibernateJpaVendorAdapter jpaVendorAdapter;
    LocalContainerEntityManagerFactoryBean factory;

    factory = new LocalContainerEntityManagerFactoryBean();
    factory.setDataSource(this.dataSourceSecondary());

    factory.setPackagesToScan(this.deltaProperties.getRepositoryPackageScanSecondary());

    // JpaVendorAdapter configuration
    jpaVendorAdapter = new HibernateJpaVendorAdapter();
    jpaVendorAdapter.setGenerateDdl(Boolean.parseBoolean(this.deltaProperties.getGenerateDdl()));
    jpaVendorAdapter.setDatabasePlatform(this.deltaProperties.getDatabase());
    factory.setJpaVendorAdapter(jpaVendorAdapter);

    // Properties of the JPA provider implementation
    properties = new Properties();
    properties.setProperty("hibernate.cache.use_query_cache", this.deltaProperties.getUseQueryCache());
    properties.setProperty("hibernate.cache.use_second_level_cache", this.deltaProperties.getUseSecondLevelCache());
    properties.setProperty("hibernate.cache.region.factory_class", this.deltaProperties.getRegionFactoryClass());
    properties.setProperty("hibernate.generate_statistics", this.deltaProperties.getGenerateStatistics());
    properties.setProperty("hibernate.jdbc.batch_size", Integer.toString(this.deltaProperties.getBatchSize()));
    properties.setProperty("hibernate.order_inserts", this.deltaProperties.getOrderInserts());
    properties.setProperty("hibernate.order_updates", this.deltaProperties.getOrderUpdates());
    properties.setProperty("hibernate.hbm2ddl.auto", this.deltaProperties.getHbm2ddlAuto());
    properties.setProperty("hibernate.id.optimizer.pooled.preferred",
            this.deltaProperties.getIdOptimizerPooledPreferred());
    properties.setProperty("hibernate.format_sql", this.deltaProperties.getFormatSql());
    properties.setProperty("hibernate.use_sql_comments", this.deltaProperties.getCommentsSql());
    properties.setProperty("hibernate.default_schema", this.deltaProperties.getDefautSchema());
    properties.setProperty("hibernate.dialect", this.deltaProperties.getDialect());
    properties.setProperty("hibernate.connection.provider_disables_autocommit",
            this.deltaProperties.getDisablesAutoCommit());
    properties.setProperty("org.hibernate.envers.audit_table_suffix", this.deltaProperties.getHisto());
    properties.setProperty("hibernate.jdbc.time_zone", this.deltaProperties.getUtc());
    properties.setProperty("hibernate.connection.autocommit", "false");

    // Key corrected from "plateform": Hibernate's setting is hibernate.transaction.jta.platform
    properties.setProperty("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());


    factory.setJpaProperties(properties);

    JpaDialect dialect = new HibernateJpaDialect();
    factory.setJpaDialect(dialect);

    return factory;
}

...

@Bean
@StepScope
public ItemReader<ResponseEntity> responseSendItemReader(
        @Value("#{jobExecutionContext['num']}") String num) {

    Map<String, Object> parameterValues;

    parameterValues = new HashMap<>();

    //parameter
    parameterValues.put("num", num);

    return new JpaPagingItemReaderBuilder<ResponseEntity>()
            .name("responseSendItemReader")
            .entityManagerFactory(this.entityManagerSecondaryFactory)
            .queryString("SELECT r FROM ResponseEntity r "
                    + " WHERE r.num = :num "
                    + " ORDER BY r.dateEmission ASC")

            .parameterValues(parameterValues)
            .pageSize(25)
            .build();

}

Here is the properties file for my second database:

spring.jta.atomikos.datasource.secondary.unique-resource-name=dataSourceSecondary
spring.jta.atomikos.datasource.secondary.max-pool-size=50
spring.jta.atomikos.datasource.secondary.min-pool-size=10
spring.jta.atomikos.datasource.secondary.max-life-time=0
spring.jta.atomikos.datasource.secondary.borrow-connection-timeout=10000
spring.jta.atomikos.datasource.secondary.defaultAutoCommit=false
spring.jta.atomikos.datasource.secondary.xa-data-source-class-name=@deltaie.ds.driver.class.name@
spring.jta.atomikos.datasource.secondary.xa-properties.url=url
spring.jta.atomikos.datasource.secondary.xa-properties.user=user
spring.jta.atomikos.datasource.secondary.xa-properties.password=pwd

Here is my transactions.properties file:

# Atomikos properties

com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
com.atomikos.icatch.log_base_dir=./atomikos/log
com.atomikos.icatch.log_base_name=deltaieTransaction
com.atomikos.icatch.default_jta_timeout=10000
com.atomikos.icatch.oltp_retry_interval=10000
com.atomikos.icatch.checkpoint_interval=500
com.atomikos.icatch.max_timeout=300000
com.atomikos.icatch.recovery_delay=10000
com.atomikos.icatch.max_actives=50

And here is the error I get:

onReadError GunSendResponseStepListener java.lang.NullPointerException: null
at org.springframework.batch.item.database.JpaPagingItemReader.doReadPage(JpaPagingItemReader.java:192)
at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:110)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:93)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy300.read(Unknown Source)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99)
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:149)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

Thanks for your help

There is 1 solution below

patrick BAK

If it helps, the cause was that the method defining my JpaPagingItemReader declared the ItemReader interface as its return type instead of the JpaPagingItemReader class.
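A sketch of the corrected bean, reusing the reader definition from the question. Only the declared return type changes. The enclosing class name and the constructor injection are assumptions added to make the fragment complete, ResponseEntity is the asker's own entity class (its import is omitted), and the javax.persistence import assumes a Spring Batch 4-era setup like the one in the stack trace.

```java
import java.util.HashMap;
import java.util.Map;

import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ResponseReaderConfig { // hypothetical class name

    private final EntityManagerFactory entityManagerSecondaryFactory;

    // Assumption: the secondary factory is injected by its bean name.
    public ResponseReaderConfig(
            @Qualifier("entityManagerSecondaryFactory") EntityManagerFactory emf) {
        this.entityManagerSecondaryFactory = emf;
    }

    @Bean
    @StepScope
    // Return the concrete class so the step-scope proxy also exposes ItemStream.
    public JpaPagingItemReader<ResponseEntity> responseSendItemReader(
            @Value("#{jobExecutionContext['num']}") String num) {

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("num", num);

        return new JpaPagingItemReaderBuilder<ResponseEntity>()
                .name("responseSendItemReader")
                .entityManagerFactory(this.entityManagerSecondaryFactory)
                .queryString("SELECT r FROM ResponseEntity r "
                        + " WHERE r.num = :num "
                        + " ORDER BY r.dateEmission ASC")
                .parameterValues(parameterValues)
                .pageSize(25)
                .build();
    }
}
```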

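Since this method is step-scoped, Spring wraps the bean in a proxy built from the declared return type. When that type is the ItemReader interface, the proxy (a JDK dynamic proxy, as the JdkDynamicAopProxy frame in the stack trace shows) exposes only ItemReader and not ItemStream. As far as I can tell, the step then never calls open() on the reader, so its EntityManager is still null when doReadPage runs. That is why it is recommended to declare the JpaPagingItemReader class rather than the ItemReader interface as the return type.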
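The proxy effect can be reproduced without Spring. The following is a minimal plain-Java sketch, not Spring Batch code: Reader, Stream and PagingReader are hypothetical stand-ins for ItemReader, ItemStream and JpaPagingItemReader, and runStep only imitates a step that opens a reader when it can see that the reader is a stream. In the real fix Spring builds a class-based proxy, which the sketch approximates by exposing both interfaces.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public class StepScopeProxyDemo {

    // Hypothetical stand-ins for Spring Batch's ItemReader and ItemStream.
    public interface Reader<T> { T read(); }
    public interface Stream { void open(); }

    // Stand-in for JpaPagingItemReader: read() needs state created by open().
    public static class PagingReader implements Reader<String>, Stream {
        private StringBuilder session; // plays the role of the EntityManager

        @Override public void open() { session = new StringBuilder("opened"); }

        @Override public String read() {
            // Throws NullPointerException if open() was never called.
            return session.toString();
        }
    }

    // Builds a JDK dynamic proxy that exposes only the given interfaces.
    public static Object proxyFor(Object target, Class<?>... exposed) {
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the target's own exception
            }
        };
        return Proxy.newProxyInstance(
                StepScopeProxyDemo.class.getClassLoader(), exposed, handler);
    }

    // Imitates the step: open the reader only if it is visibly a Stream.
    public static String runStep(Object reader) {
        if (reader instanceof Stream) {
            ((Stream) reader).open();
        }
        @SuppressWarnings("unchecked")
        Reader<String> typed = (Reader<String>) reader;
        return typed.read();
    }

    public static void main(String[] args) {
        Object narrow = proxyFor(new PagingReader(), Reader.class);
        Object wide = proxyFor(new PagingReader(), Reader.class, Stream.class);
        try {
            runStep(narrow);
        } catch (NullPointerException e) {
            System.out.println("narrow proxy: open() was skipped, read() failed");
        }
        System.out.println("wide proxy: " + runStep(wide));
    }
}
```

The narrow proxy fails the instanceof check, so open() is skipped and read() dereferences a null field, which mirrors the failure at JpaPagingItemReader.doReadPage in the question.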