Python을 배워보자

Oracle DB에서 저장 프로시저 호출 및 동적 데이터 처리 방법

_Blue_Sky_ 2025. 4. 27. 00:07
728x90

 
Oracle 데이터베이스에서 저장 프로시저를 호출하고, 결과가 없을 경우 동적으로 대체 데이터를 생성하는 방법을 소개합니다. 이 글에서는 Python의 oracledb 모듈을 활용해 저장 프로시저를 호출하고, 결과 데이터를 분석하며, 필요 시 랜덤 데이터를 생성해 쿼리를 실행하는 과정을 세련되게 정리했습니다. 초보자부터 숙련자까지 참고할 수 있도록 코드와 출력 예시를 포함했습니다.

1. Oracle DB 연결 및 저장 프로시저 호출
먼저, Oracle DB에 연결하고 저장 프로시저를 호출하는 기본 구조를 살펴보겠습니다. 아래 코드는 oracledb 모듈을 사용해 DB에 연결하고, 특정 저장 프로시저(NEXTPOT.STP_STK_050101_LST_01)를 호출합니다.
 

 

CREATE OR REPLACE PROCEDURE NEXTPOT.STP_STK_050101_LST_01
(
    /*----------------------------------------------------------------------------------------------*/
    /* DEFAULT IN/OUT 변수 선언 */
    /*----------------------------------------------------------------------------------------------*/
    ot_status_code OUT VARCHAR2 /* 로그상태 Default NULL */
    ,io_ui_id IN OUT VARCHAR2 /* 화면ID Default NULL */
    ,io_prcd_id IN OUT VARCHAR2 /* 프로세스ID Default NULL */
    ,in_user_id IN VARCHAR2 /* 사용자ID */
    ,in_ip IN VARCHAR2 /* 사용자IP */
    ,ot_respon_code OUT VARCHAR2 /* 응답코드 */
    ,ot_res_msg OUT VARCHAR2 /* 응답메시지 */
    /*----------------------------------------------------------------------------------------------*/
    /* 업무적 IN/OUT 변수 선언 */
    /*----------------------------------------------------------------------------------------------*/
    , P_division_cd IN VARCHAR2 -- 부서 코드
    , P_branch_cd IN VARCHAR2 -- 지사 코드
    , P_start_date IN VARCHAR2 -- 영업일 부터
    , P_end_date IN VARCHAR2 -- 영업일 까지
    , ot_out_cursor OUT SYS_REFCURSOR -- 조회결과 CV_result
)

 

import oracledb
import random

# DB 연결 설정
DB_CONFIG = {
    "user": "your_username",
    "password": "your_password",
    "dsn": "your_tns_entry_or_easy_connect_string"
}

