728x90

스칼라 함수도 이 이름과 같은 함수를 제공하지만,
RDD 함수는 분산 데이터에 적용되며 실제 연산이 지연된다.


ex.

예시 파일 생성

 

예제 파일 분석

예제 파일의 각 줄을 쉼표로 구분하고, 이를 idsStr RDD 로 구성한 것이다.
이 결과는 Array.toString 메서드가 반환한 값을 출력한 것이다.

collect 행동 연산자 사용

( * collect 연산자는 새로운 배열을 생성하고, RDD의 모든 요소를 이 배열에 모아 스파크 셸로 반환한다. )

이는 문자가 아닌 문자열의 '배열' 이다. 이 배열의 배열을 '단일배열' 로 분해하려면,

즉 변환 연산자가 반환한 여러 배열의 모든 요소를 단일 배열로 통합하려는 상황에 flatMap 을 활용하면 좋다.

flatMap은 기본적으로 주어진 함수를 RDD의 모든 요소에 적용한다는 점에서 map 과 동일하다.
다른 점은 익명 함수가 반환한 배열의 중첩구조를 한 단계 제거하고 모든 배열의 요소를 단일 컬렉션으로 병합한다는 것이다.

flatMap 활용

flatMap 은 map 함수와 달리 1차원 배열을 반환했다.



- sample/take/takeSample

위의 연산자를 사용해 RDD의 요소를 일부 가져올 수 있다.


** sampel 메서드

- 첫번째 인자 : true/flase => 복원/비복원 샘플링
- 두번째 인자 : 각 요소가 샘플링 될 횟수의 기댓값.

- 생략된 세번째 인자 : 시드 (seed) . 인수를 제공하지 않으면 해당인자의 기본 값을 사용한다. 같은 시드는 항상 같은 유사 난수를 생성하기 때문에 프로그램을 테스트하는데 유용하다.


** takeSample
: sample 메서드의 두번째 인자가 기댓값(확률) 이 아닌 정확한 개수 (num) 으로 샘플링 할 때 사용.
val n = intIds.takeSample(false,5)




** take

지정된 개수의 요소들 모을 때 까지 RDD 의 파티션을 하나씩 처리해 결과를 반환한다.
(클러스터의 여러 노드에 저장된 데이터의 일부분)

take 메서드의 결과는 단일 머신에 전송되므로 인자에 너무 큰 수를 지정해서는 안된다.

728x90

^ spark_home 이 심볼릭 링크를 가리키도록 설정된 값 확인하기.

/usr/bin 폴더 -> 전체 시스템의 사용자 프로그램이 저장되는 기본 위치.

실제로 설치된 위치 보려면,

ls -al /경로/

심볼릭 링크 : 파일이나 폴더에 대한 일종의 참조. 심볼릭 링크를 사용하면 파일 시스템 내 서로 다른 두 위치에서 같은 파일에 접근 가능.
!= 파일/폴더의 복사본 아님.
심볼릭 링크가 폴더를 참조하면 마치 실제 타깃 폴더에 들어온 것처럼 심볼릭 링크의 폴더 내부를 탐색할 수 있음.
변경한 모든 내용은 타깃 폴더에 바로 적용되며, 심볼릭 링크에도 반영됨.
ex. vi 로 심볼릭 링크파일 편집 시 타깃 파일 편집한 것과 같음. 두 위치 모두에서 편집내용 확인 가능.

심볼릭 링크에서는 편하게 ex. 자바/ 하둡 형식으로 가능.


----

가상머신에 하둡 설치 시,
사전 준비없이 hadoop fs -ls 명령으로 HDFS 에 접속해 파일 시스템에 쿼리를 수행할 수 있는 것은 가상 머신이 부팅하면서
HDFS 데몬 프로세스를 자동으로 시작했기 때문이다.

start-dfs.sh 로 시작 가능.
---

스파크 커뮤니티는 2개월마다 새로운 버전을 릴리스 할 정도로 활발한데,
여러 버전의 스파크를 관리하고 현재 사용할 버전을 선택할 수 있는 방법 중 하나는

