본문 바로가기
Coding

Pandas 데이터 프레임을 Snowflake로 업데이트하는 방법

by Jake Gyllenhaal 2022. 4. 16.
반응형

Pandas 데이터 프레임을 Snowflake로 업데이트하는 방법

 

파이프라인을 완전히 자동화하십시오.

Unsplash의 Aaron Burden 사진

upsert는 일반적으로 대량으로 테이블에 대한 삽입과 업데이트를 모두 처리하는 SQL 문입니다. 대상 테이블(업데이트 중인 테이블)과 원본 테이블 또는 파일(최신 정보가 포함된 테이블)이 필요합니다.

이 기사에서는 Python에서 Pandas DataFrame이 있을 때 Snowflake 테이블에 upsert를 수행하는 방법을 살펴보겠습니다. 그것은 모두 주위에서 작동합니다 MERGE 여기에서 추가 설명서를 찾을 수 있는 Transact-SQL의 명령입니다.

먼저 차근차근 쪼개고, 마지막에 모두 모을 것입니다. 코드를 읽고 적응을 추론하려는 경우 단계를 건너뛸 수 있습니다.

JSON 파일 만들기

가장 먼저 할 일은 DataFrame을 Snowflake가 준비할 수 있는 파일로 저장하는 것입니다. CSV는 분명히 이동 솔루션처럼 보이지만 JSON은 데이터 유형에 더 안전한 선택입니다. 문자열로 저장하는 모든 숫자(전화번호가 CSV를 포기한 주된 이유임)는 정수 또는 부동 소수점으로 끝날 것이며, 잠재적으로 과학적 표기법으로 쓰여질 것입니다. 이것은 이 집에서 완전히 필요한 마지막 것입니다.

모든 것이 원활하게 실행되도록 하려면 JSON 파일이 충족해야 하는 세 가지 조건이 있습니다.

  • 기록 지향적이어야 하며,
  • 레코드는 (목록이 아닌) 줄에 저장해야 합니다.
  • 날짜 유형은 초 단위로 저장해야 합니다(밀리초 또는 나노초와 반대).

이 모든 것을 만족시키기 위해 다음 코드 줄을 사용할 수 있습니다.

df.to_json(filename,orient='records',lines=True,date_unit='s')

이렇게 하면 다음과 같은 테이블의 아름다운 행별 JSON 레코드가 제공됩니다.

Snowflake에서 JSON 파일을 위한 단계 생성

스테이지는 테이블에 복사해야 하는 데이터가 포함된 파일의 임시 저장 위치입니다. 이상적으로는 JSON 파일을 upsert하려는 테이블과 동일한 데이터베이스 및 스키마에 생성하는 것이 좋습니다.

피팅 단계를 만들려면 Snowflake 워크시트에서 컨텍스트를 올바르게 설정하고 다음 줄을 실행하기만 하면 됩니다.

CREATE OR REPLACE STAGE my_stage_name FILE_FORMAT=(TYPE=’JSON’);

JSON 파일 준비

여기에는 터미널의 snowsql 세션에서 실행하거나 Snowflake 연결을 설정한 후 Python에서 실행할 수 있는 하나의 명령만 필요합니다(마지막에 코드 제공).

PUT file:///path/to/json_filename @my_stage_name OVERWRITE=TRUE;

병합 실행

준비된 파일을 테이블에 병합하는 것은 그다지 어렵지 않습니다. 단지 약간의 구문일 뿐입니다. 모든 열 앞에 $1: JSON 파일에서 "열"을 참조할 때.

열 A가 기본 키와 유사한 이 예를 따를 수 있습니다. 열 B와 C에는 변경될 수 있는 정보(예: "마지막 업데이트" 타임스탬프—이 때문에 병합이 열 A에서 일치하는 항목을 찾았을 때 업데이트하는 이유)가 포함되어 있습니다. ) 및 D 열에는 삽입할 가치가 있지만 업데이트할 수 없는 정보(예: 생년월일)가 포함되어 있습니다.

앞으로 있을 자동화에서는 A 열과 같은 항목을 "ID 열"이라고 하고 B 및 C와 같은 열을 "업데이트 열"이라고 합니다.

다음 함수의 코드는 다음을 수행합니다.

  • 당신의 DataFrame
  • 당신의 id_columns(기본 키를 구성하는 열 목록 - ID가 있는 경우 목록이 될 수 있음)
  • 당신의 insert_columns (일치하지 않는 행에 대해 삽입하려는 모든 열 목록)
  • 당신의 update_columns (업데이트할 수 있는 모든 열 목록 - 위의 예 참조)
  • 컨텍스트(데이터베이스, 스키마, 테이블 및 단계 이름)

그리고 위에서 언급한 모든 단계에서 스테이지 생성을 제외하고 실행합니다.

7~16행에서 Snowflake와의 연결을 열고, 21~23행 JSON 파일을 생성하고, 파일 28행을 준비하고, 29~36행 병합을 실행하고, 38행에서 파일을 제거하고, 로컬 파일을 삭제합니다. 40행은 스테이징 목적으로만 생성되었습니다. 마지막으로 43행에서 Snowflake 연결을 제대로 닫습니다. 4, 18, 41행은 함수가 실행될 때 약간의 통찰력을 제공합니다.

물론 연결을 설정하려면 Python용 Snowflake 커넥터가 필요합니다.

책임감있게 화를 내십시오!

반응형

댓글