I use Atomikos with Spring Boot, Spring Batch, Spring Data JPA, Hibernate and PostgreSQL. I have two data sources, each configured as an XA data source backed by the Atomikos pool.
I have run into two problems.
First, I can't disable autocommit on the Atomikos pool. I've searched everywhere and can't find an autocommit-like property that I can set to false.
The second is more complex. My distributed transaction manager works, since both of my databases are updated.
But when I use Spring Batch to read data from my second database with a JpaPagingItemReaderBuilder, I get a NullPointerException. Below is the configuration of the second data source and the second entity manager factory, which connect to my second database.
@Bean
@ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.secondary")
public DataSource dataSourceSecondary() {
    return new AtomikosDataSourceBean();
}
@Bean
@PersistenceContext(unitName = "entityManagerSecondaryFactory")
public LocalContainerEntityManagerFactoryBean entityManagerSecondaryFactory() {
    Properties properties;
    HibernateJpaVendorAdapter jpaVendorAdapter;
    LocalContainerEntityManagerFactoryBean factory;
    factory = new LocalContainerEntityManagerFactoryBean();
    factory.setDataSource(this.dataSourceSecondary());
    factory.setPackagesToScan(this.deltaProperties.getRepositoryPackageScanSecondary());
    // Configure the JPA vendor adapter
    jpaVendorAdapter = new HibernateJpaVendorAdapter();
    jpaVendorAdapter.setGenerateDdl(Boolean.parseBoolean(this.deltaProperties.getGenerateDdl()));
    jpaVendorAdapter.setDatabasePlatform(this.deltaProperties.getDatabase());
    factory.setJpaVendorAdapter(jpaVendorAdapter);
    // Properties for the JPA provider implementation
    properties = new Properties();
    properties.setProperty("hibernate.cache.use_query_cache", this.deltaProperties.getUseQueryCache());
    properties.setProperty("hibernate.cache.use_second_level_cache", this.deltaProperties.getUseSecondLevelCache());
    properties.setProperty("hibernate.cache.region.factory_class", this.deltaProperties.getRegionFactoryClass());
    properties.setProperty("hibernate.generate_statistics", this.deltaProperties.getGenerateStatistics());
    properties.setProperty("hibernate.jdbc.batch_size", Integer.toString(this.deltaProperties.getBatchSize()));
    properties.setProperty("hibernate.order_inserts", this.deltaProperties.getOrderInserts());
    properties.setProperty("hibernate.order_updates", this.deltaProperties.getOrderUpdates());
    properties.setProperty("hibernate.hbm2ddl.auto", this.deltaProperties.getHbm2ddlAuto());
    properties.setProperty("hibernate.id.optimizer.pooled.preferred",
            this.deltaProperties.getIdOptimizerPooledPreferred());
    properties.setProperty("hibernate.format_sql", this.deltaProperties.getFormatSql());
    properties.setProperty("hibernate.use_sql_comments", this.deltaProperties.getCommentsSql());
    properties.setProperty("hibernate.default_schema", this.deltaProperties.getDefautSchema());
    properties.setProperty("hibernate.dialect", this.deltaProperties.getDialect());
    properties.setProperty("hibernate.connection.provider_disables_autocommit",
            this.deltaProperties.getDisablesAutoCommit());
    properties.setProperty("org.hibernate.envers.audit_table_suffix", this.deltaProperties.getHisto());
    properties.setProperty("hibernate.jdbc.time_zone", this.deltaProperties.getUtc());
    properties.setProperty("hibernate.connection.autocommit", "false");
    properties.setProperty("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
    factory.setJpaProperties(properties);
    JpaDialect dialect = new HibernateJpaDialect();
    factory.setJpaDialect(dialect);
    return factory;
}
...
@Bean
@StepScope
public ItemReader<ResponseEntity> responseSendItemReader(
        @Value("#{jobExecutionContext['num']}") String num) {
    Map<String, Object> parameterValues;
    parameterValues = new HashMap<>();
    // Query parameter
    parameterValues.put("num", num);
    return new JpaPagingItemReaderBuilder<ResponseEntity>()
            .name("responseSendItemReader")
            .entityManagerFactory(this.entityManagerSecondaryFactory)
            .queryString("SELECT r FROM ResponseEntity r "
                    + " WHERE r.num = :num "
                    + " ORDER BY r.dateEmission ASC")
            .parameterValues(parameterValues)
            .pageSize(25)
            .build();
}
Here is my properties file for my second database:
spring.jta.atomikos.datasource.secondary.unique-resource-name=dataSourceSecondary
spring.jta.atomikos.datasource.secondary.max-pool-size=50
spring.jta.atomikos.datasource.secondary.min-pool-size=10
spring.jta.atomikos.datasource.secondary.max-life-time=0
spring.jta.atomikos.datasource.secondary.borrow-connection-timeout=10000
spring.jta.atomikos.datasource.secondary.defaultAutoCommit=false
spring.jta.atomikos.datasource.secondary.xa-data-source-class-name=@deltaie.ds.driver.class.name@
spring.jta.atomikos.datasource.secondary.xa-properties.url=url
spring.jta.atomikos.datasource.secondary.xa-properties.user=user
spring.jta.atomikos.datasource.secondary.xa-properties.password=pwd
Here is my transactions.properties file:
# Atomikos properties
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
com.atomikos.icatch.log_base_dir=./atomikos/log
com.atomikos.icatch.log_base_name=deltaieTransaction
com.atomikos.icatch.default_jta_timeout=10000
com.atomikos.icatch.oltp_retry_interval=10000
com.atomikos.icatch.checkpoint_interval=500
com.atomikos.icatch.max_timeout=300000
com.atomikos.icatch.recovery_delay=10000
com.atomikos.icatch.max_actives=50
And here is the error I get:
onReadError GunSendResponseStepListener java.lang.NullPointerException: null
at org.springframework.batch.item.database.JpaPagingItemReader.doReadPage(JpaPagingItemReader.java:192)
at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:110)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:93)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy300.read(Unknown Source)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99)
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:149)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Thanks for your help.
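In case it helps anyone: the cause was that the method defining my JpaPagingItemReader declared the ItemReader interface as its return type instead of the JpaPagingItemReader class.
Because the method is step-scoped, the bean is wrapped in a proxy built from the declared return type. A proxy for the ItemReader interface does not expose ItemStream, so the step never calls open() on the reader and its EntityManager is still null when the first page is read. It is therefore recommended to declare the JpaPagingItemReader class as the return type rather than the ItemReader interface.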
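
The proxy effect can be reproduced without Spring. The sketch below is only an illustration under stated assumptions: `Reader`, `Stream`, `PagingReader`, `proxyFor` and `runStep` are hypothetical stand-ins for `ItemReader`, `ItemStream`, `JpaPagingItemReader`, the step-scope proxy and the step itself, not Spring Batch's real classes. It shows that a JDK dynamic proxy exposing only the reader interface hides the `open()` callback, so the resource that `read()` needs is never initialised.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public class StepScopeProxyDemo {

    // Hypothetical stand-in for ItemReader: the contract used to read items.
    public interface Reader { String read(); }

    // Hypothetical stand-in for ItemStream: the lifecycle callback that prepares resources.
    public interface Stream { void open(); }

    // Hypothetical stand-in for JpaPagingItemReader: read() only works after open().
    public static class PagingReader implements Reader, Stream {
        private StringBuilder resource; // plays the role of the EntityManager

        @Override public void open() { resource = new StringBuilder("row"); }

        // Throws NullPointerException when open() was never called.
        @Override public String read() { return resource.toString(); }
    }

    // Builds a JDK dynamic proxy that exposes only the given interfaces of the target.
    public static Object proxyFor(Object target, Class<?>... exposed) {
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception
            }
        };
        return Proxy.newProxyInstance(StepScopeProxyDemo.class.getClassLoader(), exposed, handler);
    }

    // Mimics the step: call open() only if the bean is visibly a Stream, then read.
    public static String runStep(Object bean) {
        if (bean instanceof Stream) {
            ((Stream) bean).open();
        }
        try {
            return ((Reader) bean).read();
        } catch (NullPointerException e) {
            return "NullPointerException";
        }
    }

    public static void main(String[] args) {
        // Proxy built from the narrow interface: open() is skipped.
        System.out.println(runStep(proxyFor(new PagingReader(), Reader.class)));
        // Proxy exposing both interfaces: open() runs before read().
        System.out.println(runStep(proxyFor(new PagingReader(), Reader.class, Stream.class)));
    }
}
```

Running `main` prints `NullPointerException` for the narrow proxy and `row` for the one that also exposes the lifecycle interface. In the real configuration the equivalent change is to declare `JpaPagingItemReader<ResponseEntity>` as the return type of `responseSendItemReader`, so the step-scoped proxy is built from the class and the step can see that the reader is also an `ItemStream`.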