심볼릭 링크 활용 )

이를 사용하면 스파크 버전과 관계없이 모든 스파크 프로그램, 스크립트, 환경 설정 파일에서 스파크 설치 폴더를 참조할 때 항상
지정한 폴더를 사용할 수 있다.


(1) spark shell

스파크를 사용하는 방법은 두 가지다.
1 - 스파크 라이브러리.
: 스파크 api를 사용해 스칼라, 자바, 파이썬 독립 프로그램을 작성하는 것.
애플리케이션 코드를 작성하고 스파크에 제출해 실행한 후, 에플리케이션이 파일로 출력한 결과를 분석하는 과정을 거친다.
2 - 스파크의 스칼라 셸 / 파이썬 셸 사용
: 스파크 셸에서 작성한 프로그램은 셸을 종료하면 삭제되므로 주로 테스트/일회성 작업에 사용됨.

PATH에 스파크 bin 디렉토리를 등록해놓으면 spark-shell / pyspark 를 입력해도 스파크 셸을 실행할 수 있다.


(2) log4j

스파크의 오랜 이전 버전에는 콘솔에 INFO 로그가 상세하게 출력되었지만, 최신 버전은 이 문제를 개선했다. 하지만 일부 유용한 로그마저 출력에서 생략해 다소 불편하다.

스파크의 LOG4J 설정을 변경해 스파크 셸에는 오류 로그만 출력하고 나머지 로그들은 추후 문제 진단에 사용할 수 있도록
스파크 루트 폴더의 logs/info.log 파일에 저장해보자.
(log4j : 자바의 로깅 라이브러리.)


스파크 셸을 실행하면

출력한 내용 중

Spark context : 스파크 접속하고 세션 설정, 잡 실행관리, 파일 읽기/쓰기 작업 할 수 있다.
Spark session

스파크와 교신할 수 있는 일종의 창구이다.



간단 예제) 스파크 API 사용하여 파일 읽어들이고, 줄 개수 세어보기


맨 위의 경로( 루트 디렉토리) 에 테스트 용 파일을 만들어두고,
파일의 전체 줄 개수를 세고,
'dd' 가 등장한 줄만 포함하는 ddLines 컬렉션 새로 만들음.


RDD의 개념

위의 licLines, ddLines 는 마치 평범한 스칼라 컬렉션 같지만,
이는 RDD 라는 스파크 전용 분산 컬렉션이다.

RDD 는 데이터를 조작할 수 있는 다양한 변환 연산자를 제공하지만, 변환 연산자는 항상 새로운 RDD 객체를 생성한다.
즉 한 번 생성된 RDD 는 절대 바뀌지 않는 불변의 성질이 있다.

이는 분산 시스템에서 가장 중요한 장애 내성을 직관적인 방법으로 보장할 수 있다.

RDD 연산자는 크게 변환/행동 두 유형으로 나눈다.

변환 연산자 : RDD의 데이터를 조작해 새로운 RDD를 생성한다. (filter,map 함수)
행동 연산자 : 연산자를 호출한 프로그램으로 계산 결과를 반환하거나 RDD 요소에 특정 작업을 수행하려고 실제 계산을 시작하는 역할을 한다. (count, foreach)


1. map 변환 연산자

parallelize 메서드는 Seq를 받아
Seq 객체의 요소로 구성된 새로운 RDD 를 만든다.
(Seq : 스파크의 컬렉션 인터페이스. 이 인터페이스를 구현한 클래스에는 Array 나 List 등이 있다.)

이 Seq 객체의 요소는 여러 스파크 실행자로 분산된다.
parallelize 메서드는 makeRDD 라고 alias 로 호출할 수 있다.


map 함수로 RDD 타입 바꾸기

728x90

spark-in-action 공부를 위해 가상머신을 설치하던 중 , 해당 에러를 발견했다. 

내가 하고있던 작업은 

 

1. 가상 머신을 저장할 폴더를 생성 후, 

