CosmosDB and H2 dual update.

Trying to get into the habit of blogging more often this year. It is a New Year's resolution, and I am hoping it sticks.

One of the interesting things I had been wanting to try out was a dual update to two data sources, with Spring Batch kicking off the entire workflow. Azure has always been a good place to try out the Spring stack, given its native support.

So here is the problem statement for this post: take an open data CSV file, parse it with Spring Batch, load it into an H2 database, and replicate the same data in Cosmos DB.

Cosmos DB is a managed, serverless NoSQL offering from Azure and is very robust. Here is a high-level overview of the solution.

The CSV file for the solution can be downloaded from the open data portal. The file has a lot more information than the solution needs. Spring Batch provides an API tool set for reading CSV files; here is the code snippet for the reader.

    @Bean
    public FlatFileItemReader<RoadInput> reader() {
        return new FlatFileItemReaderBuilder<RoadInput>()
                .name("MatchItemReader")
                .resource(new ClassPathResource("Rustic_Roads.csv"))
                .delimited()
                .names(DATA_STRINGS)
                .fieldSetMapper(new BeanWrapperFieldSetMapper<RoadInput>() {
                    {
                        setTargetType(RoadInput.class);
                    }
                })
                .build();
    }
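The reader maps each delimited row onto a `RoadInput` POJO, with `DATA_STRINGS` holding the property names in CSV column order. That class is not shown in the post; here is a minimal sketch, with the property set inferred from the getters the processor calls below (the actual CSV has more columns than this):

```java
// Sketch of the input POJO; the property names are assumptions inferred
// from the getters used in the processor. DATA_STRINGS must list the
// properties in the same order as the columns in the CSV file.
public class RoadInput {
    public static final String[] DATA_STRINGS = {
            "roadName", "surfaceType", "miles", "totalMiles", "rusticRoadNumber",
            "community", "county", "lineToWebpage", "shapeLen"
    };

    private String roadName;
    private String surfaceType;
    private String miles;
    private String totalMiles;
    private String rusticRoadNumber;
    private String community;
    private String county;
    private String lineToWebpage;
    private String shapeLen;

    public String getRoadName() { return roadName; }
    public void setRoadName(String roadName) { this.roadName = roadName; }
    public String getSurfaceType() { return surfaceType; }
    public void setSurfaceType(String surfaceType) { this.surfaceType = surfaceType; }
    public String getMiles() { return miles; }
    public void setMiles(String miles) { this.miles = miles; }
    public String getTotalMiles() { return totalMiles; }
    public void setTotalMiles(String totalMiles) { this.totalMiles = totalMiles; }
    public String getRusticRoadNumber() { return rusticRoadNumber; }
    public void setRusticRoadNumber(String rusticRoadNumber) { this.rusticRoadNumber = rusticRoadNumber; }
    public String getCommunity() { return community; }
    public void setCommunity(String community) { this.community = community; }
    public String getCounty() { return county; }
    public void setCounty(String county) { this.county = county; }
    public String getLineToWebpage() { return lineToWebpage; }
    public void setLineToWebpage(String lineToWebpage) { this.lineToWebpage = lineToWebpage; }
    public String getShapeLen() { return shapeLen; }
    public void setShapeLen(String shapeLen) { this.shapeLen = shapeLen; }
}
```

The `BeanWrapperFieldSetMapper` in the reader relies on these property names matching `DATA_STRINGS` exactly, which is why the fields mirror the column list.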

The processor, which converts the input records into entity objects, goes something like this:

import me.sathish.myroadbatch.data.Road;
import me.sathish.myroadbatch.data.RoadInput;
import org.springframework.batch.item.ItemProcessor;
public class RoadDataProcessor implements ItemProcessor<RoadInput, Road> {
    @Override
    public Road process(RoadInput roadInput) throws Exception {
        Road road = new Road();
        road.setRoad_name(roadInput.getRoadName());
        road.setSurface_type(roadInput.getSurfaceType());
        road.setMiles(roadInput.getMiles());
        road.setTotal_miles(roadInput.getTotalMiles());
        road.setRustic_road_number(roadInput.getRusticRoadNumber());
        road.setCommunity(roadInput.getCommunity());
        road.setCounty(roadInput.getCounty());
        road.setLine_to_web_page(roadInput.getLineToWebpage());
        road.setShape_len(roadInput.getShapeLen());
        return road;
    }
}

The writer is a simple JDBC batch item writer:

    @Bean
    public JdbcBatchItemWriter<Road> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Road>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO Road(road_name, surface_type, miles, total_miles, rustic_road_number, " +
                        "community, county, line_to_web_page, shape_len) " +
                        "VALUES(:road_name, :surface_type, :miles, :total_miles, :rustic_road_number, " +
                        ":community, :county, :line_to_web_page, :shape_len)")
                .dataSource(dataSource)
                .build();
    }

Finally, the step and job for the remainder of the code:


    @Bean
    public Job importRoadUserJob(RoadsJobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importRoadUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }
    @Bean
    public Step step1(JdbcBatchItemWriter<Road> writer) {
        return stepBuilderFactory.get("step1")
                .<RoadInput, Road>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

The Cosmos DB write is based on Spring Data's reactive support (Reactor's Flux). Here is the repository code:

package me.sathish.myroadbatch.repository;
import com.azure.spring.data.cosmos.repository.ReactiveCosmosRepository;
import me.sathish.myroadbatch.data.RoadForCosmos;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
@Repository
public interface RoadForCosmosRepo extends ReactiveCosmosRepository<RoadForCosmos, String> {
    Flux<RoadForCosmos> findByMiles(String miles);
}

The properties file for the application:

spring.h2.console.enabled=true
spring.datasource.url=jdbc:h2:mem:mytestdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
# Specify the DNS URI of your Azure Cosmos DB.
azure.cosmos.uri=https://cosmosdbv7.documents.azure.com:443/
# Specify the access key for your database.
azure.cosmos.key=
# Specify the name of your database.
azure.cosmos.database=roadwayscsv
azure.cosmos.populateQueryMetrics=true
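With the Azure Spring Boot starter on the classpath, the `azure.cosmos.*` properties above are typically picked up by auto-configuration. If you prefer to wire the client by hand, a configuration class along these lines could work; this is a sketch, and the class name is my own, not from the repo:

```java
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.spring.data.cosmos.config.AbstractCosmosConfiguration;
import com.azure.spring.data.cosmos.repository.config.EnableReactiveCosmosRepositories;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical manual wiring; the starter usually does this for you
// from the azure.cosmos.* properties.
@Configuration
@EnableReactiveCosmosRepositories(basePackages = "me.sathish.myroadbatch.repository")
public class CosmosConfig extends AbstractCosmosConfiguration {

    @Value("${azure.cosmos.uri}")
    private String uri;

    @Value("${azure.cosmos.key}")
    private String key;

    @Value("${azure.cosmos.database}")
    private String database;

    @Bean
    public CosmosClientBuilder cosmosClientBuilder() {
        // Endpoint and key come straight from the properties file
        return new CosmosClientBuilder().endpoint(uri).key(key);
    }

    @Override
    protected String getDatabaseName() {
        return database;
    }
}
```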

Finally, the listener that ties the dual update together:

package me.sathish.myroadbatch.listener;
import me.sathish.myroadbatch.data.Road;
import me.sathish.myroadbatch.data.RoadForCosmos;
import me.sathish.myroadbatch.repository.RoadForCosmosRepo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.persistence.EntityManager;
import java.util.List;
@Component
public class RoadsJobCompletionNotificationListener extends JobExecutionListenerSupport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RoadsJobCompletionNotificationListener.class);
    private final EntityManager entityManager;
    @Autowired
    private RoadForCosmosRepo roadForCosmosRepo;
    @Autowired
    public RoadsJobCompletionNotificationListener(EntityManager entityManager) {
        this.entityManager = entityManager;
    }
    @Override
    public void afterJob(JobExecution jobExecution) {
        LOGGER.debug("Finishing up H2 insert. Going to Cosmos DB");
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            Long count = entityManager.createQuery("select count(r) from Road r", Long.class)
                    .getSingleResult();
            List<Road> roadList = entityManager.createQuery("from Road", Road.class).getResultList();
            processForCosmosDB(roadList);
            LOGGER.info("The count is {}", count);
        }
    }
    private void processForCosmosDB(List<Road> roadList) {
        // Clear out the container before replicating the fresh batch load
        this.roadForCosmosRepo.deleteAll().block();
        roadList.forEach(road -> {
            RoadForCosmos roadForCosmos = new RoadForCosmos();
            roadForCosmos.setId(road.getId());
            roadForCosmos.setCounty(road.getCounty());
            roadForCosmos.setCommunity(road.getCommunity());
            roadForCosmos.setMiles(road.getMiles());
            roadForCosmos.setRoad_name(road.getRoad_name());
            roadForCosmos.setLine_to_web_page(road.getLine_to_web_page());
            roadForCosmos.setRustic_road_number(road.getRustic_road_number());
            // Blocking save keeps the flow simple; saveAll could batch the writes
            roadForCosmosRepo.save(roadForCosmos).block();
        });
    }
}
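Everything is launched from a standard Spring Boot entry point, which runs the configured batch job at startup when the batch starter is on the classpath. A sketch, with an assumed class name:

```java
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Assumed entry point; @EnableBatchProcessing activates the
// jobBuilderFactory and stepBuilderFactory used in the config above.
@SpringBootApplication
@EnableBatchProcessing
public class MyRoadBatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyRoadBatchApplication.class, args);
    }
}
```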

The complete code base is in the GitHub repo. Hope this is useful when you are looking for a similar solution. One note: Cosmos DB has a free tier, but I had a tough time getting an instance in a US region, so I ended up creating mine in the Canada region.