如何让kettle 读取文件夹遍历读取某个文件夹下的多个 xml文件

Kettle 使用备忘录:生成xml文件
1. 利用 excel中的数据生成xml文件kettle中的xml文件输出组件的功能其实是很弱的,所以要生成较为复杂的xml文件时需要使用组件:
placeholder, js script 等等
使用merge join主要是为了减少在xml
join中需要匹配的结果集合(因为xml join中的匹配功能其实是很弱的)。在merge join之前记录必须先排序。placeholder 组件是往记录流中添加一个常量,这个常量在 add xml中通常不被设为属性,而是在xml join中用于放置需要被组装进去的xml元素。以生成如下简单的xml文件为例:&?xml version="1.0" encoding="UTF-8"?&&document&
name="foo"&
from section foo
&/section&
name="bar"&
from section bar
&/section&&/document&对应的kettle 转换为:
首先需要从excel文件中读取数据,excel文件中的内容为
利用读取到的内容生成xml元素,如下是add
xml中的设置:
这一步主要生成了如下的xml元素:&section name="foo"&
from section foo
&/p&&/section&&section name="bar"&
from section bar
&/section&接下来要生成 &document&元素,因为该元素没有对应的数据,所以使用“生成记录”组件生成一条空记录用于与之前的xml元素进行xml join。它的功能有点类似与之前提到的placeholder,不同的是这个可以作为起始输入,而placeholder是一个转换步骤。
接着就是进行xml join了,source stream中的xml元素会被拼接到target
stream的xml元素中。怎么拼接有join condition properties决定,它是使用xpath来定位要拼接的位置,例如//doc 就是把source stream的元素拼接到每个doc元素中。需要注意的是,由于这次xml join之后xml文件就生成好了,所以不能把omit xml header选上。
当然通过xpath也可以做复杂的条件join,如下是个例子:
可以通过xpath指定在所有target
stream的area元素中如果area的属性STATIONID的值与source stream中STATIONID的值(这个值不一定要在source的xml元素中,但必须在source stream中,例如可以是一开始从excel读入的流中)相同,那么把source stream中的xml元素放到 orderlinecomments元素中。这步之后生成的xml文件如下:&?xml version="1.0" encoding="UTF-8"?&&document& &doc&
name="foo"&
from section foo
&/section&
name="bar"&
from section bar
&/section& &/doc&&/document&和我们预期的多了&doc&和&/doc&,这是我们使用js脚本进行替换:
js脚本处理好之后,输出流的名字就换成
xmlOderLinesNew了。最后把这个流输出到文件中,需要注意的是在内容中不要使用“分割符”,“封闭符”,和“头部”:
没有更多推荐了,
不良信息举报
举报内容:
Kettle 使用备忘录:生成xml文件
举报原因:
原文地址:
原因补充:
最多只允许输入30个字
加入CSDN,享受更精准的内容推荐,与500万程序员共同成长!kettle 批量抽取多个表数据
使用kettle从一个库将数据抽取到另一个库,单个表一个转换就可以简单实现了,那如果是好多表呢,难道要建若干个转换来实现这个功能吗,那就尝试做一个批量抽取的job吧!
整个抽取过程包括一个job和两个trans,先来看看整个job吧:
job的start和success就没什么可说的了,两个脚本是关键,接下来一步一步来。
首先是文件输入这个trans:
这个trans的作用是提取表名并放入结果集中,表输入是用一段SQL语句将表名输出,字段选择是将SQL输出的那个含有表名的字段提取出来并重新命名,最后含有表名的记录复制到结果集中,方便下一个步骤的使用。
接下来设置变量的脚本:
var prevRow=previous_result.getRows();
if (prevRow == null &&(prevRow.size()=0))
parent_job.setVariable("tables", prevRow);
parent_job.setVariable("size", prevRow.size());
parent_job.setVariable("i", 0);
parent_job.setVariable("TABLENAME", prevRow.get(0).getString("tablenames",""));
先获取上一个步骤的结果集,并将表名以ArrayList格式存储在tables这个变量中,将表的数量放入size这个变量中,定义i变量方便稍后控制循环,最后,将这个结果集中的第一个表名取出来并放入到TABLENAME这个变量中。
这个脚本的作用是将你要用的表名放到一个变量中,然后再设置另一个变量遍历取表名,这是很重要的一步。因为在job中有设置变量的这个选项,但是设置变量后,所有的表名都会在这一个变量中,如果你在下一步骤直接取用该变量的值,你得到的不是一个一个的表名,而是闪闪发光的红色报错。
没有表传入是条件判断false后的结果。
接下来就是这个抽取的核心了,
首先是条件判断:
接着就是数据抽取:
最后就是遍历下一个表名:
var list_Tables =parent_job.getVariable("tables").replace(" ","").replace("[","").replace("]","").split(",");
var size = new Number(parent_job.getVariable("size"));
var i = new Number(parent_job.getVariable("i"))+1;
if(i&size){
parent_job.setVariable("TABLENAME", list_Tables[i]);
parent_job.setVariable("i",i);
条件判断简洁后就是if i&size,然后对取出的表名进行抽取工作,很简单,一个表输入,一个表输出,就是一个表的抽取trans,只不过里面用的不是常量表名,而是变量表名select * from ${TABLENAME}(记得勾选替换变量哦)。最后的脚本密密麻麻的一坨,其实就是取出变量TABLENAME中的下一个表名,并且i++,这一步在我看来是最难的,因为不会写啊!
打完收工!
。。。。。还没完呢
总结一下:
目的:减少重复造轮子,那就是造一个模具
难点:在kettle中没有循环这个组件,想要实现就得自己动手实现循环
关键点:利用两个脚本来实现变量的设置和变量的遍历,再用一个检验字段的值实现条件判断,辛辛苦苦只是为了实现一行代码for(i=0;i&i++)
未解决的困难:在抽取表数据的时候发现,如果表中有clob字段并且存的数据很大的话,会抽取失败,原因是
& Clob 太大, 无法存储在 Java 字符串中
这个问题也再网上找过答案,但是木有找到解决的方法,如果网友有解决方法,敬请留下,感激不尽
后续:这个抽取数据的过程有个前提,那就是两个库里都有相同的表和对应的表结构,所以我在上述中也没有写到判断有没有表存在,下一步希望能够实现自动动态创建表,然后加到上述的job中,那就完美了。
总结4已解决:
经排查,上述的clob太大,无法存储的问题是因为有一个clob的字段的字符长度为31800,然后其余的字段都低于1800,将字符长度为31800的字段剔除后,能正常的抽取数据,可见是因为clob的字符长度过长引起的无法存储。
没有更多推荐了,
不良信息举报
举报内容:
kettle 批量抽取多个表数据
举报原因:
原文地址:
原因补充:
最多只允许输入30个字
加入CSDN,享受更精准的内容推荐,与500万程序员共同成长!& & & 环境:windows7,jvm内存设置14G,kettle5.1后来升级到5.4,oracle作为资源库。
& & & 问题背景:我们通过web页面管理kettle的job运行,这只是一个管理界面,即使web项目停掉也不会影响job的运行情况,实际运行job的是后台程序,随着job数量的增多,达到三四百个时,job的运行速度也达到了难以接受的程度。
& & & 方案1:
& & & 针对出现的问题,经测试发现,job一经运行就不会再重新从资源库读取了(针对定时运行的job),job中的转换则每次都会重新从资源库中读取,我找到了......JobEntryTrans这个类,这是一个作业控件,代表一个转换,调试跟踪代码就会发现他确实会在每次运行时重新读取资源库加载转换,因为kettle每次运行时都是克隆了这个控件,然后运行时重新加载,具体还是调试跟踪代码才能更清楚的了解,对于他为什么要克隆,我也不是很了解,当然也不敢乱动相关代码,估计因为里面有些状态属性吧。
& & & 解决问题的思路是尽量从底层,具体的问题点解决问题,一开始就想过优化具体读取转换的过程,但没有去尝试,而是采用了更具针对性,更简单的方式,在JobEntryTrans这个类里面建了个静态Map,用于缓存读取过的转换,这个缓存的生命周期与对应的job差不多,因为每次从数据库读取JobEntryTrans这个控件时,都会清除该控件引用的转换的缓存,若多个job都引用了这个转换,那他的生命周期比job还短。
& & &这个方案需要改的代码量很小,读取转换时,先查看缓存,没有就读取资源库,然后缓存,下次就直接用缓存了,其次就是在JobEntryTrans类的读取资源库的方法中加上清除对应转换缓存的代码。
& & &缓存??到底以什么形式缓存?为了尽量减小对原有逻辑的影响我先缓存了xml,就是调用了TransMeta的getXml方法,下次读取时就直接使用xml,这个方式在测试环境是没问题的,但在正式环境中始终有个转换有问题,运行异常,正式环境不方便调试,于是我就改为直接缓存TransMeta对象,每次就直接用了,这对写代码来说肯定更简单了,但这个对象里面到底有哪些东西,我没有精力去仔细分析,抱着试一试的态度实现了,经测试效果还是很不错的,缓存xml时的问题不存在了,经过一段时间的运行,没有发现什么大问题,就是记录的日志感觉有点问题,该方案现在依然在使用中,基本没有影响kettle完成它的工作。
& & & 这个代码量不大,就贴出来,就是修改了kettle5.4中TransMeta这个类的两个方法:
// Load the jobentry from repository
public void loadRep( Repository rep, IMetaStore metaStore, ObjectId id_jobentry, List&DatabaseMeta& databases,
List&SlaveServer& slaveServers ) throws KettleException {
//.................这里的源码与官方一致。
passingAllParameters = rep.getJobEntryAttributeBoolean( id_jobentry, "pass_all_parameters", true );
if(transMetaMap.containsKey(getDirectory()+"/"+getName())){
logBasic( "该转换已经缓存,马上移除缓存:" + getDirectory()+"/"+getName() );
transMetaMap.remove(getDirectory()+"/"+getName());
} catch ( KettleDatabaseException dbe ) {
throw new KettleException( "Unable to load job entry of type 'trans' from the repository for id_jobentry="
+ id_jobentry, dbe );
public TransMeta getTransMeta( Repository rep, IMetaStore metaStore, VariableSpace space ) throws KettleException {
TransMeta transMeta = null;
switch( specificationMethod ) {
case FILENAME:
long start = new Date().getTime();
String filename = space.environmentSubstitute( getFilename() );
logBasic( "Loading transformation from XML file [" + filename + "]" );
transMeta = new TransMeta( filename, metaStore, null, true, this, null );
log.logBasic(transMeta.getName()+",从文件读取转换耗时:"+(new Date().getTime()-start));
case REPOSITORY_BY_NAME:
if(transMetaMap.containsKey(getDirectory()+"/"+getName())){
logBasic( "该转换已经缓存,直接使用缓存:" + getDirectory()+"/"+getName() );
transMeta = transMetaMap.get(getDirectory()+"/"+getName());
String transname = space.environmentSubstitute( getTransname() );
String realDirectory = space.environmentSubstitute( getDirectory() );
logBasic( BaseMessages.getString( PKG, "JobTrans.Log.LoadingTransRepDirec", transname, realDirectory ) );
if ( rep != null ) {
// It only makes sense to try to load from the repository when the
// repository is also filled in.
// It reads last the last revision from the repository.
RepositoryDirectoryInterface repositoryDirectory = rep.findDirectory( realDirectory );
transMeta = rep.loadTransformation( transname, repositoryDirectory, null, true, null );
transMetaMap.put(getDirectory()+"/"+getName(), transMeta);
logBasic( "从资源库获取转换并缓存:" + getDirectory()+"/"+getName() );
throw new KettleException( BaseMessages.getString( PKG, "JobTrans.Exception.NoRepDefined" ) );
case REPOSITORY_BY_REFERENCE:
if ( transObjectId == null ) {
throw new KettleException( BaseMessages.getString( PKG,
"JobTrans.Exception.ReferencedTransformationIdIsNull" ) );
if ( rep != null ) {
// Load the last revision
transMeta = rep.loadTransformation( transObjectId, null );
throw new KettleException( "The specified object location specification method '"
+ specificationMethod + "' is not yet supported in this job entry." );
if ( transMeta != null ) {
// copy parent variables to this loaded variable space.
transMeta.copyVariablesFrom( this );
// Pass repository and metastore references
transMeta.setRepository( rep );
transMeta.setMetaStore( metaStore );
return transM
} catch ( Exception e ) {
throw new KettleException( BaseMessages.getString( PKG, "JobTrans.Exception.MetaDataLoad" ), e );
& & & 方案1总结:
& & & 1.该方案解决了转换重复加载的问题,在一次加载后,进行了缓存,并给出了清除缓存重新加载的机制,实际使用效果是:第一次运行仍然很慢,但之后就是飞一般的感觉,之前没有数据每次都要运行十来分钟,现在就只需几秒钟,证明这个问题就是导致kettle运行慢的唯一原因,这里我们不完美的解决了他。
& & & 2.该方案第一次运行job仍然很慢,日志记录可能也有问题,对时效性要求高的话,可能该方案还不能完全解决问题,因为重启后台程序时,job的延迟仍然可能达到一个多小时,当然也就刚重启那一下的事,就看你的业务能不能接受了。
& & & 方案2:
& & & 方案1中重启后台程序导致的高延迟,我们的业务上仍然不能忍受,于是开始思考其他解决方案。
& & & 通过尝试解决这个问题,认为想通过改进kettle从数据库读取转换的过程来优化是行不通的,因为kettle读取资源库的过程很复杂,涉及很多逻辑(仔细看过相关代码,并测试了各个关键操作的耗时,对测试出的耗时操作进行分析后认为很难优化),另外经测试,文件资源库读取转换要比数据库资源库快近100倍,于是决定采用如下方式解决:
& & &&1.创建job等过程不做任何改变,所有操作都对数据库资源库。
& & &&2.后台job在获取一些job信息时也使用数据库资源库,但在最后一步运行job时,获取文件资源库中的job并运行,这里就要求job启动时,两个资源库相关job要相同,特别是job名称和路径,这是确认同一个job的方式,实际运行的就是文件资源库中的job了。
& & &&3.这里就涉及到文件资源库与数据库资源库的同步问题了:
& & &&1)人工手动将数据库资源库导出到指定的文件资源库。
& & &&2)页面控制job的启动与停止,只在页面请求启动job时后台自动将该job相关信息同步到文件资源库(待实现),其他地方不做任何改变,即使后台程序重启,也不用同步job,这样重启后台程序时就不会出现几十分钟job都没有运行起来的情况了。
& & &&3)当我们修改了job,确认要运行了,这时就在页面先停止job,再启动就实现同步了。
& & &&以上只是问题解决思路,具体实现也实现了个大概,资源库同步还有问题,通过以上方式实际并没有解决数据库资源库读取慢的问题,这需要对kettle有更深入的了解的技术大牛来解决。
& & &&这个解决方案达到的效果:
& & &&1.解决了后台程序重启时高延迟的问题(主要问题)。
& & &&2.间接减轻了数据库资源库的压力,这样我们创建修改job时就会快一些。
& & &&3.算是备份了一下资源库。
& & & 4.给出了同步资源库的方案,其实这里的文件资源库与上面的缓存很相似,只是缓存得更彻底。
阅读(...) 评论()使用kettle从数据库读取一个xml(CLOB),然后解析得到数据插入另一张表,该怎么做?_百度知道
使用kettle从数据库读取一个xml(CLOB),然后解析得到数据插入另一张表,该怎么做?
步骤:表输入-&java 代码 -&字段选择-&插入/更新,这样的步骤能做到么,如果能java代码怎么写和用,不能的话有什么方法做到?
您的回答被采纳后将获得:
系统奖励15(财富值+成长值)+难题奖励10(财富值+成长值)+提问者悬赏150(财富值+成长值)
我有更好的答案
表输入-&插入更新
为您推荐:
其他类似问题
clob的相关知识
换一换
回答问题,赢新手礼包
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。没有更多推荐了,
不良信息举报
举报内容:
kettel解析XML难点
举报原因:
原文地址:
原因补充:
最多只允许输入30个字
加入CSDN,享受更精准的内容推荐,与500万程序员共同成长!}

我要回帖

更多关于 kettle遍历数据 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信