pypapago 개발기

TL;DR

아래 내용을 통해 개발한 pypapago 는 현재 pypi에 올라가 있어 아래 명령어로 설치해 바로 사용할 수 있습니다.

1
pip install -U pypapago

2019.07.09일자 기준 최신버전은 0.1.1.1 입니다.

Github Repo: https://github.com/Beomi/pypapago

뉴스 댓글 분석 프로젝트[0]: 프로젝트를 시작하며

온라인 뉴스 댓글은 정말 사람들의 목소리일까?

“여기 오신 모든 분들 손을 들어주세요.” (모두 손을 든다)

“나는 인터넷 뉴스를 보지 않는다, 하시는 분 손 내려주세요.” (조금 손을 내린다)

“나는 인터넷 뉴스를 보고 댓글은 보지 않는다, 하시는 분 손 내려주세요.” (좀 더 손을 내린다)

“아직도 반이 넘는 많은 분들이 손을 들어주시고 계시네요. 이제 손 내려주셔도 됩니다. 감사합니다.”

– 2018 PyConKR 발표를 시작하며

GPU EC2 스팟 인스턴스에 Cuda/cuDNN와 Tensorflow/PyTorch/Jupyter Notebook 세팅하기

들어가며

Tensorflow나 PyTorch등을 사용하며 딥러닝 모델을 만들고 학습을 시킬 때 GPU를 사용하면 CPU만 사용하는 것에 비해 몇배~몇십배에 달하는 속도향상을 얻을 수 있다는 것은 누구나 알고 있습니다.

그래서 비싼 GPU를 사용하고 낯선 리눅스 환경을 이용하기도 합니다. 하지만 실제로 GPU, 특히 Cuda를 이용한 GPU가속을 세팅하고 cuDNN등을 통해 각 머신러닝 라이브러리에서 속도를 향상시키려고 할 때는 항상 무언가 문제가 발생합니다. 물론 Floydhub혹은 AWS SageMaker와 같이 이미 GPU 가속 환경이 마련되어있는 경우는 필요가 없지만, GPU 인스턴스의 시간당 요금 자체가 상당히 높습니다.

k80 GPU를 제공하는 경우 시간당 약 1~2달러의 비용이 발생합니다.

조금이라도 저렴하게 GPU를 사용하고, 한번 설정된 GPU 인스턴스를 그대로 유지하기 위해 스팟 인스턴스를 사용해 봅시다.

Django: Truncated or oversized response headers received from daemon process 에러 해결법

문제 발생 환경

  • OS: Ubuntu 16.04 LTS
  • Python 3.5.2
  • Django 2.0.2
  • Apache HTTPd 2.4
  • numpy / Pandas / pymssql 등 사용중

문제의 발생

장고 배포를 마친 뒤 배포 서버에 접속시 화면이 뜨지 않고 500에러가 났던 상황.

1
Timeout when reading response headers from daemon process 'djangoproject': /home/ubuntu/djangoproject/djangoproject/wsgi.py

에러 로그로 살펴보면 위와 같이 “Timeout when reading response headers from daemon process”이라는 문제가 발생했다.

PySpark: 손상된 parquet파일 무시하기

문제

PySpark를 이용해 파일을 읽어와 DataFrame 객체로 만드는 경우 읽어오는 파일이 parquet 파일이라면 이 파일이 어떤 형식으로 되어있는지(어떤 Column/Type으로 이루어져있는지)에 대한 정보를 필요로 합니다.

보통 parquet파일에 이 파일에 대한 스키마가 저장되어있어 파일을 읽고 쓰는데 지장이 없습니다. 하지만 간혹 parquet파일이 깨져있는 경우가 있습니다.

user mode로 설치한 pip 패키지 PATH에 등록하기

이번 글은 macOS 기준입니다.

pip 유저모드?

파이썬 패키지 매니저인 pip를 사용할 때 종종 이용하는 옵션이 --user, 즉 사용자 디렉토리에 패키지 패키지를 설치하는 방법을 통해 sudo처럼 권한 상승 없이 패키지들을 설치해 사용할 수 있습니다.

이때 차이가 나는 부분은 저 패키지들이 어떤 디렉토리(폴더)에 설치되는지입니다.

Direct S3 Upload with Lambda

들어가며

이전 글인 AWS Lambda에 Tensorflow/Keras 배포하기에서 Lambda 함수가 실행되는 트리거는 s3버킷에 파일이 생성되는 것이었습니다.

물론 파일을 올릴 수 있는 방법은 여러가지가 있습니다. 아주 단순하게 POST 폼 전송 요청을 받고 boto3등을 이용해 서버에서 s3으로 파일을 전송할 수도 있고, 더 단순하게는 AWS s3 콘솔을 이용해 파일을 올리라고 할 수도 있습니다.

하지만 이런 부분에는 약간의 단점이 함께 있습니다.

첫 번째 방법처럼 파일을 수신해 다시 s3에 올린다면 그 과정에서 서버 한대가 상시로 켜져있어야 하고 전송되는 속도 역시 서버에 의해 제약을 받습니다. 한편 두 번째 방법은 가장 단순하지만 제3자에게서 파일을 받기 위해서 AWS 계정(비록 제한된 권한이 있다 하더라도)을 제공한다는 것 자체가 문제가 됩니다.

따라서 이번 글에서는 사용자의 브라우저에서 바로 s3으로 POST 요청을 전송할 수 있도록 만드는 과정을 다룹니다.

시나리오

사용자는 아주 일반적인 Form 하나를 보게 됩니다. 여기에서 드래그-드롭 혹은 파일 선택을 이용해 일반적으로 파일을 올리게 됩니다. 물론 이 때 올라가는 주소는 AWS S3의 주소가 됩니다.

하지만 이게 바로 이뤄진다면 문제가 생길 소지가 많습니다. 아무나 s3 버킷에 파일을 올린다면 악의적인 파일이 올라올 수도 있고, 기존의 파일을 덮어쓰기하게 될 수도 있기 때문입니다.

따라서 중간에 s3에 post 요청을 할 수 있도록 인증(signing)해주는 서버가 필요합니다. 다만 이 때 서버는 요청별로 응답을 해주면 되기 때문에 AWS Lambda를 이용해 제공할 수 있습니다.

따라서 다음과 같은 형태로 진행이 됩니다.

전체 처리 과정 모식도

S3에 POST 요청을 하기 전 Signing 서버에 업로드하는 파일 정보와 위치등을 보낸 뒤, Lambda에서 해당 POST 요청에 대한 인증 정보가 들어간 header를 반환하면 그 헤더 정보를 담아 실제 S3에 POST 요청을 하는 방식입니다.

만약 Signing하는 과정 없이 업로드가 이뤄진다면 s3 버킷을 누구나 쓸 수 있는 Public Bucket으로 만들어야 하는 위험성이 있습니다. 하지만 이와 같이 제한적 권한을 가진 iam 계정을 생성하고 Signing하는 과정을 거친다면 조금 더 안전하게 사용할 수 있습니다.

Note: 이번 글에서는 API Gateway + Lambda 조합으로 Signing서버를 구성하기 때문에 만약 추가적인 인증 과정을 붙인다면 API Gateway단에서 이뤄지는 것이 좋습니다.

버킷 만들기 & 권한 설정

새로운 버킷은 https://s3.console.aws.amazon.com/s3/home에서 추가할 수 있습니다.

새로운 버킷을 하나 만들어주세요. 이번 글에서는 s3-signature-dev-py3라는 이름으로 만들어 진행해 보았습니다.

버킷에 GET/POST 요청을 하기 위해 CORS 설정을 해줘야 합니다.(localhost:8000와 같은 제 3의 URL에서 s3 버킷의 주소로 POST 요청을 날리기 위해서는 CORS 설정이 필수입니다.)

아래 스크린샷과 같이 CORS 설정을 진행해 주세요.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<CORSConfiguration>
<CORSRule>
<AllowedOrigin>*</AllowedOrigin>
<AllowedMethod>GET</AllowedMethod>
<MaxAgeSeconds>3000</MaxAgeSeconds>
<AllowedHeader>Authorization</AllowedHeader>
</CORSRule>
<CORSRule>
<AllowedOrigin>*</AllowedOrigin>
<AllowedMethod>POST</AllowedMethod>
<MaxAgeSeconds>3000</MaxAgeSeconds>
<AllowedHeader>Authorization</AllowedHeader>
</CORSRule>
</CORSConfiguration>

이처럼 구성해주면 모든 도메인(*)에서 요청한 GET/POST 요청을 정상적인 크로스-도메인 요청으로 받아들입니다.

