Spring Batch:ItemWriter 的使用
ItemReader 是逐条读取数据,而 ItemWriter 是按批次(chunk)一批一批地输出数据
把数据输出到数据库
/**
 * Demo configuration: a single-step job that reads {@link User} objects from an
 * in-memory list and inserts them into the {@code t_user} table through a
 * {@link JdbcBatchItemWriter}.
 *
 * <p>NOTE(review): other configuration classes in this file also declare a bean
 * method named {@code myReader}; if they are loaded into the same application
 * context the bean names would collide — confirm the demos are run separately.
 */
@Configuration
public class ItemWriterDbDemo {

    /** Number of users generated by {@link #myReader()}. */
    private static final int USER_COUNT = 50;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /**
     * Builds the job, which consists only of {@link #itemWriterDbDemoStep()}.
     *
     * @return the job named {@code itemWriterDbDemoJob}
     */
    @Bean
    public Job itemWriterDbDemoJob() {
        return jobBuilderFactory
                .get("itemWriterDbDemoJob")
                .start(itemWriterDbDemoStep())
                .build();
    }

    /**
     * Builds the chunk-oriented step: items come from {@link #myReader()} and
     * are handed to {@link #dbWriter()} in chunks of two.
     *
     * @return the step named {@code itemWriterDbDemoStep}
     */
    @Bean
    public Step itemWriterDbDemoStep() {
        return stepBuilderFactory
                .get("itemWriterDbDemoStep")
                .<User, User>chunk(2)
                .reader(myReader())
                .writer(dbWriter())
                .build();
    }

    /**
     * Creates the JDBC writer that inserts each {@link User} into {@code t_user}.
     *
     * @return the configured writer
     */
    @Bean
    public JdbcBatchItemWriter<User> dbWriter() {
        JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
        // Data source the inserts are executed against.
        writer.setDataSource(dataSource);
        // Insert statement with named parameters.
        writer.setSql("insert into t_user(id,name,pass) values(:id,:name,:pass)");
        // Fill the named parameters (:id, :name, :pass) from the item's bean properties.
        // The explicit type argument avoids the raw-type/unchecked warning.
        writer.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<User>());
        return writer;
    }

    /**
     * Creates an in-memory reader over {@value #USER_COUNT} generated users with
     * ids {@code 0..USER_COUNT-1}.
     *
     * @return a list-backed reader
     */
    @Bean
    public ItemReader<User> myReader() {
        List<User> users = new ArrayList<>(USER_COUNT);
        for (int i = 0; i < USER_COUNT; i++) {
            users.add(new User()
                    .setId(i)
                    .setName("name:" + i)
                    .setPass("pass:" + i));
        }
        return new ListItemReader<>(users);
    }
}
把数据输出到文件
/**
 * Demo configuration: a single-step job that pages through the {@code t_user}
 * table and writes every row as one JSON line to a flat file.
 */
@Configuration
public class FileItemWriterDemo {

    /**
     * Shared JSON serializer. Reused for every item instead of allocating a new
     * {@link ObjectMapper} per line, which is costly and unnecessary.
     */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /**
     * Builds the job, which consists only of {@link #fileItemWriterDemoStep()}.
     *
     * @return the job named {@code fileItemWriterDemoJob}
     */
    @Bean
    public Job fileItemWriterDemoJob() {
        return jobBuilderFactory
                .get("fileItemWriterDemoJob")
                .start(fileItemWriterDemoStep())
                .build();
    }

    /**
     * Builds the chunk-oriented step: rows come from {@link #dbJdbcReader()} and
     * are handed to {@link #fileWriter()} in chunks of ten.
     *
     * @return the step named {@code fileItemWriterDemoStep}
     */
    @Bean
    public Step fileItemWriterDemoStep() {
        return stepBuilderFactory
                .get("fileItemWriterDemoStep")
                .<User, User>chunk(10)
                .reader(dbJdbcReader())
                .writer(fileWriter())
                .build();
    }

