1.Kettle插件开发
2.kettle 源码 怎么运行repositories插件
3.解决Kettle中Timestamp类型被转成Date类型
4.利用Kettle进行数据同步(下)
Kettle插件开发
工欲善其事,源码必先利其器。修改Kettle作为离线或准实时ETL的源码工具链,在特殊业务流程的修改处理中,往往需要定制化插件开发。源码这种情况可能出现在需要对特定数据进行特殊函数处理,修改源码原生封装现有组件无法满足数据捕获需求,源码或者缺乏对数据重复消费的修改处理。因此,源码定制开发流程处理组件,修改实现数据管理、源码验证、修改转换和特定数据源抽取,源码成为解决此类问题的修改关键。
以splunk查询插件为例,源码其开发遵循Kettle转换插件的homeassistant源码基本框架。该框架由四个关键类构成,分别扮演不同的角色。首先是步骤类(SplunkInput),继承自BaseStep父类,并实现StepInterface接口,是数据实际处理的位置。数据类(SplunkInputData)继承自BaseStepData,实现StepDataInterface接口,用于存储数据,每个执行线程拥有唯一的实例。对话框类(SplunkInputDialog)继承自BaseStepDialog,实现StepDialogInterface接口,用于组件步骤与开发者之间的交互配置。元数据类(SplunkInputMeta)继承自BaseStepMeta,实现StepMetaInterface接口,maltable源码保存和序列化步骤实例的配置信息,跟踪开发者设置的属性。
在展现配置方面,通过注解或plugin.xml实现插件在Kettle可视化UI工作台中的显示效果。配置项包括ID、名称、描述、图标、类别等,帮助开发者快速识别和理解插件功能。
开发过程中,步骤类需要实现初始化、释放资源和处理数据流的方法,如检查splunk连接状态、执行数据转换等。wepERP源码数据类在执行线程中提供数据存储空间,确保独立运行环境。对话框类负责输入属性监听、配置数据初始化、连接校验及结果字段预览等功能。元数据类则跟踪输入输出设置,提供参数序列化和扩展验证检查,确保步骤如何处理数据流行的信息清晰。
总体而言,Kettle自定义插件开发流程清晰,通过遵循基本框架和配置规则,开发者可以高效地实现复杂数据处理需求。如需深入了解源码、自定义插件集成或在开发过程中遇到问题,欢迎关注公众号"游走在数据之间",diffimg 源码加入QQKettle交流群,获取更多支持和交流。
kettle 源码 怎么运行repositories插件
1.2. 编译源码
项目加载eclipse
kettle项目拷贝eclipseworkspace目录eclipse新建java project项目名称拷贝kettle文件夹名称致
项目导入eclipse现错误图文件源码全部注释掉
编译
打build.xml, 右边Outline 点击kettle-run as -ant build
第编译候需要网载几文件放C:\Documents and Settings\Administrator\.subfloor网络载比较慢直接文件放C:\Documents and Settings\Administrator\编译完bin目录.bat文件拷贝Kettle目录点击Spoon.bat运行运行功代表编译已近通
用源码运行Spoon
Kettle源码工程本身能linux位机器调试swt配置linux库所运行源码前需要修改winswt步骤:工程à属性àJava Build Pathàlibrariesàadd jars
linuxSWT库删除
打src-uiàorg.pentaho.di.ui.spoonàSpoon.java Run As àjava application
二.源码析
2.1. 修改kettle界面
修改初始化界面
打package org.pentaho.di.ui.spoonSpoon.Java找main函数该main函数Spoon工具入口找语句
Splash splash = new Splash(display);
该语句spoon初始化显示界面跳定义Splash.java面函数
canvas.addPaintListener(new PaintListener() {
publicvoid paintControl(PaintEvent e) {
String versionText = BaseMessages.getString(PKG, "SplashDialog.Version") + " " + Const.VERSION; //$NON-NLS-1$ //$NON-NLS-2$
StringBuilder sb = new StringBuilder();
String line = null;
try {
BufferedReader reader = new BufferedReader(newInputStreamReader(Splash.class.getClassLoader().getResourceAsStream("org/pentaho/di/ui/core/dialog/license/license.txt")));//$NON-NLS-1$
while((line = reader.readLine()) != null) {
sb.append(line + System.getProperty("line.separator")); //$NON-NLS-1$
}
} catch (Exception ex) {
sb.append(""); //$NON-NLS-1$
Log.warn(BaseMessages.getString(PKG, "SplashDialog.LicenseTextNotFound")); //$NON-NLS-1$
}
String licenseText = sb.toString();
e.gc.drawImage(kettle_image, 0, 0);
// If this is a Milestone or RC release, warn the user
if (Const.RELEASE.equals(Const.ReleaseType.MILESTONE)) {
versionText = BaseMessages.getString(PKG, "SplashDialog.DeveloperRelease") + " - " + versionText; //$NON-NLS-1$ //$NON-NLS-2$
drawVersionWarning(e);
} elseif (Const.RELEASE.equals(Const.ReleaseType.RELEASE_CANDIDATE)) {
versionText = BaseMessages.getString(PKG, "SplashDialog.ReleaseCandidate") + " - " + versionText; //$NON-NLS-1$//$NON-NLS-2$
}
elseif (Const.RELEASE.equals(Const.ReleaseType.PREVIEW)) {
versionText = BaseMessages.getString(PKG, "SplashDialog.PreviewRelease") + " - " + versionText; //$NON-NLS-1$//$NON-NLS-2$
}
elseif (Const.RELEASE.equals(Const.ReleaseType.GA)) {
versionText = BaseMessages.getString(PKG, "SplashDialog.GA") + " - " + versionText; //$NON-NLS-1$//$NON-NLS-2$
}
Font verFont = new Font(e.display, "Helvetica", , SWT.BOLD); //$NON-NLS-1$
e.gc.setFont(verFont);
e.gc.drawText(versionText, , , true);
// try using the desired font size for the license text
int fontSize = 8;
Font licFont = new Font(e.display, "Helvetica", fontSize, SWT.NORMAL); //$NON-NLS-1$
e.gc.setFont(licFont);
// if the text will not fit the allowed space
while (!willLicenseTextFit(licenseText, e.gc)) {
fontSize--;
licFont = new Font(e.display, "Helvetica", fontSize, SWT.NORMAL); //$NON-NLS-1$
e.gc.setFont(licFont);
}
e.gc.drawText(licenseText, , , true);
}
});
1. 修改背景
找ui/image/面kettle_splash.png替换该
2. 修改版本信息
找e.gc.drawText(versionText, , , true); 改e.gc.drawText("海康威视数据交换平台V1.0", , , true);
3. 修改面描述性文字
找e.gc.drawText(licenseText, , , true);改e.gc.drawText("作者:海康", , , true);
4. 预览效
解决Kettle中Timestamp类型被转成Date类型
在Kettle处理数据元数据信息时,若数据库字段类型为Timestamp,系统通常会自动判断是否支持Timestamp数据类型。如果支持,将使用ValueMetaTimestamp;若不支持,则默认采用ValueMetaDate。然而,如何确保Timestamp类型被正确识别与处理呢?
深入源码,我们能发现Kettle判断Timestamp支持性的机制。关键在于数据库连接属性中的高级配置选项。
解决方案在于,在流程设计器Spoon中,务必勾选数据库连接属性的“Supports the timestamp data type”选项。这一操作能够明确指示Kettle,数据库支持Timestamp数据类型,从而避免将Timestamp错误地转换为Date。通过这一简单步骤,确保了数据处理的准确性和效率,避免了不必要的数据转换错误。
利用Kettle进行数据同步(下)
上篇内容对基于kettle的数据同步工程的构建进行了介绍,entrypoint.kjb作为工程执行的入口。
为了减少操作成本,并确保数据同步过程稳定、安全,需要从更高层次进行抽象,创建一个简单易用的系统。
以下是应用截图:
除了选择数据源和数据库,还增加了授权码,意味着只有授权范围内的用户才能使用该系统。
由于是内部使用,授权用户尚未实现后台管理,直接在应用数据库中添加,选择的数据源和数据库都通过配置文件生成。
文末会提供GitHub上的源码地址,有需要的读者可以进行二次开发。
一、数据库设计
数据库名称为kettle,目前包含两张表:
1、授权用户表。表中记录的用户可以使用数据同步系统。
2、同步记录表。记录用户的数据同步操作。
二、程序设计
系统简单实用,没有特别的设计。以下是重点说明的三点:
1、数据源及其参数配置。
在application.yml配置文件中,存在如下配置:
使用了springboot的@ConfigurationProperties注解。
其中的DBSetting定义如下:
通过客户端传递的参数,可以定位到相应的参数设置。
2、集成kettle的API。
由于kettle相关jar包放在了自建的nexus私服上,因此如果使用maven管理jar包,需要在settings.xml配置文件中做一些修改:
其中的mirrorOf节点添加了!pentaho-releases,表示排除pentaho-releases。
然后,在springboot工程的pom.xml中指定pentaho-releases的url。
接下来是核心的对接代码,具体可以参考工程源码。
3、异步执行作业
由于Job的执行时间可能会很长,主要取决于数据量,因此一个request的来回可能会导致TIMEOUT,需要改为异步模式。
核心思想是:启动新的线程,客户端定时轮询执行结果。
三、总结
本文分两篇文章介绍了如何利用kettle进行数据同步,并实现一个简易的系统,以降低操作成本和出错率。
介绍到此,如有疑问,请留言。
欢迎fork我的工程代码。