2. 책의 깃허브 저장소에서 JSON 형식의 파일을 내려받고,

3. 위의 vagrant box -add ~ 명령어를 사용해 가상머신을 내려받는 것. 

 

하지만 위의 캡쳐처럼 

bsdtar: Error opening archive: Unrecognized archive format 오류가 났다. 

 

처음엔 그저 vagrant 프로그램 설치 오류인 줄 알았는데 .. 

 

알고보니 github 에서 파일을 내려받는 부분이 잘못되었고, 그 때문에 vagrant 명령을 입력해도 해당 파일을 못찾는 오류였다.;;

 

 


github 에서 올바르게 파일 다운받기 

 

 

 

 위의 raw 버튼을 누르면, 

 

창이 뜨면 Ctrl + S 로 다운로드 후 

위치해야 할 경로에 넣어주면 제대로 다운로드가 된다. 

 

이후 위의 vagrant 명령어를 입력하면 문제 해결 완료 !

 

 

 

 

 

나와 같은 오류로 시간낭비 하는 사람이 없길 바라며 ,, ;;

 

728x90

1.  컴포넌트

종류 ) 스파크 코어, 스파크 SQL, 스파크 스트리밍, 스파크 GraphX, 스파크 MLlib

 

이 글에선 스파크 MLlib, GraphX 를 제외하고 나머지 컴포넌트에 대해 다룬다.


A. 스파크 코어 

스파크 job, 다른 스파크 컴포넌트에 필요한 기본 기능 제공. 

- 여기서 가장 중요한 개념은 RDD 인데, RDD : 분산 데이터 컬렉션 (Dataset) 을 추상화한 객체로,

데이터셋에 적용할 수 있는 연산 및 변환 메서드를 함께 제공한다.

 

RDD는 노드에 장애가 발생해도 데이터셋을 재구성 할 수있는 복원성을 갖추고있다. 

 

- 스파크 코어는 HDFS , GluterFD, 아마존 S3 등 다양한 파일 시스템에 연결할 수 있다.

  또, 공유변수/누적변수를 사용해 컴퓨팅 노드 간 정보를 공유할 수있다.

 

- 스파크 코어에는 네트워킹, 보안, 스케줄링 및 데이터 셔플링 등 기본 기능이 구현되어있다.

 

 

 

B.  스파크 SQL

스파크와 하이브SQL 이 지원하는 SQL을 사용해 대규모 분산 정형 데이터를 다룰 수 있는 기능을 제공한다. 

- JSON, Parquet, RDB테이블, Hive table 등 다양한 정형 데이터를 읽고 쓰는데도 사용할 수 있다.

(*Parquet : 데이터, 스키마를 함께 저장할 수 있는 파일 포맷. 최근 널리 사용된다고 한다. )

 

- 스파크 SQL 은 DataFrame, Dataset에 적용된 연산을 일정 시점에 RDD 연산으로 변환해 일반 스파크 잡으로 실행한다. 

 

- '카탈리스크' 라는 쿼리 최적화 프레임워크를 제공하며,

       사용자가 직접 정의한 최적화 규칙을 적용해 프레임워크 확장도 가능하다. 

 

- 외부시스템은 '아파치 Thrift 서버'를 통해 JDBC, ODBC 프로토콜을 이용하여 쿼리를 실행할 수 있다. 

 

 

 

C. 스파크 스트리밍 

다양한 데이터 소스에서 유입되는 실시간 스트리밍 데이터를 처리하는 프레임워크이다.

지원 스트리밍 소스

- HDFS, 아파치 카프카, 아파치 플립, 트위터, ZeroMQ, 커스텀 데이터 소스.

 

- 스트리밍 데이터를 처리할 때는 장애 복원성이 매우 중요한데, 

    스파크 스트리밍은 장애 발생 시 연산 결과를 자동으로 복구한다. 

 

- 이산 스트림 방식으로 데이터를 표현. 

   => 기징 마지막 타임 윈도 안에 유입된 데이터를 RDD로 구성해 주기적으로 생성한다.

 

