Three months have passed since my last post. I have plenty of ideas but no time: at work I am developing an interesting project (as a Java developer), and in parallel I took a machine learning course on coursera.org (I learned many useful things and improved my Python skills).
Introduction
Now I am going to talk about Oracle Data Integrator, specifically about customizing an LKM. Moving data between different RDBMSs is one of the main tasks of a data integrator. ODI ships with many knowledge modules out of the box, but they cannot cover every need. For example, six months ago I had to load data from MS SQL to Teradata. If the tables are small, you can use the standard LKM SQL to SQL, but if a table contains millions of rows this becomes a problem, because that LKM works very slowly. The module's description even warns about it:
Restrictions:
- This Knowledge Module is NOT RECOMMENDED when using LARGE VOLUMES. Other specific modules using Bulk utilities (SQL)
Obviously, we have to use modules that rely on bulk utilities.
Background
Goal
So we need a knowledge module that uses bulk utilities on both sides (source and target). In our case these are BCP for MS SQL and TBUILD for Teradata. The first utility exports the data, and the second loads it into the target system.
Implementation
We can take two knowledge modules and merge them into one. For example, let's take:
LKM MSSQL to MSSQL (BCP) - the base module for the source-side steps
LKM SQL to TERADATA (TTU) - for the target side
The first module uses the standard BCP utility for bulk export, therefore we take only the part that exports data to a file.
From the second LKM we take many commands. Let's look at the result (the merged module):
LEGEND
YELLOW - commands from LKM MSSQL to MSSQL (BCP)
RED - commands from LKM SQL to TERADATA (TTU)
GREEN - custom commands
We have to merge the options too.
But that is not the end. The LKM needs deeper customization.
1. Exporting data from MS SQL to a file
The BCP export must be in a format suitable for the Teradata TTU import utility. This avoids unnecessary file transformation.
The original task, BCP out data to temporary bcp file, looks like:
<%=snpRef.getUserExit("CMD_SHELL")%> "<%=snpRef.getUserExit("BCP_DIRECTORY")%>/bcp" <%=snpRef.getInfo("SRC_WORK_CATALOG")%>.<%=snpRef.getInfo("SRC_WORK_SCHEMA")%>.<%=snpRef.getInfo("COLL_NAME")%>_V out <%=snpRef.getUserExit("DEFAULT_DIRECTORY")%>/X<%=snpRef.getSession("SESS_NO")%>.bcp <%
if (odiRef.getUserExit("UNICODE_SUPPORT").equals("1"))
out.print(" -w ");
else
out.print(" -c ");
%> -S<%=snpRef.getInfo("SRC_DSERV_NAME")%> -U<%=snpRef.getInfo("SRC_USER_NAME")%> -P<%=odiRef.getInfo("SRC_PASS")%> > <%=snpRef.getUserExit("DEFAULT_DIRECTORY")%>/X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.out.log
It exports the whole table, but I had access limitations (I was not allowed to create a view or a table on the source server), and I wanted to export only a few filtered columns.
Therefore I had to modify the code to use a query instead of a table name. Note that BCP is a command-line utility, so we have to remove all newline characters from the query with .replaceAll("\n"," "). The new step looks like:
OdiOSCommand -ERR_FILE=<?=getLKMOptions.getTempDir(\u0022<%=odiRef.getUserExit("DEFAULT_DIRECTORY")%>\u0022)?>X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.out.err -OUT_FILE=<?=getLKMOptions.getTempDir(\u0022<%=odiRef.getUserExit("DEFAULT_DIRECTORY")%>\u0022)?>X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.out.log
<% path=odiRef.getOption("BCP_PATH");
if (path.length()>0)
out.print(path+"/bcp ");
else
out.print("bcp ");
query=("select "+odiRef.getPop("DISTINCT_ROWS")+ odiRef.getColList("", "[EXPRESSION]\t[ALIAS_SEP] [CX_COL_NAME]", ",", "", "")+" from "+odiRef.getFrom()+" where (1=1) "+ odiRef.getFilter()+odiRef.getJrnFilter()+odiRef.getJoin()+odiRef.getGrpBy()+odiRef.getHaving()).replaceAll("\n"," ");
%> "<% out.print(query); %>" queryout <?=getLKMOptions.getTempDir(\u0022<%=odiRef.getUserExit("DEFAULT_DIRECTORY")%>\u0022)?>X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.bcp <%
if (odiRef.getUserExit("UNICODE_SUPPORT").equals("1"))
out.print(" -w ");
else
out.print(" -c ");
%> <%
if (!odiRef.getUserExit("BCP_LOCAL_SERVER").equals("1"))
out.print(" -S"+odiRef.getInfo("SRC_DSERV_NAME"));
%> -U<%=odiRef.getInfo("SRC_USER_NAME")%> -P"<%=odiRef.getInfo("SRC_PASS")%>" <%
terminator=odiRef.getOption("FIELD_SEPARATOR");
if (terminator.length()>0) {
if (terminator.equals("' '"))
out.print(" -t"+(char)34+" "+(char)34);
else
out.print(" -t"+terminator);
}
%> <% recordTerminator=odiRef.getUserExit("RECORD_SEPARATOR");
if (recordTerminator.length() > 0)
out.print(" -r" + recordTerminator);
%> -C1251
Note that we use the same field and record separators (the FIELD_SEPARATOR and RECORD_SEPARATOR settings) in both utilities.
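The way the step above assembles the command line can be sketched outside ODI in a few lines of Python. This is only an illustration, not the module's code: build_bcp_command and flatten_query are hypothetical helpers, and the server, user, password and file names are made-up values. The bcp switches themselves (queryout, -c, -w, -S, -U, -P, -t, -r) are the ones used in the step.

```python
def flatten_query(query):
    """Replace newlines with spaces, like .replaceAll("\\n"," ") in the step."""
    return query.replace("\r\n", " ").replace("\n", " ")


def build_bcp_command(query, out_file, server, user, password,
                      field_sep="|", record_sep="\\n", unicode_mode=False):
    """Return the argument list for a bcp queryout call (hypothetical helper)."""
    args = ["bcp", flatten_query(query), "queryout", out_file]
    args.append("-w" if unicode_mode else "-c")  # -w: Unicode, -c: character mode
    args += ["-S", server, "-U", user, "-P", password]
    args += ["-t", field_sep]    # field terminator, must match the TPT script
    args += ["-r", record_sep]   # row terminator
    return args


# Made-up connection values, for illustration only.
cmd = build_bcp_command("select id,\nname\nfrom dbo.customers",
                        "X1001.bcp", "SRCHOST", "odi_user", "secret")
print(" ".join(cmd))
```

Building the command as a list rather than one string avoids quoting problems when the query contains spaces or quotes.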
The result of this stage is a *.bcp file with the data, which we then have to load into Teradata.
2. Loading data from file to Teradata using TBUILD
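LKM File to Teradata (TTU) allows the use of several utilities, selected through the TERADATA_UTILITY option (the code below checks for FASTLOAD, MLOAD, TPUMP, TPT-LOAD and TPT-SQL-INSERT).
I used TPT-LOAD, because I did not modify the code to support all the utilities.
First we have to generate a script for the TPT-LOAD utility. This is the Generate ttu script stage.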
OdiOutFile "-File=<?=getLKMOptions.getTempDir(\u0022<%=odiRef.getUserExit("DEFAULT_DIRECTORY")%>\u0022)?>X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.script"
<% if (odiRef.getOption("TERADATA_UTILITY").startsWith("TPT")) {
/********************************************************************************/
/* Parallel Transporter Script */
/********************************************************************************/ %>
DEFINE JOB ODILoad<%=odiRef.getStep("SESS_NO")%>
DESCRIPTION 'ODI step name: <%=odiRef.getStep("STEP_NAME")%> ODI step #:<%=odiRef.getStep("NNO")%> from session #<%=odiRef.getStep("SESS_NO")%>'
(
DEFINE SCHEMA StagingTable
DESCRIPTION 'Staging table for <%=odiRef.getTable("L", "TARG_NAME", "D")%>'
(
<%=odiRef.getColList("", "\t[CX_COL_NAME]\t"+
/* Is this a date? Then we need to translate into ANSIDATE type */
"<? if (\u0022[DEST_DT]\u0022.equals(\u0022DATE\u0022)) { ?>"+
"ANSIDATE"+
/* otherwise use standard data type */
"<? } else { ?>"+
"[DEST_WRI_DT]"+
"<?};?> "+ odiRef.getInfo("DEST_DDL_NULL"),
",\n", "","")%>
);
DEFINE SCHEMA SourceFile
DESCRIPTION 'Staging table for <%=odiRef.getTable("L", "TARG_NAME", "D")%>'
(
<%=odiRef.getColList("", "\t[CX_COL_NAME]\t"+
/* Is this a date or timestamp field? Then we set the buffer length independent of LONGC */
"<? if (\u0022[DEST_DT]\u0022.equals(\u0022DATE\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP(0)\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP() WITH TIME ZONE\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP(0)\u0022)) {?>"+
"VARCHAR(21)"+
"<?} else ?>"+
/* Is this a numeric field? Then we add +2 to the length to include potential sign and decimal separator. */
"<? if (\u0022[DEST_DT]\u0022.equals(\u0022BIGINT\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022BYTE\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022BYTEINT\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022DECIMAL\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022DOUBLE PRECISION\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022FLOAT\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022INTEGER\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022NUMERIC\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022REAL\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022SMALLINT\u0022)) {?>"+
"VARCHAR(<?=(new Integer([LONGC]+2)).toString()?>)"+
"<? } else { ?>"+
"VARCHAR([LONGC])"+
"<?};?>"+
"", ",\n", "","")%>
);
DEFINE OPERATOR FileReader
DESCRIPTION 'Read File Using DataConnector'
TYPE DATACONNECTOR PRODUCER
SCHEMA SourceFile
ATTRIBUTES
(
VARCHAR PrivateLogName,
VARCHAR FileName = '<% out.print( odiRef.getOption("DEFAULT_DIRECTORY")+"\/X");%><%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.bcp', /*Name of the file read from or written to*/
VARCHAR OpenMode = 'Read', /*How to open the file: read, write, or writeappend*/
VARCHAR Format = 'delimited',
VARCHAR TextDelimiter = '<%=odiRef.getOption("FIELD_SEPARATOR")%>',
VARCHAR IndicatorMode = 'N', /*Whether indicator bytes are used in the operation*/
INTEGER RowsPerInstance,
INTEGER VigilWaitTime,
VARCHAR VigilStartTime,
VARCHAR VigilStopTime,
VARCHAR VigilNoticeFileName,
VARCHAR TraceLevel = 'none'
);
<% if (odiRef.getOption("TERADATA_UTILITY").equals("TPT-LOAD")) { %>
DEFINE OPERATOR LoadStagingTable
TYPE LOAD
SCHEMA *
ATTRIBUTES
(
INTEGER MaxSessions = <%=odiRef.getOption("SESSIONS")%>,
<% if ((new java.lang.Integer(odiRef.getOption("MAX_ALLOWED_ERRORS"))).longValue()>0) {%>INTEGER ErrorLimit = <%=odiRef.getOption("MAX_ALLOWED_ERRORS")%>,<%}; %>
VARCHAR UserName = '<%=odiRef.getInfo("DEST_USER_NAME")%>',
VARCHAR UserPassword = '<%=odiRef.getInfo("DEST_PASS")%>',
VARCHAR LogTable = '<%=odiRef.getTable("L", "COLL_NAME", "W").replaceAll("<.=","<?s=").replaceAll("..>",".replaceAll((char)92+\\u0022.\\u0022,\\u0022.LOG_\\u0022); out.print(s.substring(0,s.indexOf(\\u0022.\\u0022)+1+java.lang.Math.min(s.length()-s.indexOf(\\u0022.\\u0022)-1,"+ odiRef.getInfo("DEST_MAX_TAB_NAME_LEN") +")));?>")%>',
VARCHAR TargetTable = '<%=odiRef.getTable("L", "COLL_NAME", "W")%>T',
VARCHAR ErrorTable1 = '<%=odiRef.getTable("L", "COLL_NAME", "W").replaceAll("<.=","<?s=").replaceAll("..>",".replaceAll((char)92+\\u0022.\\u0022,\\u0022.ET_\\u0022); out.print(s.substring(0,s.indexOf(\\u0022.\\u0022)+1+java.lang.Math.min(s.length()-s.indexOf(\\u0022.\\u0022)-1,"+ odiRef.getInfo("DEST_MAX_TAB_NAME_LEN") +")));?>")%>',
VARCHAR ErrorTable2 = '<%=odiRef.getTable("L", "COLL_NAME", "W").replaceAll("<.=","<?s=").replaceAll("..>",".replaceAll((char)92+\\u0022.\\u0022,\\u0022.UV_\\u0022); out.print(s.substring(0,s.indexOf(\\u0022.\\u0022)+1+java.lang.Math.min(s.length()-s.indexOf(\\u0022.\\u0022)-1,"+ odiRef.getInfo("DEST_MAX_TAB_NAME_LEN") +")));?>")%>',
VARCHAR PauseAcq,
VARCHAR PrivateLogName,
VARCHAR TdpId ='<%=odiRef.getInfo("DEST_DSERV_NAME")%>'
);
<% }; if (odiRef.getOption("TERADATA_UTILITY").equals("TPT-SQL-INSERT")) { %>
DEFINE OPERATOR LoadStagingTable
TYPE INSERTER
SCHEMA *
ATTRIBUTES
(
VARCHAR UserName = '<%=odiRef.getInfo("DEST_USER_NAME")%>',
VARCHAR UserPassword ='<%=odiRef.getInfo("DEST_PASS")%>',
VARCHAR PrivateLogName,
VARCHAR TdpId ='<%=odiRef.getInfo("DEST_DSERV_NAME")%>'
);
<%}%>
APPLY
(
'INSERT INTO <%=odiRef.getTable("L", "COLL_NAME", "W")%>T
(
<%=odiRef.getColList("", "\t[CX_COL_NAME]", ",\n\t", "","")%>
)
VALUES
(
<%=odiRef.getColList("", ":[CX_COL_NAME]"+
/* Is this a date or timestamp field? Then apply teradata cast. */
"<? if (\u0022[DEST_DT]\u0022.equals(\u0022DATE\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP(0)\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP() WITH TIME ZONE\u0022) ||"+
" \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP(0) WITH TIME ZONE\u0022)) {?>"+
"\t(TIMESTAMP(3), FORMAT ''YYYYMMDDHHMISS.S(3)'')"+
"<? };?>"
, ",\n\t\t", "","")%>
);'
)
TO OPERATOR
(
LoadStagingTable[1]
)
SELECT
<%=odiRef.getColList("", "\t[CX_COL_NAME]", ",\n\t", "", "")%>
FROM OPERATOR(FileReader[1]);
);
<%}%>
This script reads the exported BCP file and loads it into the work table on the target system. I made only minimal changes to the Generate ttu script stage.
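The SourceFile schema in the script reads every column as text, and the buffer length depends on the target data type. The rule can be restated as a small Python function. This is a sketch of my reading of the template, not code from the module: source_file_type is a hypothetical name, and the type lists are copied from the script above.

```python
# Type names as they appear in the script above.
TEMPORAL_TYPES = {"DATE", "TIMESTAMP", "TIMESTAMP(0)", "TIMESTAMP() WITH TIME ZONE"}
NUMERIC_TYPES = {"BIGINT", "BYTE", "BYTEINT", "DECIMAL", "DOUBLE PRECISION",
                 "FLOAT", "INTEGER", "NUMERIC", "REAL", "SMALLINT"}


def source_file_type(dest_dt, longc):
    """Return the VARCHAR type used for a column in the SourceFile schema."""
    if dest_dt in TEMPORAL_TYPES:
        return "VARCHAR(21)"                 # fixed buffer, independent of LONGC
    if dest_dt in NUMERIC_TYPES:
        return "VARCHAR(%d)" % (longc + 2)   # room for sign and decimal separator
    return "VARCHAR(%d)" % longc             # everything else keeps its length


for dest_dt, longc in [("DATE", 10), ("DECIMAL", 18), ("VARCHAR", 50)]:
    print(dest_dt, "->", source_file_type(dest_dt, longc))
```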
The next stage is Load to staging area. This part is very important, because it runs the tbuild utility with the script we built in the previous step.
from jarray import array
from java.lang import Thread
from java.io import BufferedOutputStream
from java.io import PrintStream
from java.lang import String
from java.lang import Runtime
from java.lang import System
from java.io import StringWriter
from java.io import InputStreamReader
from java.io import OutputStreamWriter
from java.io import BufferedReader
from java.io import BufferedWriter
from java.io import FileReader
from java.io import FileWriter
from jarray import array
from jarray import zeros
import com.sunopsis.dwg.tools.SqlUnload as JSnpsSqlUnload
import java.util.Vector as JVector
import threading
import time
import os
from datetime import datetime
#
# Source connection variables
#
srcdriver = "<%=odiRef.getInfo("SRC_JAVA_DRIVER")%>"
srcurl = "<%=odiRef.getInfo("SRC_JAVA_URL")%>"
srcuser = "<%=odiRef.getInfo("SRC_USER_NAME")%>"
srcpass = "<%=odiRef.getInfo("SRC_ENCODED_PASS")%>"
fetchsize = "<%=odiRef.getInfo("SRC_FETCH_ARRAY")%>"
LOG_FILE_NAME='<%=odiRef.getOption("LOG_FILE")%>'
#
# Temporary Files variables
#
filename = r"<?=getLKMOptions.getTempDir(\u0022<%=odiRef.getUserExit("DEFAULT_DIRECTORY")%>\u0022)?>X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.%s"
#"<%=odiRef.getOption("DEFAULT_DIRECTORY")%>/<%=odiRef.getTable("L", "TARG_NAME", "W")%>_<%=odiRef.getInfo("SRC_SET_NAME")%>.%s"
logname = filename % "log"
scriptname = filename % "script"
outname = filename % "out"
dataname = r"<%
out.print( odiRef.getOption("DEFAULT_DIRECTORY")+"/"+odiRef.getTable("L", "TARG_NAME", "W")+"_"+odiRef.getInfo("SRC_SET_NAME")+".data" );
%>"
#
# Unload Options
#
fieldsep = "<%=odiRef.getOption("FIELD_SEPARATOR")%>"
datefmt = "yyyyMMddHHmmss.SSS"
# query has already been defined in previous step
#
# Load Options
#
loadcmd = r"""<%
if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "FASTLOAD" )) { %>fastload < "%s" > "%s"<% } else
if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "MLOAD" )) { %>mload < "%s" > "%s"<% } else
if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "TPUMP" )) { %>tpump < "%s" > "%s"<% } else
{ %>tbuild -f "%s" tdjob<%};%> """
loadcmd = loadcmd % (scriptname)
utility = r"""<%
if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "FASTLOAD" )) { %>fastload<% } else
if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "MLOAD" )) { %>mload<% } else
if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "TPUMP" )) { %>tpump<% } else
{ %>tbuild -f %s <%};%> """ <%
if (odiRef.getOption( "TERADATA_UTILITY" ).startsWith( "TPT" )) { %> % (scriptname)<%};%>
lf = None
class Gobbler(Thread):
    def __init__(self, stream, label):
        self.br = BufferedReader(InputStreamReader(stream))
        self.label = label
    def run(self):
        line=self.br.readLine()
        while (line != None):
            print >> lf, self.label + line
            line=self.br.readLine()
        #print >>lf, "ODI: End of polling of " + self.label
class Writer(Thread):
    def __init__(self, stream):
        self.ps = PrintStream(BufferedOutputStream(stream))
    def run(self):
        fsrc = open(r"<%=odiRef.getOption("DEFAULT_DIRECTORY")%>/<%=odiRef.getTable("L", "TARG_NAME", "W")%>_<%=odiRef.getInfo("SRC_SET_NAME")%>.script", 'r')
        try:
            for lsrc in fsrc.readlines():
                self.ps.println(lsrc)
        finally:
            fsrc.close()
            self.ps.flush()
            self.ps.close()
#write log to file
def write_log(strlog):
    logfile=open(LOG_FILE_NAME, "a")
    strtime=datetime.now().strftime('%Y-%m-%d %H:%M:%S')+':'+strlog
    strtime=strtime+'\n'
    logfile.write(strtime)
    logfile.close()
    print strtime
#===========================
# load()
# executes the file based load
#===========================
def load():
    write_log('Run load into teradata '+loadcmd)
    #ok status
    error_result=0
    try:
        out,err,status = run_process()
        write_log (' output='+out +'\n error='+err+'\n status='+str(status))
        if status!=0:
            write_log ('ERROR status='+str(status))
            error_result=1
        else:
            write_log ('Status OK')
    except Exception,ex:
        err=str(ex)
        write_log ('Error running command '+ loadcmd+'\n'+err)
        error_result=1
    #raise if an error occurred
    if(error_result==1):
        raise Exception(error_result,err)
def run_process():
    #run console process and return its output
    #Return (process output, error output, status code)
    import subprocess
    loadcmd = r"""tbuild -f %s tdJob"""%(scriptname)
    p = subprocess.Popen(loadcmd.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    #communicate() reads both pipes and waits until the process has finished
    out, err = p.communicate()
    #process return code
    status = p.returncode
    return out,err,status
#snpssqlunload()
load()
I rewrote the load procedure and added the run_process function. The standard load() procedure uses the os.system function to run tbuild, but I used the subprocess module instead, because it allows reading the console output, which I can then write to the log file:
out,err,status = run_process()
write_log (' output='+out +'\n error='+err+'\n status='+str(status))
if status!=0:
    write_log ('ERROR status='+str(status))
    error_result=1
else:
    write_log ('Status OK')
The output in the log file helps to debug errors that occur at the load stage.
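The same capture-and-log pattern can be tried outside ODI. The sketch below is plain Python rather than the Jython of the module, and because tbuild is not available everywhere it starts a child Python process as a stand-in. run_and_capture and format_log_line are hypothetical names used only for this illustration.

```python
import subprocess
import sys
from datetime import datetime


def run_and_capture(argv):
    """Run a console command and return (stdout, stderr, return code)."""
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    # communicate() drains both pipes and waits for the process to exit,
    # so no separate wait() call is needed.
    out, err = proc.communicate()
    return out, err, proc.returncode


def format_log_line(message):
    """Prefix a message with a timestamp, in the style of write_log()."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ":" + message


# Stand-in for tbuild: prints to both streams and exits with status 3.
child = "import sys; print('rows sent'); sys.stderr.write('warning'); sys.exit(3)"
out, err, status = run_and_capture([sys.executable, "-c", child])
print(format_log_line(" output=%s error=%s status=%d" % (out.strip(), err, status)))
```

A non-zero status is what load() turns into an exception, so the ODI session fails visibly instead of silently continuing.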
Verification
A few words about the custom stages Calculate number of exported rows and Verify results.
These stages verify that the number of exported rows equals the number of imported rows. You could make these steps optional (but I worry about losing data, so I always use them).
Calculate number of exported rows (pre-integration step) - it counts the rows in the export file
Verify results (post-integration step) - it counts the rows in the target
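The idea behind the two verification stages can be sketched as follows. This is not the code of the module: count_exported_rows and verify_counts are hypothetical helpers, the sketch assumes that records end with a newline and that no field contains an embedded newline, and in the real flow the loaded count would come from a count query on the target table.

```python
import os
import tempfile


def count_exported_rows(path):
    """Count records in the export file, assuming one record per line."""
    with open(path, "rb") as f:
        # Iterating over the file streams it, so a multi-gigabyte file is fine.
        return sum(1 for _ in f)


def verify_counts(exported, loaded):
    """Fail loudly when the target received a different number of rows."""
    if exported != loaded:
        raise ValueError("row count mismatch: exported %d, loaded %d"
                         % (exported, loaded))


# Small demonstration file standing in for the *.bcp export.
fd, demo_path = tempfile.mkstemp(suffix=".bcp")
with os.fdopen(fd, "wb") as f:
    f.write(b"1|Alice\n2|Bob\n3|Carol\n")

exported = count_exported_rows(demo_path)
verify_counts(exported, 3)   # 3 stands in for the count read from the target
os.remove(demo_path)
print("exported rows:", exported)
```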
Results
Let's try the new LKM MS SQL to TERADATA knowledge module.
Test data:
Source Microsoft SQL table size: approximately 70 000 000 rows
BCP file size: approximately 7 GB
Execution time: 4001 seconds
Our new module loads 70 000 000 rows into the target Teradata table in 4001 seconds, which is not a bad result!
I compared it with the standard LKM SQL to SQL on the same source table.
Source Microsoft SQL table size: approximately 70 000 000 rows
I waited for 10 000 seconds and then aborted the session, because it had loaded only 146 000 rows into the work table in that time. A simple calculation: at that rate (14.6 rows per second) we would need about 4 790 000 seconds, roughly 55 days, to load 70 million rows. Not fast, is it?
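The extrapolation can be checked with a few lines of arithmetic. The sketch takes the measurements quoted above at face value (70 000 000 rows, 4001 seconds for the bulk load, 146 000 rows in 10 000 seconds for LKM SQL to SQL) and assumes that the slow load would keep a constant rate.

```python
total_rows = 70000000

# Bulk load with the custom LKM (BCP + tbuild).
bulk_seconds = 4001
bulk_rate = total_rows / float(bulk_seconds)

# Standard LKM SQL to SQL, aborted after 10 000 seconds.
sql_rows_loaded = 146000
sql_seconds_waited = 10000
sql_rate = sql_rows_loaded / float(sql_seconds_waited)

# Time the standard LKM would need for the whole table at that rate.
sql_estimate_seconds = total_rows / sql_rate
sql_estimate_days = sql_estimate_seconds / 86400.0

print("bulk rate: %.0f rows/s" % bulk_rate)
print("SQL to SQL rate: %.1f rows/s" % sql_rate)
print("SQL to SQL estimate: %.0f s (%.1f days)" % (sql_estimate_seconds, sql_estimate_days))
print("speed-up: about %.0fx" % (sql_estimate_seconds / bulk_seconds))
```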
Conclusion
ODI gives us powerful tools for integration and many modules, but they are only pieces of the puzzle, where the picture is our integration solution. As you can see, it is not hard to create a custom knowledge module for a flow between databases that the standard LKMs do not cover: we can merge existing modules and use the result. This saves time and increases the performance of the integration solution.