Note: 만약 여러분이 여러분의 프론트 서비스에서만 이 요청을 허용하려면 AllowedOrigin부분을 여러분이 사용하는 프론트 서비스의 도메인으로 변경해주세요.

이제 s3을 사용할 준비는 마쳤습니다.

버킷 액세스 iam 계정 만들기

새로운 iam 계정은 https://console.aws.amazon.com/iam/home?region=ap-northeast-2#/users$new?step=details에서 만들 수 있습니다.

다음으로는 앞서 만들어준 버킷에 액세스를 할 수 있는 iam 계정을 만들어야 합니다. 이번에 사용할 유저 이름도 s3-signature-dev-py3로 만들어 줍시다. 아래 스크린샷처럼 Programmatic access를 위한 사용자를 만들어 줍시다.

우리는 버킷내 uploads폴더에 파일을 ‘업로드만 가능’한, PutObjectPutObjectAcl이라는 아주 제한적인 권한을 가진 계정을 만들어 줄 것이기 때문에 다음과 같이 Create Policy를 눌러 json 기반으로 계정 정책을 새로 생성해 줍시다.

새 창이 뜨면 아래와 같이 arn:aws:s3:::s3-signature-dev-py3/uploads/* 리소스에 대해 PutObjectPutObjectAcl에 대해 Allow를 해 주는 json을 입력하고 저장해줍시다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3UploadsGrant",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl"
],
"Resource": [
"arn:aws:s3:::s3-signature-dev-py3/uploads/*"
]
}
]
}

이제 policy의 name을 입력하고 저장해줍시다.

저장해주고 창을 끈 뒤 이전 페이지로 돌아와 Refresh를 누르면 다음과 같이 앞서 만들어준 Policy가 나오는 것을 볼 수 있습니다. 체크박스에 체크를 누른 뒤 다음을 눌러주세요.

이제 마지막 확인을 눌러주세요.

확인을 누르면 다음과 같이 Access key ID와 Secret access key가 나옵니다. 이 키는 지금만 볼 수 있으니 csv로 받아두거나 따로 기록해 두세요. 그리고 글 아래부분에서 이 키를 사용하게 됩니다.

Signing Lambda 함수 만들기

이제 POST 요청을 받아 인증을 해줄 AWS Lambda함수를 만들어 줍시다.

아래 코드를 받아 AWS Lambda 새 함수를 만들어주세요. (역시 s3-signature-dev-py3라는 이름으로 만들었습니다.)

Github Gist: index.py

이번 함수는 python3의 내장함수만을 이용하기 때문에 따로 zip으로 만들 필요없이 AWS 콘솔 상에서 인라인 코드 편집으로 함수를 생성하는 것이 가능합니다.

아래 스크린샷처럼 lambda_function.py 파일을 위의 gist 코드로 덮어씌워주세요. 그리고 Handler부분을 lambda_function.index로 바꿔 index함수를 실행하도록 만들어 주세요. 그리고 저장을 눌러야 입력한 코드가 저장됩니다.

코드를 조금 뜯어보면 아래와 같이 ACCESS_KEYSECRET_KEY를 저장하는 부분이 있습니다.

1
2
ACCESS_KEY = os.environ.get('ACCESS_KEY')
SECRET_KEY = os.environ.get('SECRET_KEY')

AWS Lambda에서 함수를 실행할 때 아래 환경변수를 가져와 s3 버킷에 액세스하기 때문에 위 두개 값을 아래 스크린샷처럼 채워줍시다. 입력을 마치고 저장을 눌러주면 환경변수가 저장됩니다.

Note: 각 키의 값은 앞서 iam 계정 생성시 만든 값을 넣어주세요!

API Gateway 연결하기

API Gateway는 https://ap-northeast-2.console.aws.amazon.com/apigateway/home?region=ap-northeast-2#/apis/create에서 만들 수 있습니다.

이렇게 만들어 준 AWS Lambda 함수는 각각은 아직 외부에서 결과값을 받아올 수 있는 형태가 아닙니다. 람다 함수를 트리거해주고 결과값을 받아오기 위해서는 AWS API Gateway를 통해 웹 URL로 오는 요청에 따라서 람다 함수가 실행되도록 구성해야 합니다. 또한, CORS역시 활성화 해줘야 합니다.

API Gateway 만들고 Lambda와 연결하기

Resources에서는 Api URL의 하위 URL와 root URL에 대해 각각 메소드들을 정의해 사용할 수 있습니다. 우리는 요청을 받을 때 POST 방식으로 요청을 받아 처리해줄 것이랍니다.

여기서 새 메소드 중 POST를 선택해 줍시다.

메소드에 Lambda 함수를 연결해 주기 위해 다음과 같이 Lambda Function을 선택하고 Proxy는 체크 해제한 뒤, Regionap-northeast-2(서울)리전을 선택하고, 아까 만들어준 함수 이름을 입력한 뒤 Save를 눌러줍시다.

Tip: Lambda Proxy를 활성화 시킬 경우 HTTP 요청이 그대로 들어오는 대신, AWS에서 제공하는 event 객체가 대신 Lambda함수로 넘어가게 됩니다. 우리는 HTTP 요청을 받아 Signing해주는 과정에서 Header와 Body를 유지해야하기 때문에 Proxy를 사용하지 않습니다.

Save를 누르면 다음과 같이 API Gateway에 Lambda함수를 실행할 권한을 연결할지 묻는 창이 뜹니다. 가볍게 OK를 눌러줍시다.

연결이 완료되면 API Gateway가 아래 사진처럼 Lambda 함수와 연결 된 것을 볼 수 있습니다.

CORS 활성화하기

조금만 더 설정을 해주면 API Gateway를 배포할 수 있게 됩니다. 지금 해줘야 하는 작업이 바로 CORS 설정인데요, 우리가 나중에 만들 프론트 페이지의 URL와 s3의 URL이 다르기 때문에 브라우저에서는 보안의 이유로 origin이 다른 리소스들에 대해 접근을 제한합니다. 따라서 CORS를 활성화 해 타 URL(프론트 URL)에서도 요청을 할 수 있도록 설정해줘야 합니다.

Actions에서 Enable CORS를 눌러주세요.

다음과 같이 Access-Control-Allow-Headers의 값을 '*'로 설정한 뒤 Enable CORS 버튼을 눌러 저장해주세요.

다시한번 Confirm을 눌러주시면…

CORS가 활성화되고 Options 메소드가 새로 생기게 됩니다.

이제 API Gateway를 ‘배포’해야 실제로 사용할 수 있습니다.

API Gateway 배포하기

API Gateway의 설정을 모두 마치고나서는 배포를 진행해야 합니다. 아래와 같이 Actions에서 Deploy API를 눌러주세요.

API Gateway는 Deployment Stage를 필요로 합니다. Stage namelive로 설정하고 Deploy를 눌러줍시다.

Tip: Deployment Stage는 API Gateway의 URL 뒤 /stagename의 형식으로 추가 URL을 지정해줍니다. 이를 통해 API를 개발 버전과 실 서비스 버전을 분리해 제공할 수 있습니다.

배포가 완료되면 아래와 같이 API Gateway를 사용할 수 있는 URL을 받을 수 있습니다.

이번에는 https://9n2qae2nak.execute-api.ap-northeast-2.amazonaws.com/live가 Signing Lambda 함수를 실행할 수 있는 API Gateway URL이 됩니다.

파일 업로드 프론트 만들기

이제 파일을 업로드할 form이 있는 Static 웹 사이트를 만들어봅시다.

이번 글에서는 이미 만들어진 파일 업로더인 VanillaJS용 Fine Uploader를 이용해 최소한의 업로드만 구현합니다.

React용 React Fine Uploader와 Vue용 Vue Fine Uploader도 있습니다.

https://github.com/Beomi/s3-direct-uploader-demo 깃헙 레포를 clone받아 app.js를 열어 아래 목록을 수정해주세요.

  • request/endpoint: 여러분이 사용할 s3 버킷 이름 + .s3.amazonaws.com
  • request/accessKey: 앞서 만든 iam 계정의 Access Key
  • objectProperties/region: s3 버킷의 리전
  • objectProperties/key/prefixPath: s3 버킷 내 올릴 폴더 이름(putObject 권한을 부여한 폴더)
  • signature/endpoint: 앞서 만든 AWS Lambda의 API Gateway URL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
var uploader = new qq.s3.FineUploader({
debug: false, // defaults to false
element: document.getElementById('fine-uploader'),
request: {
// S3 Bucket URL
endpoint: 'https://s3-signature-dev-py3.s3.amazonaws.com',
// iam ACCESS KEY
accessKey: 'AKIAIHUAMKBO27EZQ6RA'
},
objectProperties: {
region: 'ap-northeast-2',
key(fileId) {
var prefixPath = 'uploads'
var filename = this.getName(fileId)
return prefixPath + '/' + filename
}
},
signature: {
// version
version: 4,
// AWS API Gate URL
endpoint: 'https://9n2qae2nak.execute-api.ap-northeast-2.amazonaws.com/live'
},
retry: {
enableAuto: true // defaults to false
}
});

그리고나서 index.html 파일을 열어보시면 아래 사진과 같은 업로더가 나오게 됩니다.

DEMO: https://beomi.github.io/s3-direct-uploader-demo/

맺으며

이제 여러분은 Serverless하게 파일을 s3에 업로드 할 수 있게 됩니다. 권한 관리와 같은 부분은 API Gateway에 접근 가능한 부분에 제약을 걸어 업로드에 제한을 걸어 줄 수도 있습니다.

ec2등을 사용하지 않고도 간단한 signing만 갖춰 s3에 파일을 안전하게 업로드 하는 방식으로 전체 프로세스를 조금씩 Serverless한 구조로 바꾸는 예시였습니다.

Reference

AWS Lambda에 Tensorflow/Keras 배포하기

Update @ 20190306: amazonlinux:latest 버전이 2버전이 latest로 변경됨에 따라 아래 코드를 amazonlinux:1로 변경

이번 글은 macOS을 기반으로 작성되었지만, docker 명령어를 사용할 수 있는 모든 플랫폼(윈도우/맥/리눅스)에서 따라올 수 있습니다.

들어가며

여러분이 이미지를 받아 텐서플로로 분류를 해 준 뒤 결과를 반환해주는 작업을 하는 모델을 만들었다고 가정해봅시다. 이때 가장 빠르고 간단하게 결과를 내는(Inference/추론을 하는)방법은 Cuda가속을 할 수 있는 고급 시스템 위에서 텐서플로 모델을 이용해 결과를 반환하도록 서비스를 구현할 수 있습니다. 혹은 조금 느리더라도 CPU를 사용하는 EC2와 같은 VM위에서 텐서플로 코드를 동작하게 만들 수도 있습니다.

하지만 만약 여러분이 처리해야하는 이미지가 몇장이 아니라 수십, 수백장을 넘어 수천장을 처리해야한다면 어떻게 될까요?

한 EC2 위에서 서비스를 제공하는 상황에서는 이런 경우라면 for문처럼 이미지 하나하나를 돌며 추론을 한다면 전체 이미지의 결과를 내려면 한참 시간이 걸리게 됩니다. 따라서 일종의 병렬 처리를 생각해 보아야 합니다.

즉, 이미지를 순서대로 하나씩 추론하는 대신, 추론하는 코어 함수만을 빼고 결과를 반환하도록 만들어 주면 됩니다.

물론 병렬 처리를 위해서 여러가지 방법들이 있습니다. 여러개의 GPU를 사용하고 있다면 각 GPU별로 작업을 진행하도록 할 수도 있고, 혹은 EC2를 여러개 띄워서 작업을 분산해 진행할 수도 있습니다. 하지만 이 방법보다 조금 더(혹은 상당히 많이) 빠르게 결과를 얻어낼 수 있는 방법이 있습니다.

바로 AWS Lambda를 이용하는 방법입니다.

AWS Lambda는 현재 각 실행별 최대 3GB메모리와 5분의 실행시간 내에서 원하는 코드를 실행해 한 리전에서 동시에 최대 1000개까지 병렬로 실행할 수 있습니다.

즉, 우리가 1만개의 이미지를 처리해야 한다면 한 리전에서만 1000개를 동시에 진행해 10개를 처리하는 시간 내 모든 작업을 마칠 수 있다는 것이죠. 그리고 작업이 끝나고 서버가 자동으로 끝나기 때문에 동작하지 않는 시간에도 돈을 내는 EC2와는 가격차이가 많이 나게됩니다.

이번 가이드는 다음과 같은 시나리오로 작성했습니다.

  • 시나리오
    1. s3의 어떤 버킷의 특정한 폴더에 ‘이미지 파일’을 올린다.
    2. 이미지 파일이 ‘생성’될 때 AWS Lambda 함수가 실행이 트리거된다.
    3. (Lambda) 모델을 s3에서 다운받아 Tensorflow로 읽는다.
    4. (Lambda) 함수가 트리거 될때 발생한 event 객체를 받아 s3에 업로드된 파일의 정보(버킷, 버킷내 파일의 경로)를 가져온다.
    5. (Lambda) s3에 업로드된 이미지 파일을 boto3을 통해 가져와 Tensorflow로 Inference를 진행한다.
    6. (Lambda) Inference가 끝난 결과물을 s3에 저장하거나 결과값을 DynamoDB에 저장하거나 혹은 API Gateway를 통해 json으로 반환한다.

위와같이 진행할 경우 s3에 파일 업로드를 1개를 하든, 1000개를 하든 업로드 자체에 필요한 시간을 제외하면 실행 시간 자체는 동일하게 유지할 수 있습니다.

(마치 시간복잡도가 O(1)인 척 할 수 있는 것이죠!)

환경 준비하기

오늘 사용한 예제는 Github tf-keras-on-lambda Repo에서 확인할 수 있습니다.

AWS Lambda는 아마존에서 RedHat계열의 OS를 새로 만든 Amazon Linux위에서 동작합니다. 그렇기 때문에 만약 우리가 C의존적인 라이브러리를 사용해야한다면 우선 Amazon Linux에 맞게 pip로 설치를 해줘야 합니다. 그리고 간혹 빌드가 필요한 패키지의 경우 사용하고자 하는 OS에 맞춰 빌드작업 역시 진행해야 합니다. Tensorflow 역시 C의존적인 패키지이기 때문에 OS에 맞는 버전을 받아줘야 합니다. 우리가 사용하는 OS는 macOS혹은 windows이기 때문에 docker를 통해 Amazon Linux를 받아 그 안에서 빌드를 진행합니다.

도커를 사용하고있다면 그대로 진행해주시면 되고, 도커를 설치하지 않으셨다면 우선 도커를 먼저 설치해주세요.

도커는 Docker Community Edition Download Page에서 받으실 수 있습니다.

도커를 받아주세요

여러분이 다음부분을 진행하기 전, docker라는 명령어를 터미널 혹은 cmd상에서 입력시 도커가 실행되어야 합니다. 도커가 실행된다면, 우선은 실행할 준비를 마친 것이랍니다.

물론 여러분 각자의 모델파일이 있어야 합니다. 이번 가이드에서는 Keras 예시 중 Pre-trained squeezenet을 이용한 Image Classification(imagenet)으로 진행합니다.

결과 저장용 DynamoDB 만들기(Optional)

우리가 predict를 진행하고 나서 나온 결과물을 어딘가에 저장해둬야 합니다. AWS에는 DynamoDB라는 간단한 NoSQL DB가 있으니, 이걸 이용해 결과물을 저장해 봅시다.

우선 DynamoDB 메뉴에서 아래와 같이 새 테이블 하나를 만들어 줍시다.

기본키 정도만 문자열 필드 filename을 만들고 테이블을 생성해 줍시다.

이제 기본키만 지키면 나머지 필드는 자유롭게 올릴 수 있습니다. (물론 기본키와 정렬키만 인덱스가 걸리기 때문에, 빠른 속도가 필요하다면 인덱스를 건 뒤 데이터를 추가해줘야 합니다.)

squeezenet ImageNet 모델 s3에 올리기

이번 글에서는 squeezenet Imagenet 모델을 이용해 predict를 진행합니다. squeezenet_weights_tf_dim_ordering_tf_kernels.h5파일이 필요한데, AWS Lambda에서 비용이 들지 않으며 빠른 속도로 모델을 받아오기 위해서는 같은 리전의 s3에 파일을 올려둬야 합니다.

keras-blog 버킷에 올린 모델 파일

이번 글에서는 keras-blog라는 s3 버킷의 squeezenet 폴더에 파일을 올려두었습니다.

각 파일은 아래 링크에서 받을 수 있습니다. 아래 두 파일을 받아 s3 버킷에 올려주세요.

squeezenet.py 만들기

Keras의 squeezenet은 squeezenet.py을 참조합니다. 하지만 이 파일에는 Pre-Trained Model의 경로를 바꿔주기 때문에 이 부분을 약간 수정한 커스텀 squeezenet.py를 만들어주었습니다.

이 파일을 다운받아 여러분의 index.py 옆에 놓아두세요.

도커 + Amazon Linux로 빌드 준비하기

이제 docker라는 명령어로 도커를 사용해봅시다.

우선 여러분이 작업할 폴더 하나를 만들어 주세요. 저는 지금 tf_on_lambda라는 폴더에서 진행하고 있습니다. 이 폴더 안에는 Lambda에서 실행할 python파일이 들어가게 되고, 도커와 이 폴더를 이어줄 것이기 때문에 새 폴더 하나를 만들어서 진행하시는 것을 추천합니다.

폴더를 만들고 들어가셨다면 다음 명령어를 입력해 AmazonLinux 이미지를 받아 도커로 띄워주세요.

1
docker run -v $(pwd):/outputs --name lambdapack -d amazonlinux:1 tail -f /dev/null

도커 컨테이너의 이름을 lambdapack으로 지정하고 현재 폴더($(pwd))를 도커의 /outputs폴더로 연결해줍니다.

도커가 실행된 모습

성공적으로 받아졌다면 다음과 같이 임의의 난수 id가 생깁니다. 그리고 docker ps라는 명령어로 현재 실행중인 컨테이너들을 확인해보면 다음과 같이 lambdapack라는 이름을 가진 컨테이너가 생성된 것을 볼 수 있습니다.

도커 ps

람다가 실행할 python 파일 작성하기

그러면 이제 람다가 실제로 실행할 python 파일을 만들어줍시다. 이번 가이드에서는 이 파일의 이름을 index.py라고 지어보았습니다.

index.py 파일은 사실 어떤 이름으로 해도 상관없습니다만, AWS에 람다 함수를 만들 때 handler함수 위치 지정을 파일이름.함수이름, 즉 index.handler와 같이 적어줄 것이기 때문에 대표적인 이름을 가진 파일로 만들어 주시면 됩니다.

index.py안에는 다음과 같은 내용으로 작성해 봅시다. 중요한 부분은 handler함수입니다.

전체 코드를 바로 이용하시려면 index.py on GIST을 이용하세요.

제일 먼저 해줘야 하는 부분은 우리가 사용할 라이브러리를 import하는 것이죠.

1
2
3
4
5
6
7
8
9
10
11
12
import boto3 # AWS S3 접근용
from tensorflow.python import keras # Keras!
from tensorflow.python.keras.preprocessing import image
from tensorflow.python.keras.applications.resnet50 import preprocess_input, decode_predictions
import numpy as np
import io # File 객체를 메모리상에서만 이용하도록
import os # os.path / os.environ
from PIL import Image # Image 객체
import urllib.request # 파일받기

# (.h5경로변경추가, 레포의 squeezenet.py를 확인하세요.)
from squeezenet import Squeezenet # 커스텀한 squeezenet

이 중 boto3 라이브러리는 AWS Lambda의 python3내에 이미 설치되어있기 때문에 특정 버전boto3을 이용하시려는게 아니라면 도커 컨테이너에 설치하지 않아도 됩니다. (즉, Lambda에 올릴 패키지 zip파일에 boto3이 들어있지 않아도 괜찮습니다.)

import를 끝냈으니 코드를 작성해 봅시다. 우선 S3에서 파일을 다운로드/업로드하는 함수를 만들어줍시다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ACCESS_KEY = os.environ.get('ACCESS_KEY')
SECRET_KEY = os.environ.get('SECRET_KEY')

def downloadFromS3(strBucket, s3_path, local_path):
s3_client = boto3.client(
's3',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
)
s3_client.download_file(strBucket, s3_path, local_path)

def uploadToS3(bucket, s3_path, local_path):
s3_client = boto3.client(
's3',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
)
s3_client.upload_file(local_path, bucket, s3_path)

AWS 콘솔에서 람다 함수별로 환경변수를 설정해줄 수 있기 때문에, 위과 같이 os.environ을 통해 설정한 환경변수 값을 가져옵시다.

물론 여기에 설정한 Access Key와 Secret Key의 iam 유저는 당연히 해당 S3 버킷에 R/W권한이 있어야 합니다.

이 두가지 함수를 통해 S3에서 모델을 가져오고, 모델로 추론한 결과물을 S3에 넣어줄 수 있습니다.

Note: s3 버킷과 람다 함수는 같은 Region에 있어야 데이터 전송 비용이 발생하지 않습니다.

이제 가장 중요한 부분인 handler함수를 살펴봅시다.

handler함수는 기본적으로 eventcontext를 인자로 전달받습니다. 이때 우리가 사용하는 인자는 event인자입니다.

우리의 사용 시나리오 중 ‘S3에 이미지 파일을 올린다’, 이 부분이 바로 event의 내용이 됩니다.

S3 Event의 내용

AWS에서 Lambda 실행이 트리거 될 때 전달되는 event는 파이썬의 딕셔너리 형태로 전달됩니다.

만약 여러분이 s3파일이 추가되는 이벤트를 Lambda에 연결해두셨다면 파일이 업로드 될 때 마다 아래와 같은 딕셔너리가 전달됩니다.

아래 예시는 csv_icon.pngkeras-blog라는 버킷내 wowwow 폴더에 올렸을때 발생한 event 객체입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# event 객체
{
'Records': [
{
'eventVersion': '2.0',
'eventSource': 'aws:s3',
'awsRegion': 'ap-northeast-2', # 버킷 리전
'eventTime': '2017-12-13T03:28:13.528Z', # 업로드 완료 시각
'eventName': 'ObjectCreated:Put',
'userIdentity': {'principalId': 'AFK2RA1O3ML1F'},
'requestParameters': {'sourceIPAddress': '123.24.137.5'},
'responseElements': {
'x-amz-request-id': '1214K424C14C384D',
'x-amz-id-2': 'BOTBfAoB/gKBbn412ITN4t2psTW499iMRKZDK/CQTsjrkeSSzSdsDUMGabcdnvHeYNtbTDHoHKs='
},
's3': {
's3SchemaVersion': '1.0', 'configurationId': 'b249eeda-3d48-4319-a7e2-853f964c1a25',
'bucket': {
'name': 'keras-blog', # 버킷 이름
'ownerIdentity': {
'principalId': 'AFK2RA1O3ML1F'
},
'arn': 'arn:aws:s3:::keras-blog'
},
'object': {
'key': 'wowwow/csv_icon.png', # 버킷 내 파일의 절대경로
'size': 11733, # 파일 크기
'eTag': 'f2d12d123aebda1cc1fk17479207e838',
'sequencer': '125B119E4D7B2A0A48'
}
}
}
]
}

여기서 봐야 하는 것은 bucketnameobjectkey입니다. 각각 업로드된 버킷의 이름과 버킷 내 파일이 업로드된 경로를 알려주기 때문에 S3 내 업로드된 파일의 절대경로를 알 수 있습니다.

따라서 handler함수 내 다음과 같이 버킷이름과 버킷 내 파일의 경로를 얻을 수 있습니다.

1
2
3
4
5
6
# 윗부분 생략
def handler(event, context):
bucket_name = event['Records'][0]['s3']['bucket']['name']
# bucket_name은 'keras-blog' 가 됩니다.
file_path = event['Records'][0]['s3']['object']['key']
# file_path는 'wowwow/csv_icon.png'가 됩니다.

이를 통해 파일 업로드 이벤트 발생시마다 어떤 파일을 처리해야할 지 알 수 있습니다.

s3에서 이미지 파일 받아오기

이제 어떤 파일을 처리해야 할지 알 수 있게 되었으니 downloadFromS3 함수를 통해 실제로 파일을 가져와봅시다.

1
2
3
4
5
6
# 윗부분 생략
def handler(event, context):
bucket_name = event['Records'][0]['s3']['bucket']['name']
file_path = event['Records'][0]['s3']['object']['key']
file_name = file_path.split('/')[-1] # csv_icon.png
downloadFromS3(bucket_name, file_path, '/tmp/'+file_name)

위 코드를 보면 s3에 올라간 파일을 /tmp안에 받는 것을 볼 수 있습니다. AWS Lambda에서는 ‘쓰기’ 권한을 가진 것은 오직 /tmp폴더뿐이기 때문에 우리가 파일을 받아 사용하려면 /tmp폴더 내에 다운받아야 합니다. (혹은 온메모리에 File 객체로 들고있는 방법도 있습니다.)

Prediction Model 다운받기 (Optional)

만약 여러분이 그냥 사용한다면 Github에서 파일을 다운받게 됩니다. 이때 속도가 굉장히 느려 lambda비용이 많이 발생하기 때문에 여러분의 s3 버킷에 실제 파일을 올리고 s3에서 파일을 받아 사용하시는 것을 추천합니다.

우리는 위에서 squeezenet모델을 사용했는데, 이때 모델 가중치를 담은 .h5파일을 먼저 받아야 합니다. handler 함수 내 다음 두 파일을 더 받아줍시다.

1
2
3
4
5
6
7
8
9
10
def handler(event, context):
bucket_name = event['Records'][0]['s3']['bucket']['name']
file_path = event['Records'][0]['s3']['object']['key']
file_name = file_path.split('/')[-1]
downloadFromS3(bucket_name, file_path, '/tmp/'+file_name)
downloadFromS3(
'keras-blog',
'squeezenet/squeezenet_weights_tf_dim_ordering_tf_kernels.h5',
'/tmp/squeezenet_weights_tf_dim_ordering_tf_kernels.h5'
) # weights용 h5를 s3에서 받아오기

squeezenet 모델로 Predict 하기

이제 s3에 올라간 이미지 파일을 Lambda내 /tmp폴더에 받았으니 Predict를 진행해봅시다. predict라는 함수를 아래와 같이 이미지 경로를 받아 결과를 반환하도록 만들어 줍시다.

1
2
3
4
5
6
7
8
9
10
11
12
13
# index.py 파일, handler함수보다 앞에 
def predict(img_local_path):
model = Squeezenet(weights='imagenet')
img = image.load_img(img_local_path, target_size=(224, 224))
x = image.img_to_array(img)
x = np.expand_dims(x, axis=0)
x = preprocess_input(x)
preds = model.predict(x)
res = decode_predictions(preds)
return res

def handler(event, context):
# 내용 생략 ...

predict함수는 squeezenet의 Pre-trained 모델을 이용해 이미지 예측을 진행합니다.

그러면 실제 handler함수에서 predict함수를 실행하도록 수정해줍시다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def predict(img_local_path):
# 내용 생략 ...

def handler(event, context):
bucket_name = event['Records'][0]['s3']['bucket']['name']
file_path = event['Records'][0]['s3']['object']['key']
file_name = file_path.split('/')[-1]
downloadFromS3(bucket_name, file_path, '/tmp/'+file_name)
downloadFromS3(
'keras-blog',
'squeezenet/squeezenet_weights_tf_dim_ordering_tf_kernels.h5',
'/tmp/squeezenet_weights_tf_dim_ordering_tf_kernels.h5'
)
result = predict('/tmp/'+file_name) # 파일 경로 전달
return result

완성이네요!

DynamoDB에 결과 올리기(Optional)

위 predict의 결과는 단순히 결과가 생기기만 하고 결과를 저장하거나 알려주는 부분은 없습니다. 이번 글에서는 간단한 예시로 AWS DynamoDB에 쌓아보는 부분을 추가해보겠습니다.

predict함수를 통해 생성된 result는 다음과 같은 모습이 됩니다.

1
2
# result[0]의 내용, tuples in list
[('n02099712', 'Labrador_retriever', 0.68165195), ('n02099601', 'golden_retriever', 0.18365686), ('n02104029', 'kuvasz', 0.12076716), ('n02111500', 'Great_Pyrenees', 0.0042763283), ('n04409515', 'tennis_ball', 0.002152696)]

따라서 map을 이용해 다음과 같이 바꿔줄 수 있습니다.

1
2
_tmp_dic = {x[1]:{'N':str(x[2])} for x in result[0]}
dic_for_dynamodb = {'M': _tmp_dic}

그러면 dic_for_dynamodb는 아래와 같은 형태로 나오게 됩니다.

NOTE: 숫자는 floatint가 아닌 str로 바꾸어 전달해야 오류가 나지 않습니다. DynamoDB의 제약입니다.

1
2
3
4
5
6
7
8
9
{
'M':{
'Labrador_retriever': {'N': '0.68165195'},
'golden_retriever': {'N': '0.18365686'},
'kuvasz': {'N': '0.12076716'},
'Great_Pyrenees': {'N': '0.0042763283'},
'tennis_ball': {'N': '0.002152696'}
}
}

데이터를 넣기 위해서는 dict타입으로 만든 객체를 put할수 있는데, 이때 각각의 키에 대해 타입을 알려줘야 합니다. M은 이 객체가 dict타입이라는 것을, N은 이 타입이 숫자라는 것을, S는 문자열이라는 것을 의미합니다.

더 상세한 내용은 DynamoDB에 데이터 넣기를 참고하세요.

이제 이 방식을 이용해 result를 반환하기 전 DynamoDB에 데이터를 넣어줄 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def handler(event, context):
# 중간 생략 ...
result = predict('/tmp/'+file_name)
_tmp_dic = {x[1]:{'N':str(x[2])} for x in result[0]}
dic_for_dynamodb = {'M': _tmp_dic}
dynamo_client = boto3.client(
'dynamodb',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
region_name='ap-northeast-2' # DynamoDB는 리전 이름이 필요합니다.
)
dynamo_client.put_item(
TableName='keras-blog-result', # DynamoDB의 Table이름
Item={
'filename': {
'S': file_name,
},
'predicts': dic_for_dynamodb,
}
)
return result

도커로 Lambda에 올릴 pack.zip파일 만들기

AWS Lambda에 함수를 올리려면 AmazonLinux에 맞게 pip패키지들을 index.py 옆에 같이 설치해준 뒤 압축파일(.zip)으로 묶어 업로드해야 합니다.

이때 약간의 제약이 있는데, AWS콘솔에서 Lambda로 바로 업로드를 하려면 .zip파일이 50MB보다 작아야 하고, S3에 .zip파일을 올린 뒤 Lambda에서 가져와 사용하려면 압축을 푼 크기가 250MB보다 작아야 합니다.

문제는 Tensorflow나 기타 라이브러리를 모두 설치하면 용량이 무지막지하게 커진다는 점인데요, 이를 해결하기 위해 사용하지 않는 부분을 strip하는 방법이 들어갑니다.

앞서만든 AmazonLinux 기반 컨테이너인 lambdapack를 이용해 패키지들을 설치하고 하나의 압축파일로 만들어줍시다.

아래 내용의 buildPack.sh파일을 index.py 옆에 만들어 주세요.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# buildPack.sh
dev_install () {
yum -y update
yum -y upgrade
yum install -y \
wget \
gcc \
gcc-c++ \
python36-devel \
python36-virtualenv \
python36-pip \
findutils \
zlib-devel \
zip
}

pip_rasterio () {
cd /home/
rm -rf env
python3 -m virtualenv env --python=python3
source env/bin/activate
text="
[global]
index-url=http://ftp.daumkakao.com/pypi/simple
trusted-host=ftp.daumkakao.com
"
echo "$text" > $VIRTUAL_ENV/pip.conf
echo "UNDER: pip.conf ==="
cat $VIRTUAL_ENV/pip.conf
pip install -U pip wheel
pip install --use-wheel "h5py==2.6.0"
pip install "pillow==4.0.0"
pip install protobuf html5lib bleach --no-deps
pip install --use-wheel tensorflow --no-deps
deactivate
}


gather_pack () {
# packing
cd /home/
source env/bin/activate

rm -rf lambdapack
mkdir lambdapack
cd lambdapack

cp -R /home/env/lib/python3.6/site-packages/* .
cp -R /home/env/lib64/python3.6/site-packages/* .
cp /outputs/squeezenet.py /home/lambdapack/squeezenet.py
cp /outputs/index.py /home/lambdapack/index.py
echo "original size $(du -sh /home/lambdapack | cut -f1)"

# cleaning libs
rm -rf external
find . -type d -name "tests" -exec rm -rf {} +

# cleaning
find -name "*.so" | xargs strip
find -name "*.so.*" | xargs strip
rm -r pip
rm -r pip-*
rm -r wheel
rm -r wheel-*
rm easy_install.py
find . -name \*.pyc -delete
echo "stripped size $(du -sh /home/lambdapack | cut -f1)"

# compressing
zip -FS -r1 /outputs/pack.zip * > /dev/null
echo "compressed size $(du -sh /outputs/pack.zip | cut -f1)"
}

main () {
dev_install
pip_rasterio
gather_pack
}

main

dev_install 함수에서는 운영체제에 Python3/pip3등을 설치해 주고, pip_rasterio 함수에서는 가상환경에 들어가 tensorflow등 pip로 패키지들을 설치해 주고, gather_pack 함수에서는 가상환경에 설치된 패키지들과 index.py파일을 한 폴더에 모은 뒤 pack.zip파일로 압축해줍니다.

중간에 pip.conf를 바꾸는 부분을 통해 느린 pip global cdn대신 kakao의 pip 미러서버로 좀 더 패키지들을 빠르게 받을 수 있습니다. 이 방법은 여러분의 pip에도 바로 적용할 수 있습니다.

이 sh 파일을 도커 내에서 실행하기 위해서 다음 명령어를 사용해 실행해주세요.

1
docker exec -it lambdapack /bin/bash /outputs/buildPack.sh

이 명령어는 lambdapack이라는 컨테이너에서 buildPack.sh파일을 실행하게 됩니다.

실행하고 나면 약 50MB안팎의 pack.zip파일 하나가 생긴것을 볼 수 있습니다.

하지만 앞서 언급한 것처럼, AWS 콘솔에서 ‘ZIP 파일 올리기’로 한번에 올릴수 있는 압축파일의 용량은 50MB로 제한됩니다. 따라서 이 zip 파일을 s3에 올린 뒤 zip파일의 HTTP주소를 넣어줘야 합니다.

zip파일 s3에 올리고 AWS Lambda 트리거 만들기

이제 AWS 콘솔을 볼 때가 되었습니다.

지금까지 작업한 것은 Lambda에 올릴 패키지/코드를 압축한 파일인데요, 이 부분을 약간 수정해 이제 실제로 AWS Lambda의 이벤트를 통해 실행해 봅시다.

s3에 파일 업로드하기

S3에 파일을 올린 뒤 파일의 HTTPS주소를 복사해주세요.

s3의 pack.zip HTTP주소

여기에서는 https://s3.ap-northeast-2.amazonaws.com/keras-blog/pack.zip가 주소가 됩니다.

이제 진짜로 AWS Lambda 함수를 만들어봅시다.

Lambda 콘솔 함수만들기에 들어가 “새로 작성”을 선택 후 아래와 같이 내용을 채운 뒤 함수 생성을 눌러주세요.

함수가 생성되고 나면 화면 아래쪽 ‘함수 코드’에서 다음과 같이 AWS s3에서 업로드를 선택하고 런타임을 Python3.6으로 잡은 뒤 핸들러를 index.handler로 바꾸고 S3링크를 넣어준 뒤 ‘저장’을 눌러주세요.

그 뒤, ‘기본 설정’에서 메모리를 1500MB 이상으로, 그리고 제한시간은 30초 이상으로 잡아주세요. 저는 테스트를 위해 3000MB/5분으로 잡아주었습니다.

이제 s3에서 파일이 추가될때 자동으로 실행되도록 만들어 주기 위해 다음과 같이 ‘구성’에서 s3를 선택해주세요.

트리거에서 s3 선택하기

이제 화면 아래에 ‘트리거 구성’ 메뉴가 나오면 아래 스크린샷처럼, 파일을 올릴 s3, 그리고 어떤 이벤트(파일 업로드/삭제/복사 등)를 탐지할지 선택하고, 접두사에서 폴더 경로를 폴더이름/으로 써 준 뒤, 필요한 경우 접미사(주로 파일 확장자)를 써 주면 됩니다.

S3 트리거 구성

이번에는 keras-blog라는 버킷 내 uploads폴더 내에 어떤 파일이든 생성되기만 하면 모두 람다 함수를 실행시키는 것으로 만들어 본 것입니다.

NOTE: 접두사/접미사에 or 조건은 AWS콘솔에서 지원하지 않습니다.

추가버튼을 누르고 난 뒤 저장을 눌러주면 됩니다.

함수 저장하기

트리거가 성공적으로 저장 되었다면 다음과 같은 화면을 볼 수 있을거에요.

람다 함수 다 만든 모습

Lambda 환경변수 추가하기

우리가 앞서 index.py에서 os.environ을 통해 시스템의 환경변수를 가져왔습니다. 이를 정상적으로 동작하게 하기 위해서는 ACCESS_KEYSECRET_KEY을 추가해주어야 합니다. 아래 스크린샷처럼 각각 값을 입력하고 저장해주세요.

환경변수 추가하기

이 키는 AWS iam을 통해 가져올 수 있습니다. 해당 iam계정은 s3 R/W권한, DynamoDB write 권한이 있어야 합니다.

Lambda 내 테스트 돌리기(Optional)

AWS Lambda 콘솔에서도 테스트를 돌릴 수 있습니다.

아래와 같이 event 객체를 만들어 전달하면 실제 이벤트처럼 동작합니다.

예제 테스트

1
2
3
4
5
6
7
8
9
10
{
"Records": [
{
"s3": {
"bucket": {"name": "keras-blog"},
"object": {"key": "uploads/kitten.png"}
}
}
]
}

유의: 실제로 keras-blog버킷 내 uploads폴더 내 kitten.png파일이 있어야 테스트가 성공합니다! (인터넷의 아무 사진이나 넣어두세요.)

테스트가 성공하면 다음과 같이 return된 결과가 json으로 보입니다.

람다콘솔 테스트 성공

마무리: 파일 업로드하고 DB에 쌓이는지 확인하기

이제 s3에 가서 파일을 업로드 해 봅시다.

s3에 파일 업로드

keras-blog 버킷 내 uploads폴더에 고양이 사진 몇 개를 올려봅시다.

몇초 기다리면 DynamoDB에 다음과 같이 파일 이름과 Predict된 결과가 쌓이는 것을 볼 수 있습니다.

DynamoDB에 쌓인 결과

이제 우리는 파일이 1개가 올라가든 1000개가 올라가든 모두 동일한 속도로 결과를 얻을 수 있습니다.

한글이 보이는 Flask CSV Response 만들기

들어가며

웹 사이트를 만들다 보면 테이블 등을 csv파일로 다운받을 수 있도록 만들어달라는 요청이 자주 있습니다. 이번 글에서는 Flask에서 특정 URL로 들어갈 때 CSV파일을 받을 수 있도록 만들고, 다운받은 CSV파일을 엑셀로 열 때 한글이 깨지지 않게 처리해 봅시다.

이번에는 Flask + SQLAlchemy + Pandas를 사용합니다.

Flask 코드짜기

우선 Flask 코드를 하나 봅시다. app.py라는 이름을 갖고 있다고 생각해 봅시다.

아래 코드는 Post라는 모델을 모두 가져와 df라는 DataFrame객체로 만든 뒤 .to_csv를 통해 csv 객체로 만들어 준 뒤 StringIO를 통해 실제 io가 가능한 바이너리형태로 만들어 줍니다.

또, output을 해주기 전 u'\ufeff'를 미리 넣어줘 이 파일이 ‘UTF-8 with BOM’이라는 방식으로 인코딩 되어있다는 것을 명시적으로 알려줍니다.

인코딩 명시를 빼면 엑셀에서 파일을 열 때 한글이 깨져서 나옵니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# app.py
from io import StringIO
from flask import Flask, jsonify, request, Response
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__) # Flask App 만들기
app.config['SQLALCHEMY_DATABASE_URI'] = '데이터베이스 URI' # SQLAlchemy DB 연결하기

db = SQLAlchemy()
db.init_app(app)

# 기타 설정을 해줬다고 가정합니다.

@app.route('/api/post/csv/') # URL 설정하기
def post_list_csv(self):
queryset = Post.query.all()
df = pd.read_sql(queryset.statement, queryset.session.bind) # Pandas가 SQL을 읽도록 만들어주기
output = StringIO()
output.write(u'\ufeff') # 한글 인코딩 위해 UTF-8 with BOM 설정해주기
df.to_csv(output)
# CSV 파일 형태로 브라우저가 파일다운로드라고 인식하도록 만들어주기
response = Response(
output.getvalue(),
mimetype="text/csv",
content_type='application/octet-stream',
)
response.headers["Content-Disposition"] = "attachment; filename=post_export.csv" # 다운받았을때의 파일 이름 지정해주기
return response

PySpark & Hadoop: 2) EMR 클러스터 띄우고 PySpark로 작업 던지기

이번 글은 PySpark & Hadoop: 1) Ubuntu 16.04에 설치하기와 이어지는 글입니다.

들어가며

이전 글에서 우분투에 JAVA/Hadoop/PySpark를 설치해 spark를 통해 EMR로 작업을 던질 EC2를 하나 생성해보았습니다. 이번에는 동일한 VPC그룹에 EMR 클러스터를 생성하고 PySpark의 yarn설정을 통해 원격 EMR에 작업을 던져봅시다.

EMR 클러스터 띄우기

AWS 콘솔에 들어가 EMR을 검색해 EMR 대시보드로 들어갑시다.

AWS 콘솔에서 EMR검색하기

EMR 대시보드에서 ‘클러스터 생성’을 클릭해주세요.

EMR 대시보드 첫화면에서 클러스터 생성 클릭

이제 아래와 같이 클러스터이름을 적어주고, 시작 모드를 ‘클러스터’로, 릴리즈는 최신 릴리즈 버전(현 5.10이 최신)으로, 애플리케이션은 Spark를 선택해주세요.

그리고 EC2 키 페어를 갖고있다면 기존에 갖고있는 .pem파일을, 없다면 새 키를 만들고 진행하세요.

주황색 표시 한 부분 외에는 기본 설정값 그대로 두면 됩니다. 로깅은 필요한 경우 켜고 필요하지 않은 경우 꺼두면 됩니다.

그리고 할 작업에 따라 인스턴스 유형을 r(많은 메모리), c(많은 CPU), i(많은 스토리지), p(GPU)중 선택하고 인스턴스 개수를 원하는 만큼 선택해주면 됩니다.

많으면 많을수록 Spark작업이 빨리 끝나는 한편 비용도 그만큼 많이 듭니다. 여기서는 기본값인 r3.xlarge 인스턴스 3개로 진행해 봅시다. 인스턴스 3대가 생성되면 한대는 Master 노드가, 나머지 두대는 Core 노드가 됩니다. 앞으로 작업을 던지고 관리하는 부분은 모두 Master노드에서 이루어집니다.

EMR Spark 클러스터 만들기

설정이 끝나고 나면 아래 ‘클러스터 생성’ 버튼을 눌러주세요.

클러스터 생성 클릭

클러스터가 시작되고 ‘준비’ 단계가 될 때까지는 약간의 시간(1~3분)이 걸립니다. ‘마스터 퍼블릭 DNS’가 화면에 뜰 때까지 잠시 기다려 줍시다.

클러스터: PySpark화면

클러스터가 준비가 완료되면 아래와 같이 ‘마스터 퍼블릭 DNS’ 주소가 나옵니다.

클러스터: PySpark DNS나온 화면

‘마스터 퍼블릭 DNS’는 앞으로 설정할때 자주 사용하기 때문에 미리 복사를 해 둡시다.

1
2
# 이번에 만들어진 클러스터의 마스터 퍼블릭 DNS
ec2-13-124-83-135.ap-northeast-2.compute.amazonaws.com

이렇게 나오면 우선 클러스터를 사용할 준비가 완료된 것으로 볼 수 있습니다. 이제 다시 앞 글에서 만든 EC2를 설정해봅시다.

EC2 설정 관리하기

이제 EMR 클러스터가 준비가 완료되었으니 EC2 인스턴스에 다시 ssh로 접속을 해 봅시다.

이전 편인 PySpark & Hadoop: 1) Ubuntu 16.04에 설치하기글을 읽고 따라 왔다면 여러분의 EC2에는 아마 JAVA와 PySpark, 그리고 Hadoop이 설치가 되어있을겁니다.

우리는 Hadoop의 yarn을 통해서 EMR 클러스터에 spark작업을 던져주기 때문에 이 부분을 설정을 조금 해줘야 합니다.

이전 편을 따라왔다면 아래 두 파일을 수정해주면 되고, 만약 따로 Hadoop을 설치해줬다면 which hadoop을 통해서 나오는 주소를 약간 수정해 사용해주면 됩니다.

우선 앞서 우리가 Hadoop을 설치해준 곳은 /usr/local/hadoop/bin/hadoop 입니다.

which hadoop

그리고 우리가 수정해줘야 하는 두 파일은 위와 같은 위치에 있는 core-site.xml파일, 그리고 yarn-site.xml 파일입니다. 즉, 절대 경로는 아래와 같습니다.

1
2
3
4
# core-site.xml
/usr/local/hadoop/etc/hadoop/core-site.xml
# yarn-site.xml
/usr/local/hadoop/etc/hadoop/yarn-site.xml

만약 다른 곳에 설치했다면 /하둡을설치한위치/etc/hadoop/ 안의 core-site.xmlyarn-site.xml을 수정하면 됩니다.

core-site.xml 수정하기

이제 core-site.xml파일을 수정해 봅시다.

core-site.xml 수정

core-site.xml에는 다음과 같이 fs.defaultFS라는 name을 가진 property를 하나 추가해주면 됩니다. 그리고 그 값을 hdfs://마스터퍼블릭DNS로 넣어줘야 합니다.

1
2
3
4
5
6
7
8
9
<!-- core-site.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://ec2-13-124-83-135.ap-northeast-2.compute.amazonaws.com</value>
</property>
</configuration>

수정은 vim이나 nano등의 편집기를 이용해주세요.

yarn-site.xml 수정하기

이제 다음 파일인 yarn-site.xml파일을 수정해 봅시다.

yarn-site.xml 수정

yarn-site.xml에는 다음과 같이 두가지 설정을 마스터퍼블릭DNS로 넣어줘야 합니다. address에는 포트도 추가적으로 붙여줘야 합니다.

1
2
3
4
5
6
7
8
9
10
11
<?xml version="1.0"?>
<configuration>
<property>
<name>yarn.resourcemanager.address</name>
<value>ec2-13-124-83-135.ap-northeast-2.compute.amazonaws.com:8032</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>ec2-13-124-83-135.ap-northeast-2.compute.amazonaws.com</value>
</property>
</configuration>

이렇게 두 파일을 수정해주었으면 EC2에서 설정을 수정할 부분은 끝났습니다.

EMR 클러스터 설정 관리하기

같은버전 Python 설치하기

Spark에서 파이썬 함수(혹은 파일)을 실행할 때 Spark가 실행되고있는 파이썬 버전과 PySpark등을 통해 Spark 서버로 요청된 파이썬 함수의 버전과 일치하지 않으면 Exception을 일으킵니다.

Python driver 3.4_3.5 Exception

현재 EMR에 설치되어있는 python3은 3.4버전인데, EC2(Ubuntu 16.04)의 파이썬 버전은 3.5버전이기 때문에 Exception이 발생합니다.

아래 세 가지 방법 중 하나를 선택해 해결해주세요.

첫번째 방법: Ubuntu EC2에 Python3.4를 설치하기

Ubuntu16은 공식적으로 Python3.4를 지원하지 않습니다. 하지만 간단한 방법으로 Python3.4를 설치할 수 있습니다.

아래 세 줄을 입력해주세요.

1
2
3
sudo add-apt-repository ppa:fkrull/deadsnakes
sudo apt-get update
sudo apt-get install python3.4 -y

이렇게 하면 Python3.4를 사용할 수 있습니다.

막 설치해준 Python3.4에는 아직 pyspark가 설치되어있지 않으니 아래 명령어로 pyspark를 설치해 줍시다.

1
python3.4 -m pip install -U pyspark --no-cache

이제 EMR 클러스터에 작업을 던져줍시다.

두번째 방법: EMR을 이루는 인스턴스에 원하는 Python버전(3.5)를 설치하기

EMR에 python3.5를 설치해 문제를 해결해 봅시다.

만약 여러분이 Ubuntu 17버전을 사용한다면 기본적으로 Python3.6이 설치되어있기 때문에 아래 코드에서 35 대신 36을 이용해주시면 됩니다.

이제 SSH를 통해 EMR Master에 접속해 봅시다.

1
2
3
chmod 400 sshkey.pem # ssh-add는 권한을 따집니다. 400으로 읽기권한만 남겨두세요.
ssh-add sshkey.pem # 여러분의 .pem 파일 경로를 넣어주세요.
ssh hadoop@ec2-13-124-83-135.ap-northeast-2.compute.amazonaws.com

SSH Login EMR

로그인을 하고 나서 파이썬 버전을 알아봅시다. 파이썬 버전은 아래 사진처럼 볼 수 있습니다.

EMR python은 3.4버전

파이썬이 3.4버전인것을 확인할 수 있습니다.

한편, EC2의 파이썬은 아래와 같이 3.5버전입니다.

EC2 python은 3.5버전

이제 EMR에 Python3.5를 설치해 줍시다.

1
sudo yum install python35

위 명령어를 입력하면 python3.5버전이 설치됩니다.

yum install python35

Y/N을 물어보면 y를 눌러줍시다.

Press y to install

python3 -V를 입력해보면 성공적으로 파이썬 3.5버전이 설치된 것을 볼 수 있습니다.

Python3.5.1

이 과정을 Master / Core 각 인스턴스별로 진행해주시면 됩니다. SSH로 접속 후 python35만 설치하면 됩니다.

이제 EMR 클러스터에 작업을 던져줍시다.

세번째 방법: EMR 클러스터 부트스트랩 이용하기

두번째 방법과 같이 EMR 클러스터를 이루는 인스턴스 하나하나에 들어가 설치를 진행하는 것은 굉장히 비효율적입니다.

그래서 EMR 클러스터가 생성되기 전에 두번째 방법에서와 같이 EMR 클러스터 내에 Python35, Python36을 모두 설치해두면 앞으로도 문제가 없을거에요.

이때 사용할 수 있는 방법이 ‘bootstrap action’ 입니다. bootstrap action은 EMR 클러스터가 생성되기 전 .sh같은 쉘 파일등을 실행할 수 있습니다.

우선 우리가 실행해줄 installpy3536.sh 파일을 로컬에서 하나 만들어 줍시다.

1
2
3
4
5
#!/bin/bash

sudo yum install python34 -y
sudo yum install python35 -y
sudo yum install python36 -y

EMR 클러스터는 아마존리눅스상에서 돌아가기 때문에 yum을 통해 패키지를 설치할 수 있습니다. 각각 python3.4/3.5/3.6버전을 받아 설치해주는 명령어입니다.

이 파일을 s3에 올려줍시다.

우선 빈 버킷 혹은 기존 버킷에 파일을 올려주세요.

빈 s3 버킷 만들기

파일 권한은 기본 권한 그대로 두면 됩니다. 그리고 이 파일은 AWS 외부에서 접근하지 않기 때문에 퍼블릭으로 해둘 필요는 없습니다.

파일 업로드 완료

이제 EMR을 실행하러 가 봅시다.

앞서서는 ‘빠른 옵션’을 이용했지만 이제 ‘고급 옵션’을 이용해야 합니다.

새 EMR 클러스터 만들기

고급 옵션에서 소프트웨어 구성을 다음과 같이 체크하고 ‘다음’을 눌러줍시다.

단계1: 소프트웨어 및 단계

다음 단계인 ‘하드웨어’는 기본값 혹은 필요한 만큼 설정해준 뒤 ‘다음’을 눌러줍시다. 여기서는 기본값으로 넣어줬습니다.

단계2: 하드웨어

이번 단계인 ‘일반 클러스터 설정’이 중요합니다. 여기에서 ‘부트스트랩 작업’을 누르고 ‘사용자 지정 작업’을 선택해주세요.

단계3: 일반 클러스터 설정

‘사용자 지정 작업’을 선택한 뒤 ‘구성 및 추가’를 눌러주세요.

구성 및 추가

추가를 누르면 다음과 같이 ‘이름’, ‘스크립트 위치’를 찾아줘야 합니다. 이름을 InstallPython343536이라고 지어봅시다.

이제 스크립트 옆 폴더 버튼을 눌러줍시다.

아까 만든 installpy3536.sh파일이 있는 버킷에 찾아들어가 installpy3536.sh 파일을 선택해줍시다.

선택을 눌러준 뒤 ‘추가’를 눌러줍시다.

아래와 같이 ‘부트스트랩 작업’에 추가되었다면 ‘다음’을 눌러 클러스터를 만들어 줍시다.

이제 마지막으로 SSH접속을 위한 키 페어를 선택한 후 ‘클러스터 생성’을 눌러줍시다.

이제 생성된 EMR 클러스터에는 python3.4/3.5/3.6이 모두 설치되어있습니다. 이 버전 선택은 아래 PYSPARK_PYTHON 값을 설정할때 변경해 사용하면 됩니다.

ubuntu 유저 만들고 hadoop그룹에 추가하기

python 설치를 마쳤다면 이제 ubuntu유저를 만들어줘야 합니다.

EMR 클러스터 마스터 노드에 작업을 추가해 줄 경우 기본적으로 작업을 실행한 유저(우분투 EC2에서 요청시 기본 유저는 ubuntu)의 이름으로 마스터 노드에서 요청한 유저의 홈 폴더를 찾습니다.

만약 EC2에서 EMR로 요청한다면 ubuntu라는 계정 이름으로 EMR 마스터 노드에서 /home/ubuntu라는 폴더를 찾아 이 폴더에 작업할 파이썬 파일과 의존 패키지 등을 두고 작업을 진행합니다. 하지만 EMR은 기본적으로 hadoop이라는 계정을 사용하고, 따라서 ubuntu라는 유저는 추가해줘야 합니다. 그리고 우리가 새로 만들어준 ubuntu 유저는 하둡에 접근할 권한이 없기 때문에 이 유저를 hadoop그룹에 추가해줘야 합니다.

우분투 계정 만들고 하둡 그룹에 추가

위 사진처럼 두 명령어를 입력해 줍시다.

1
2
sudo adduser ubuntu
sudo usermod -a -G hadoop ubuntu

첫 명령어는 ubuntu라는 유저를 만들고 다음에서 hadoop이라는 그룹에 ubuntu유저를 추가합니다.

이제 우리는 EC2에서 EMR로 분산처리할 함수들을 보낼 수 있습니다.

마무리: 파이(pi) 계산 예제 실행하기

한번 PySpark의 기본 예제중 하나인 pi(원주율) 계산을 진행해 봅시다.

공식 예제: https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py

공식 예제는 스파크와 하둡을 로컬에서 사용합니다. 하지만 우리는 EMR 클러스터에 작업을 던져줄 것이기 때문에 약간 코드를 변경해줘야 합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# pi.py
import sys
from random import random
from operator import add
import os

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python34" # python3.5라면 /usr/bin/python35

from pyspark.sql import SparkSession

if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
# 이 부분을 추가해주시고
spark = SparkSession \
.builder \
.master("yarn") \
.appName("PySpark") \
.getOrCreate()

# 이부분을 주석처리해주세요.
#spark = SparkSession\
# .builder\
# .appName("PythonPi")\
# .getOrCreate()

partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions

def f(_):
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 <= 1 else 0

count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()

기존 코드는 builder를 통해 로컬에서 작업을 던져주지만 이렇게 .master("yarn")을 추가해주면 yarn 설정을 통해 아래 작업이 EMR 클러스터에서 동작하게 됩니다.

EC2상에서 아래 명령어로 위 파이썬 파일을 실행해 봅시다.

1
python3.4 pi.py

실행을 해 보면 결과가 잘 나오는 것을 볼 수 있습니다.

pi is roughly 3.144720

만약 두번째/세번째 방법으로 Python3.5를 설치해주셨다면 별다른 설정 없이 python3 pi.py로 실행하셔도 됩니다.

자주 보이는 에러/경고

WARN yarn.Client: Same path resource

새 task를 Spark로 넘겨줄 때 마다 패키지를 찾기 때문에 나오는 에러입니다. 무시해도 됩니다.

Initial job has not accepted any resources

EMR 설정 중 spark.dynamicAllocation.enabledTrue일 경우 생기는 문제입니다.

pi.py파일 코드를 일부 수정해주세요.

기존에 있던 spark 생성하는 부분에 아래 config 몇줄을 추가해주세요.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 기존 코드를 지우고
# spark = SparkSession \
# .builder \
# .master("yarn") \
# .appName("PySpark") \
# .getOrCreate()

# 아래 코드로 바꿔주세요.
spark = SparkSession.builder \
.master("yarn") \
.appName("PySpark") \
.config("spark.executor.memory", "512M") \
.config("spark.yarn.am.memory", "512M") \
.config("spark.executor.cores", 2) \
.config("spark.executor.instances", 1) \
.config("spark.dynamicAllocation.enabled", False) \
.getOrCreate()

이때 각 config별로 설정되는 값은 여러분이 띄운 EMR에 따라 설정해줘야 합니다. 만약 여러분이 r3.xlarge를 선택했다면 8개의 vCPU, 30.5 GiB 메모리를 사용하기 때문에 저 설정 숫자들을 높게 잡아도 되지만, 만약 c4.large를 선택했다면 2개의 vCPU, 3.8 GiB 메모리를 사용하기 때문에 코드에서 설정한 CPU코어수 혹은 메모리 용량이 클러스터의 CPU개수와 메모리 용량을 초과할 경우 에러가 납니다.

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×