- ver 2.0 에서 정형 스트리밍 API 를 사용해 일괄 처리 프로그램을 구현하는 것처럼 스트리밍 프로그램을 구현할 수 있다.

 


 

2. 스파크 프로그램의 실행 과정

 

만약 한 파일 (ex. 로그파일) 이 여러 노드로 구성된 HDFS 에 분산저장 되어있다면, 

(HDFS 는 파일의 크기를 자동으로 나눠 각 블록을 클러스터의 여러 노드에 나누어 저장한다.) 

 

 

가장 먼저 스파프 셸을 시작하고 스파크 클러스터에 연결한 후,

scala 명령을 입력해 HDFS 에 저장된 로그 파일을 메모리에 로드한다. 

 

 

이 때 스파크는 데이터 지역성을 만족하기 위해 로그 파일의 각 블록이 저장된 위치를 하둡에게 요청하고, 

모든 블록을 클러스터 노드의 RAM 메모리로 전송한다. 

(* 대량의 데이터를 네트워크로 전송해야 하는 상황을 만들지 않기 위해 데이터 지역성이 필요하다.)

 

 

데이터 전송이 완료되면 스파크 shell 에서 RAM 에 저장된 각 블록 (파티션) 을 참조할 수 있다. 

(

이 파티션의 집합 => RDD 가 참조하는 분산 컬렉션. 

즉 , RDD 를 사용하면 비-분산 로컬 컬렉션을 처리하는 것과 같은 방식으로 대규모 분산 컬렉션을 다룰 수 있다. 

사용자는 컬렉션이 여러 클러스터 노드에 분산 저장된다는 사실을 굳이 알 필요가 없고, 노드 장애에 따로 대비할 필요도 없다. 

 

 

스파크는 RDD의 컬렉션에 함수형 프로그래밍을 사용할 수 있는 정교한 API 를 제공한다.

이를 통해 (1) RDD의 컬렉션 필터링하거나 사용자정의함수로 컬렉션 매핑하고,

               (2) 누적 값 하나로 리듀스하고,

               (3) 두 RDD 를 서로 빼거나 교차하거나 결합하는 등

다양한 작업 실행할 수 있다. 

)

 

 

컬렉션을 변수를 주어 필터링하면 RDD 에는 분석에 필요한 데이터만 포함된다. 

cache 함수를 호출 해 추후 다른 잡을 수행할 때도 RDD 가 메모리에 계속 유지되도록 지정할 수있다. 

728x90

맵리듀스 : 하둡 에코시스템의 기본적인 프로그램의 패러다임. 스팍의 동작방식 동일하다. 

word count ex) n개의 단어가 block 단위로 나뉨. 

 

 

Spark : 기존의 맵리듀스 프로그램보다 더 빠르고 다양하게 사용할 수 있게 만든 것. 

컴퓨터 클러스터 벙렬 데이터 프로세싱을 하는 모듈 라이브러리 집합/ 통합된 컴퓨팅 엔진.



spark 은 기본적으로 scala 언어로 개발이 되어있고 function 언어의 장점을 가짐. 코드량도 간결하고 

데이터 프레임이 더 추상화 되어있어 개발의 생산성 측면에서 good!

 

데이터 분석도 가능하고, 머신러닝, 그래프 분석, 실시간 스트리밍 데이터 처리 모두 가능함. 

 

동작 방식 : 멀티노드의 구성.


 

할당받은 executors 에서 실제로 본인이 작성한 코드로 돌아가는 구조. 

 

 


 

 

 

 


spark : when not to use

spark 자체가 컴퓨팅을 하는 프레임워크이기 때문에 기존에 있던 DB solution을 대체 하는것도 아니다.

다른 기술과 결합해서 사용했을 때 좀 더 프로세싱 할 수 있는 능력이 좋아지는 것이다. 

데이터를 읽는 구조 자체가 shared storage 에서 읽어들이는 것이기 때문에 DB 만큼의 성능은 안나온다.

 

메모리 이슈는 계속 발생 중이다! 

 

 


 

 

 

+ Recent posts