    /**
     * Creates the flat-file writer that emits one JSON document per line.
     *
     * @return the configured writer
     */
    @SneakyThrows
    @Bean
    public ItemWriter<User> fileWriter() {
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        // Output location.
        writer.setResource(new FileSystemResource("D:\\test\\users.txt"));
        // Convert each item to the string written as one line.
        writer.setLineAggregator(new LineAggregator<User>() {
            @SneakyThrows
            @Override
            public String aggregate(User user) {
                return JSON_MAPPER.writeValueAsString(user);
            }
        });
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Creates the paging JDBC reader over {@code t_user}, ordered by {@code id}
     * descending.
     *
     * @return the configured reader
     */
    @Bean
    public ItemReader<User> dbJdbcReader() {
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        // JDBC fetch-size hint passed to the driver. This is not the page size:
        // that is configured through setPageSize and is left at its default here.
        reader.setFetchSize(2);
        // Convert each result-set row to a User; columns follow the select clause order.
        reader.setRowMapper(new RowMapper<User>() {
            @Override
            public User mapRow(ResultSet rs, int rowNum) throws SQLException {
                User user = new User();
                user.setId(rs.getInt(1));
                user.setName(rs.getString(2));
                user.setPass(rs.getString(3));
                return user;
            }
        });

        // Describe the paged query.
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("id,name,pass"); // columns to select
        provider.setFromClause("from t_user");    // table to read from
        // Sort key and its direction.
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.DESCENDING);
        provider.setSortKeys(sortKeys);

        reader.setQueryProvider(provider);
        return reader;
    }
}
把数据输出到xml文件
/**
 * Demo configuration: a single-step job that reads three in-memory
 * {@link User} objects and writes them to an XML file through a
 * {@link StaxEventItemWriter}.
 *
 * <p>NOTE(review): other configuration classes in this file also declare a bean
 * method named {@code myReader}; if they are loaded into the same application
 * context the bean names would collide — confirm the demos are run separately.
 */
@Configuration
public class XmlItemWriterDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Builds the job, which consists only of {@link #xmlItemWriterDemoStep()}.
     *
     * @return the job named {@code xmlItemWriterDemoJob}
     */
    @Bean
    public Job xmlItemWriterDemoJob() {
        return jobBuilderFactory
                .get("xmlItemWriterDemoJob")
                .start(xmlItemWriterDemoStep())
                .build();
    }

    /**
     * Builds the chunk-oriented step: items come from {@link #myReader()} and
     * are handed to {@link #xmlItemWriter()} in chunks of two.
     *
     * @return the step named {@code xmlItemWriterDemoStep}
     */
    @Bean
    public Step xmlItemWriterDemoStep() {
        return stepBuilderFactory
                .get("xmlItemWriterDemoStep")
                .<User, User>chunk(2)
                .reader(myReader())
                .writer(xmlItemWriter())
                .build();
    }

    /**
     * Creates the XML writer: each item is marshalled with XStream under the
     * alias {@code user}, inside a root element named {@code users}.
     *
     * @return the configured writer
     */
    @SneakyThrows
    @Bean
    public ItemWriter<User> xmlItemWriter() {
        // Map the element name "user" to the User class (Class<?> instead of raw Class).
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("user", User.class);
        XStreamMarshaller marshaller = new XStreamMarshaller();
        marshaller.setAliases(aliases);

        StaxEventItemWriter<User> writer = new StaxEventItemWriter<>();
        writer.setMarshaller(marshaller);
        writer.setRootTagName("users"); // root element
        // Output file location.
        String path = "D:\\test\\users.xml";
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Creates an in-memory reader over three fixed users.
     *
     * @return a list-backed reader
     */
    @Bean
    public ItemReader<User> myReader() {
        List<User> userList = new ArrayList<>();
        userList.add(new User().setId(1).setName("zhangsan").setPass("123"));
        userList.add(new User().setId(2).setName("lisi").setPass("123"));
        userList.add(new User().setId(3).setName("wangwu").setPass("123"));
        return new ListItemReader<>(userList);
    }
}
把数据同时输出到多个文件
/**
 * Demo configuration: a single-step job that reads three in-memory
 * {@link User} objects and writes every item to two files at once (XML and
 * JSON lines) through a {@link CompositeItemWriter}.
 *
 * <p>NOTE(review): this file contains another class with the same name and the
 * same bean method names; presumably the two are alternative versions of one
 * demo and are not compiled together — confirm.
 */
@Configuration
public class MultiFileItemWriterDemo {

