728x90

NiFi 2.0에서 이전 버전에서 사용되었던 템플릿 기능이 제거되었습니다.

 

하지만, NiFi 2.0에서는 템플릿 기능을 대체할 수 있는 방법이 있습니다.

 

 

 

- Flow Definition Export

NiFi 2.0에서는 템플릿 기능을 대체하기 위해 데이터 흐름을 JSON 형식으로 내보내고 가져오는 기능이 있습니다.

 

 

 

 

1. Flow Export (내보내기)

  • NiFi UI에서 원하는 프로세스 그룹을 선택합니다.
  • 메뉴에서 "Download flow definition"  옵션을 사용해 흐름을 JSON 파일로 내보낼 수 있습니다.

 

 

 

import 할 그룹을 우클릭 → Download flow definition → with external services

 

 

  • With external services를 사용하는 경우:
    • 데이터베이스 연결, HDFS 설정, 또는 다른 컨트롤러 서비스를 포함한 복잡한 환경을 다른 NiFi 인스턴스에 그대로 옮기고자 할 때 유용합니다.
  • With external services를 사용하지 않는 경우:
    • 기본적인 데이터 흐름만을 공유하고, 외부 서비스는 현지 환경에 맞춰 설정하려는 경우에 유용합니다. 예를 들어, 다른 데이터베이스나 시스템을 사용하는 환경으로 흐름을 이식할 때 이 옵션을 해제할 수 있습니다.

 

 

 

 

2. Flow Import (가져오기)

 

 

 

Add Process group → Browse → 다운받았던 Json 파일 선택

 

 

 

 

 

 

 

이렇게 하면 손쉽게 import 기능 사용 가능!

 

 

+ 이전 버전에서도 있던 기능이기에 버전 상관없이 사용할 수 있습니다!

다만 모든 프로세서가 동작하는지는 확인이 필요할 듯 합니다.

 

728x90

 

들어가기에 앞서, Nifi 2.0 버전에서는 Hive3QL 만 지원됩니다. 

기존 ~HiveQL 은 주로 Hive 1.x 및 2.x를 지원하는데 사용되었지만, Hive3QLHive 3.x 버전만 지원합니다.

 

 

 

 

1. Hive Connection Pool ( Hive3ConnectionPool 2.0.0-M2 )

설정값 입력

  • Database Connection URL: jdbc:hive2://<hive-server-host>:<hive-server-port>/<database-name>
    • 예: jdbc:hive2://hive-server:10000/default
      ( 저는 DB 명 생략했습니다. )
  • Database Driver Class Name: org.apache.hive.jdbc.HiveDriver
  • Database Driver Location(s): file:///path/to/hive-jdbc-driver.jar
    • Hive JDBC 드라이버 파일의 경로를 입력합니다.
  • Database User: Hive 데이터베이스 사용자 이름
  • Password: Hive 데이터베이스 사용자의 비밀번호
  • Max Wait Time: 500 millis
  • Max Total Connections: 8
  • Validation Query: SELECT 1

 

 

 

 

2. Select 쿼리 실행 ( SelectHive3QL 2.0.0-M2 )

  • Select 쿼리는 SelectHive3QL 에서 바로 실행 가능합니다. 

 

 

 

 

 

 

3. Alter, Insert ... 쿼리 실행 ( PutHive3QL 2.0.0-M2 )

 

 

  • PutHive3QL  내 Properties 에는 직접 쿼리를 입력하는 칸이 존재하지 않으므로,
    쿼리를 FlowFile의 내용 또는 속성으로 제공하는 방법을 사용할 수 있습니다.
  • 저는 GenerateFlow 과 같은 타 프로세서를 통해 실행했습니다.
  • LogAttribute 는 실행 완료 확인용입니다. (선택)

 

(1) GenerateFlowFile

실행할 쿼리 입력

 

(2) PutHive3QL

입력받은 쿼리를 Hive 통해 실행

 

728x90

1.  목적

CSV 데이터를 가져와서, DB type 변형없이 Database 에 적재하기 위함. 

 

2. 프로세스

적재 중 CSV 내의 timestamp 와 같은 컬럼의 데이터를 DB로 적재 시 type 을 못 읽는 오류가 발생하는데, 

(ERROR: column "column2" is of type timestamp without time zone but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.)

위의 해결을 위해선 두 가지 방법이 존재함. 

 

(1) tmp 임시테이블 생성 → column type 모두 varchar   →  csv 데이터 그대로 적재  →  본 테이블에 쿼리로 캐스팅하여 적재 ( insert into test select cast(reg_dtm as timestamp) from test_tm,p ) → 임시테이블 truncate

(2) nifi 상에서 프로세서를 통해 type 을 변경하여 본테이블에 바로 적재

 

이 글에선 (2) 의 방법을 기술함. 

 

nifi ver.2.0

 

 

3. 상세 설명

 

(1) GetFile

 

Input directory 밑의 경로에 있는 File Filter 의 파일을 찾아서 가져오기. 

(t_emal~ 로 시작하는 csv 형식의 파일을 가져온다.)

 

 

 

 

(2) ConvertRecord
    : 다양한 입력 데이터 포맷을 다른 포맷으로 변환하는 기능을 제공

 

 

CSV 데이터를 JSON으로 변환하기. 

 

다음단계에서 DB 에 insert 할 때 type 지정해줄 것이기 때문에

Reader, Writer 모두 default 그대로 사용. 

 

 

 

 

(3) PutDatabaseRecord

 

DB Type, DB Connection pool, schema, table name 등 입력.

 

 

JsonRecordSetWriter 에서 timestamp 형식을 지정해준다. 

 

필자는 timestamp 데이터가 밀리세컨드단위까지 있어서 ss.SSS 까지 지정해줬다.

 

 

 

 


 

 

더 많은 방법이 있겠지만

최대한 간단히 적은 프로세서를 사용해서 만들어봤다.

728x90

Output Grouping (One Line Per Object / Array )

결과 확인

 

 

(1) Array

[] 배열안에 모든 값 넣기!

행 나눴을 때, 총 1개의 Key/Value 값 생성.

 

 

(2) One Line Per Object

한 키값 쌍마다 줄바꿈해서 변환

 

행 나눴을 때, 총 4개의 Key/Value 값 생성. 

728x90
  • 모든 프로세서는, 각 프로세서에 맞는 시작/종료 수행 결과를 입력해야 한다.

 

 

1. MergeContent , LogAttribute

- MergeContent 

MergeContent 

 

 

 

- LogAttribute

+ Recent posts