def call_stored_procedure():
    try:
        with oracledb.connect(**DB_CONFIG) as connection:
            with connection.cursor() as cursor:
                # OUT 커서 변수
                out_cursor = cursor.var(oracledb.DB_TYPE_CURSOR)
                
                # 저장 프로시저 호출
                cursor.callproc(
                    "NEXTPOT.STP_STK_050101_LST_01",
                    [
                        None, 
                        "TEST_UI_ID", 
                        "TEST_PRCD_ID", 
                        "test_user",
                        "127.0.0.1", 
                        None, 
                        None, 
                        "DIV01", 
                        
                        "BR01",
                        "2025-01-01", 
                        "2025-01-31", 
                        
                        out_cursor
                    ]
                )
                
                # 결과 커서 처리
                result_cursor = out_cursor.getvalue()
                rows = result_cursor.fetchall()
                column_names = [desc[0] for desc in result_cursor.description]
                
                # 결과 출력
                if rows:
                    print("Stored Procedure Results:")
                    print("Column Names:", column_names)
                    for row in rows:
                        print(row)
                else:
                    print("No data found. Generating fallback data...")
                    generate_random_fallback_data(connection,  result_cursor.description)
                    
    except oracledb.DatabaseError as e:
        print(f"Database error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
핵심 포인트:
  • oracledb.connect로 DB 연결을 설정합니다.
  • cursor.callproc를 통해 저장 프로시저를 호출하며, 입력 및 출력 파라미터를 처리합니다.
  • 결과가 없으면 대체 데이터를 생성하는 함수(generate_random_fallback_data)를 호출합니다.

 

728x90

2. 동적 대체 데이터 생성
저장 프로시저에서 데이터가 반환되지 않을 경우, 동적으로 대체 데이터를 생성해 쿼리를 실행합니다. 이 과정은 컬럼 이름과 타입을 기반으로 랜덤 데이터를 생성하며, 첫 번째 쿼리에만 컬럼 별칭을 추가합니다.
 
 
def generate_random_fallback_data(connection, col_info):
    try:
        with connection.cursor() as cursor:
            random_rows = []
            for _ in range(3):  # 3개의 랜덤 행 생성
                random_row = []
               for col_info in result_cursor.description:
                    col_name = col_info[0]  # 컬럼 이름
                    col_type = col_info[1]  # 컬럼 타입 (oracledb의 DB_TYPE_* 또는 타입 코드)
                    col_name_upper = col_name.upper()

                    # 데이터 타입에 따라 랜덤 값 생성
                    if col_type == oracledb.DB_TYPE_DATE or col_type == oracledb.DB_TYPE_TIMESTAMP or "DATE" in col_name_upper:
                        # 랜덤 연도 (2020~2025)
                        year = random.randint(2020, 2025)
                        # 랜덤 월 (1~12)
                        month = random.randint(1, 12)
                        # 해당 월의 최대 일수 계산
                        max_day = calendar.monthrange(year, month)[1]
                        # 랜덤 일 (1~최대 일수)
                        day = random.randint(1, max_day)
                        # YYYY-MM-DD 형식으로 포맷
                        random_value = f"{year}-{month:02d}-{day:02d}"
                    elif col_type in (oracledb.DB_TYPE_NUMBER, oracledb.DB_TYPE_INTEGER) or "ID" in col_name_upper or "NUM" in col_name_upper or "CODE" in col_name_upper:
                        random_value = random.randint(1, 100)  # 정수 (ID, 번호, 코드)
                    elif col_type == oracledb.DB_TYPE_BINARY_FLOAT or col_type == oracledb.DB_TYPE_BINARY_DOUBLE or "AMOUNT" in col_name_upper or "PRICE" in col_name_upper or "COST" in col_name_upper:
                        random_value = round(random.uniform(10.0, 1000.0), 2)  # 부동소수점 (금액, 가격)
                    elif col_type == oracledb.DB_TYPE_VARCHAR or col_type == oracledb.DB_TYPE_CHAR or col_type == oracledb.DB_TYPE_NVARCHAR:
                        if "NAME" in col_name_upper or "TITLE" in col_name_upper:
                            random_value = f"Sample_{random.choice(['Alpha', 'Beta', 'Gamma', 'Delta'])}_{random.randint(1, 100)}"  # 제한된 문자열
                        elif "EMAIL" in col_name_upper:
                            random_value = f"user{random.randint(1, 100)}@example.com"  # 이메일 형식
                        elif "FLAG" in col_name_upper or "STATUS" in col_name_upper:
                            random_value = random.choice(["Y", "N"])  # 참/거짓 (Y/N)
                        elif "DESC" in col_name_upper or "COMMENT" in col_name_upper:
                            random_value = f"Description_{random.randint(100, 999)}"  # 긴 문자열
                        else:
                            random_value = f"RandomVal_{random.randint(1, 1000)}"  # 기본 문자열
                    elif col_type == oracledb.DB_TYPE_BOOLEAN:
                        random_value = random.choice([True, False])  # 참/거짓 (Boolean)
                    else:
                        random_value = f"RandomVal_{random.randint(1, 1000)}"  # 기타 타입은 기본 문자열

                    random_row.append(random_value)
                random_rows.append(random_row)

            # 동적 쿼리 생성
            query_parts = []
            for idx, row in enumerate(random_rows):
                if idx == 0:
                    values_with_alias = ", ".join(
                        f"'{val}' AS {col}" if isinstance(val, str) else f"{val} AS {col}"
                        for val, col in zip(row, column_names)
                    )
                    query_parts.append(f"SELECT {values_with_alias} FROM DUAL")
                else:
                    values = ", ".join(
                        f"'{val}'" if isinstance(val, str) else str(val)
                        for val in row
                    )
                    query_parts.append(f"SELECT {values} FROM DUAL")

            query = " UNION ALL ".join(query_parts)
            
            # 쿼리 실행 및 결과 출력
            print("Generated SQL Query:")
            print(query)
            cursor.execute(query)
            rows = cursor.fetchall()
            
            print("Generated Fallback Data:")
            print("Column Names:", column_names)
            for row in rows:
                print(row)

    except oracledb.DatabaseError as e:
        print(f"Database error in fallback data: {e}")
    except Exception as e:
        print(f"Unexpected error in fallback data: {e}")
주요 특징:
  • 컬럼 이름에 따라 적절한 데이터 타입(날짜, 숫자, 문자열)을 생성합니다.
  • 첫 번째 SELECT 문에만 AS [컬럼명]을 추가해 쿼리 구조를 명확히 합니다.
  • UNION ALL을 사용해 여러 행을 결합합니다.

 

 

728x90

3. 실행 예시
 
 
Stored Procedure Results:
Column Names: ['EMP_ID', 'EMP_NAME', 'HIRE_DATE']
(1, 'John Doe', '2025-01-01')
(2, 'Jane Doe', '2025-01-02')
2) 데이터가 없는 경우 (대체 데이터 생성)
 
 
No data found. Generating fallback data...
Generated SQL Query:
SELECT 42 AS COLUMN1, 'RandomVal_753' AS COLUMN2, '2025-04-15' AS COLUMN3 FROM DUAL
UNION ALL SELECT 87, 'RandomVal_951', '2025-04-23' FROM DUAL
UNION ALL SELECT 13, 'RandomVal_678', '2025-04-08' FROM DUAL

Generated Fallback Data:
Column Names: ['COLUMN1', 'COLUMN2', 'COLUMN3']
(42, 'RandomVal_753', '2025-04-15')
(87, 'RandomVal_951', '2025-04-23')
(13, 'RandomVal_678', '2025-04-08')

4. 요약 및 활용 팁
이 코드는 저장 프로시저 호출과 동적 데이터 생성을 안정적으로 처리하는 방법을 보여줍니다. 아래는 실무에서 활용할 수 있는 팁입니다:
  • 에러 처리: try-except 블록을 활용해 DB 에러와 일반 에러를 구분해 처리하세요.
  • 컬럼 타입 확장: 실제 데이터 타입에 따라 더 정교한 랜덤 데이터 생성 로직을 추가할 수 있습니다.
  • 쿼리 최적화: 대량 데이터 처리 시 UNION ALL 대신 다른 방법을 검토하세요.
이 방법을 사용하면 저장 프로시저의 결과 유무에 상관없이 안정적으로 데이터를 처리할 수 있습니다. Oracle DB를 사용하는 개발자라면 이 코드를 참고해 프로젝트에 적용해보세요!

 
728x90