数据订阅 SDK 订阅到的数据为 DTS 自定义格式,本小节简单介绍各种类型 SQL 语句解析的代码实现。
DDL 解析
如果一个 Record 是 DDL 语句,那么这个 Record 的操作类型为 DDL,且 DDL 语句存储在第一列的 value 中。获取 DDL 语句的代码示例如下:
String ddl_string;
Record.Type type=record.getOpt();
if(type.equals(Record.Type.DDL)){
List<DataMessage.Record.Field> fields = record.getFieldList();
ddl_string = fields.get(0).getValue().toString();
}
Insert 解析
如果一个 Record 是 Insert 语句,那么这个 Record 的操作类型为 INSERT。获取 Insert 完整语句的代码示例如下:
StringBuilder insert_string=new StringBuilder();
Record.Type type=record.getOpt();
DataMessage.Record.Field field;
StringBuilder FieldName=new StringBuilder();
StringBuilder FieldValue = new StringBuilder();
if(type.equals(Record.Type.INSERT)){
int i=0;
List<DataMessage.Record.Field> fields = record.getFieldList();
for (; i < fields.size(); i++) {
field = fields.get(i); FieldName.append('`'+field.getFieldname().toLowerCase()+'`');
FieldValue.append("'"+field.getValue()+"'");
if (i != fields.size() - 1) {
FieldName.append(',');
FieldValue.append(',');
}
}
insert_string.append("insert "+ record.getTablename()+"("+FieldName.toString()+") values("+FieldValue.toString()+");");
}
Update 解析
如果一个 Record 是 Update 语句,那么这个 Record 的操作类型为 UPDATE。
update 更新前的字段存储在 Record.getFieldList() 中索引为偶数的 Field,更新后的字段值存储在索引为奇数的 Field。
下面的示例代码介绍当 update 的表有主键时,如何获取 Update 完整语句:
StringBuilder update_string=new StringBuilder();
Record.Type type=record.getOpt();
DataMessage.Record.Field field;
StringBuilder SetValue = new StringBuilder();
StringBuilder WhereCondition = new StringBuilder();
String ConditionStr;
boolean hasPk=false;
boolean pkMode=false;
boolean hasSet=false;
if(type.equals(Record.Type.UPDATE)){
int i=0;
DataMessage.Record.Field OldField = null;
DataMessage.Record.Field NewField = null;
List<DataMessage.Record.Field> fields = record.getFieldList();
for (; i <fields.size() ; i++) {
if (i % 2 == 0) {
OldField = fields.get(i);
continue;
}
NewField = fields.get(i);
field = NewField;
if (field.isPrimary()) {
if (hasPk) {
WhereCondition.append(" and ");
}
//where old value
ConditionStr = getFieldValue(OldField);
if(ConditionStr==null){ WhereCondition.append("`"+field.getFieldname().toLowerCase()+"`" + " " + "is null");
}else{
WhereCondition.append("`"+field.getFieldname().toLowerCase()+"`"+" = "+ "'"+OldField.getValue()+"'");
}
hasPk = true;
}
if (hasSet) {
SetValue.append(",");
}
SetValue.append("`"+field.getFieldname().toLowerCase()+"`" + " = " + "'"+field.getValue()+"'");
String setStr = getFieldValue(field);
hasSet = true;
}
update_string.append("Update "+record.getTablename() +" Set " + SetValue + " Where "+WhereCondition +";");
}
protected String getFieldValue(Field field) throws Exception {
ByteString byteString = field.getValue();
if (byteString == null) {
return null;
}
else {
String value;
if (field.getType() == com.aliyun.drc.client.message.DataMessage.Record.Field.Type.STRING && field.getEncoding() != null && field.getEncoding() != "ASCII") {
value = field.getValue().toString(field.getEncoding());
}
else {
value = byteString.toString();
}
return value;
}
}
Delete 解析
如果一个 Record 是 Delete 语句,那么这个 Record 的操作类型为 DELETE。下面的代码示例介绍当 Delete 对应的表有主键时,如何获取完整的 Delete 语句:
StringBuilder delete_string=new StringBuilder();
Record.Type type=record.getOpt();
DataMessage.Record.Field field;
StringBuilder FieldName=new StringBuilder();
StringBuilder FieldValue = new StringBuilder();
StringBuilder DeleteCondition = new StringBuilder();
boolean hasPk=false;
boolean pkMode=false;
if(type.equals(Record.Type.DELETE)){
int i=0;
List<DataMessage.Record.Field> fields = record.getFieldList();
delete_string.append("Delete From" + record.getTablename() + "where");
// 表是否有主键?
if (record.getPrimaryKeys() != null) {
pkMode = record.getPrimaryKeys().length() > 0 ? true : false;
}
for (; i < fields.size(); i++) {
if ((pkMode && !field.isPrimary())) {
continue;
}
if (hasPk) {
delete_string.append(" and ");
}
delete_string.append(field.getFieldname() + "=" + field.getValue());
hasPk = true;
}
delete_string.append(";");
}
Replace 解析
如果源库执行了 Replace 语句,那么这个 Record 的操作类型为 UPDATE 或 INSERT。当 Replace 设置的值不存在时,Record 的操作类型为 INSERT,当 Replace 设置的值存在时,Record 的操作类型为 UPDATE。
Begin 解析
如果一个 Record 是 Begin 语句,那么这个 Record 的操作类型为 BEGIN。begin 语句没有实际内容,所以不需要对 Field 做处理,只需要判断操作为 begin 即可,代码示例如下:
StringBuilder sql_string = new StringBuilder();
Record.Type type = record.getOpt();
if(type.equals(Record.Type.BEGIN)){
sql_string.append("Begin");
}
Commit 解析
如果一个 Record 是 Commit 语句,那么这个 Record 的操作类型为 COMMIT。commit 语句没有实际内容,所以不需要对 Field 做处理,只需要判断操作为 commit 即可,代码示例如下:
StringBuilder sql_string = new StringBuilder();
Record.Type type = record.getOpt();
if(type.equals(Record.Type.COMMIT)){
sql_string.append("commit");
}
原创文章,作者:网友投稿,如若转载,请注明出处:https://www.cloudads.cn/archives/34138.html