    /**
     * Shared JSON serializer. Reused for every item instead of allocating a new
     * {@link ObjectMapper} per line.
     */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Builds the job, which consists only of {@link #multiFileDemoStep()}.
     *
     * @return the job named {@code multiFileDemoJob}
     */
    @Bean
    public Job multiFileDemoJob() {
        return jobBuilderFactory
                .get("multiFileDemoJob")
                .start(multiFileDemoStep())
                .build();
    }

    /**
     * Builds the chunk-oriented step: items come from {@link #myReader()} and
     * are handed to {@link #multiFileItemWriter()} in chunks of two.
     *
     * @return the step named {@code multiFileDemoStep}
     */
    @Bean
    public Step multiFileDemoStep() {
        return stepBuilderFactory
                .get("multiFileDemoStep")
                .<User, User>chunk(2)
                .reader(myReader())
                .writer(multiFileItemWriter())
                .build();
    }

    /**
     * Creates the composite writer that forwards each chunk to both the XML
     * writer and the JSON writer.
     *
     * @return the configured composite writer
     */
    @SneakyThrows
    @Bean
    public CompositeItemWriter<User> multiFileItemWriter() {
        // Diamond instead of the raw type, so the assignment is checked.
        CompositeItemWriter<User> writer = new CompositeItemWriter<>();
        // Writers that actually perform the output.
        writer.setDelegates(Arrays.asList(xmlFileItemWriter(), jsonFileItemWriter()));
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Creates the XML writer: each item is marshalled with XStream under the
     * alias {@code user}, inside a root element named {@code users}.
     *
     * @return the configured writer
     */
    @SneakyThrows
    @Bean
    public ItemWriter<User> xmlFileItemWriter() {
        // Map the element name "user" to the User class (Class<?> instead of raw Class).
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("user", User.class);
        XStreamMarshaller marshaller = new XStreamMarshaller();
        marshaller.setAliases(aliases);

        StaxEventItemWriter<User> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("users");
        writer.setMarshaller(marshaller);
        String path = "D:\\test\\u2.xml";
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Creates the flat-file writer that emits one JSON document per line.
     *
     * @return the configured writer
     */
    @SneakyThrows
    @Bean
    public ItemWriter<User> jsonFileItemWriter() {
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        String path = "D:\\test\\u1.txt";
        writer.setResource(new FileSystemResource(path));
        // Convert each item to the string written as one line.
        writer.setLineAggregator(new LineAggregator<User>() {
            @SneakyThrows
            @Override
            public String aggregate(User user) {
                return JSON_MAPPER.writeValueAsString(user);
            }
        });
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Creates an in-memory reader over three fixed users.
     *
     * @return a list-backed reader
     */
    @Bean
    public ItemReader<User> myReader() {
        List<User> userList = new ArrayList<>();
        userList.add(new User().setId(1).setName("zhangsan").setPass("123"));
        userList.add(new User().setId(2).setName("lisi").setPass("123"));
        userList.add(new User().setId(3).setName("wangwu").setPass("123"));
        return new ListItemReader<>(userList);
    }
}
把数据按分类输出到不同的文件
/**
 * Demo configuration: a single-step job that reads three in-memory
 * {@link User} objects and routes each one to exactly one of two files,
 * chosen by a {@link ClassifierCompositeItemWriter}: even ids go to the JSON
 * file, odd ids go to the XML file.
 *
 * <p>NOTE(review): this file contains another class with the same name and the
 * same bean method names; presumably the two are alternative versions of one
 * demo and are not compiled together — confirm.
 */
@Configuration
public class MultiFileItemWriterDemo {

    /**
     * Shared JSON serializer. Reused for every item instead of allocating a new
     * {@link ObjectMapper} per line.
     */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Builds the job, which consists only of {@link #multiFileDemoStep()}.
     *
     * @return the job named {@code multiFileDemoJob}
     */
    @Bean
    public Job multiFileDemoJob() {
        return jobBuilderFactory
                .get("multiFileDemoJob")
                .start(multiFileDemoStep())
                .build();
    }

    /**
     * Builds the chunk-oriented step: items come from {@link #myReader()} and
     * are handed to {@link #multiFileItemWriter()} in chunks of two.
     *
     * @return the step named {@code multiFileDemoStep}
     */
    @Bean
    public Step multiFileDemoStep() {
        return stepBuilderFactory
                .get("multiFileDemoStep")
                .<User, User>chunk(2)
                .reader(myReader())
                // The classifier writer does not implement ItemStream, so the
                // delegates are registered as streams explicitly.
                .writer(multiFileItemWriter())
                .stream(jsonFileItemWriter())
                .stream(xmlFileItemWriter())
                .build();
    }

    /**
     * Creates the classifying writer that picks the delegate per item by the
     * parity of the user's id.
     *
     * @return the configured classifying writer
     */
    @Bean
    public ClassifierCompositeItemWriter<User> multiFileItemWriter() {
        // Resolve the delegates once instead of calling the @Bean methods for every item.
        final ItemWriter<User> evenIdWriter = jsonFileItemWriter();
        final ItemWriter<User> oddIdWriter = xmlFileItemWriter();

        ClassifierCompositeItemWriter<User> writer = new ClassifierCompositeItemWriter<>();
        writer.setClassifier(new Classifier<User, ItemWriter<? super User>>() {
            @Override
            public ItemWriter<? super User> classify(User user) {
                // Classify by id: even -> JSON file, odd -> XML file.
                return user.getId() % 2 == 0
                        ? evenIdWriter
                        : oddIdWriter;
            }
        });
        return writer;
    }

    /**
     * Creates the XML writer: each item is marshalled with XStream under the
     * alias {@code user}, inside a root element named {@code users}.
     *
     * @return the configured writer
     */
    @SneakyThrows
    @Bean
    public StaxEventItemWriter<User> xmlFileItemWriter() {
        // Map the element name "user" to the User class (Class<?> instead of raw Class).
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("user", User.class);
        XStreamMarshaller marshaller = new XStreamMarshaller();
        marshaller.setAliases(aliases);

        StaxEventItemWriter<User> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("users");
        writer.setMarshaller(marshaller);
        String path = "D:\\test\\u2.xml";
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Creates the flat-file writer that emits one JSON document per line.
     *
     * @return the configured writer
     */
    @SneakyThrows
    @Bean
    public FlatFileItemWriter<User> jsonFileItemWriter() {
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        String path = "D:\\test\\u1.txt";
        writer.setResource(new FileSystemResource(path));
        // Convert each item to the string written as one line.
        writer.setLineAggregator(new LineAggregator<User>() {
            @SneakyThrows
            @Override
            public String aggregate(User user) {
                return JSON_MAPPER.writeValueAsString(user);
            }
        });
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Creates an in-memory reader over three fixed users.
     *
     * @return a list-backed reader
     */
    @Bean
    public ItemReader<User> myReader() {
        List<User> userList = new ArrayList<>();
        userList.add(new User().setId(1).setName("zhangsan").setPass("123"));
        userList.add(new User().setId(2).setName("lisi").setPass("123"));
        userList.add(new User().setId(3).setName("wangwu").setPass("123"));
        return new ListItemReader<>(userList);
    }
}