1. 기본 환경 설정
최종 구성
Kafka 클러스터는 EKS 위에 Strimzi Operator를 이용해 구성하였고, 여기에 Kafka Connect와 Confluent S3 Sink Connector를 배포하여 데이터를 실시간으로 S3로 적재하는 구조를 만들었다. 구성은 크게 세 부분으로 나눌 수 있다.
- Kafka Connect 설정
- S3 Sink Connector 설정
- EKS Pod Identity를 통한 권한 관리
1. 1 Kafka Connect 설정
Kafka Connect는 Strimzi KafkaConnect CRD로 배포하였다. Confluent S3 Sink 플러그인을 직접 포함한 이미지를 빌드하고, DockerHub에 푸시하여 사용했다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect
namespace: kafka
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.9.0
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
config:
group.id: my-connect-cluster
config.storage.topic: my-connect-configs
offset.storage.topic: my-connect-offsets
status.storage.topic: my-connect-status
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: "false"
build:
output:
type: docker
image: hahxowns/kafka-connect-s3:latest
pushSecret: dockerhub-secret
plugins:
- name: confluent-s3-sink
artifacts:
- type: maven
repository: <https://packages.confluent.io/maven/>
group: io.confluent
artifact: kafka-connect-s3
version: 10.6.5
1.2 S3 Sink Connector 설정
실시간 리뷰 데이터를 S3로 수집하는 KafkaConnector 리소스를 별도로 생성했다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: sink-s3-realtime-collection
namespace: kafka
labels:
strimzi.io/cluster: my-connect
spec:
class: io.confluent.connect.s3.S3SinkConnector
tasksMax: 2
config:
topics: realtime-review-collection-topic
s3.bucket.name: hihypipe-raw-data
s3.region: ap-northeast-2
format.class: io.confluent.connect.s3.format.json.JsonFormat
storage.class: io.confluent.connect.s3.storage.S3Storage
partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
flush.size: "1"
aws.credentials.provider.class: com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: "false"
1.3 EKS Pod Identity 설정
S3 접근을 위해 EKS Pod Identity를 사용했다. IRSA 기반으로 ServiceAccount에 직접 IAM Role을 연결하려 했으나, Strimzi가 주기적으로 ServiceAccount를 덮어쓰는 문제가 있어 Pod Identity 방식을 적용하였다.
# Service Account와 IAM Role 연결
aws eks create-pod-identity-association \\
--cluster-name hihypipe-eks-cluster \\
--role-arn arn:aws:iam::150297826798:role/hihypipe-kafka-connect-pod-identity \\
--namespace kafka \\
--service-account my-connect-connect
IAM Role 신뢰 정책은 다음과 같이 설정하였다.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowEksAuthToAssumeRoleForPodIdentity",
"Effect": "Allow",
"Principal": {
"Service": "pods.eks.amazonaws.com"
},
"Action": [
"sts:AssumeRole",
"sts:TagSession"
]
}
]
}
1.4 S3 IAM 정책
아래의 내용으로 IAM정책을 생성 후 hihypipe-kafka-connect-pod-identity이라는 role에 연결시켜 주었다.
이 Role은 EKS Pod Identity Association을 통해 kafka 네임스페이스의 ServiceAccount my-connect-connect에 매핑되어, 최종적으로 my-connect-connect-0 Kafka Connect Pod가 S3에 접근할 때 사용된다.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:ListBucketMultipartUploads",
"s3:AbortMultipartUpload",
"s3:ListMultipartUploadParts"
],
"Resource": [
"arn:aws:s3:::hihypipe-raw-data",
"arn:aws:s3:::hihypipe-raw-data/*"
]
}
]
}
2. 배포
# Kafka Connect 배포
kubectl apply -f kubernetes/namespaces/kafka/base/kafka-connect.yaml
# S3 Sink Connector 배포
kubectl apply -f kubernetes/namespaces/kafka/base/kafka-s3-sink-connector.yaml
3. Test
테스트 메시지 전송
echo '{"review_id": "test-001", "user_id": "user-123", "rating": 5, "comment": "테스트 리뷰입니다", "timestamp": "2025-09-09T10:50:00Z"}' \
| kubectl -n kafka run kafka-producer --image=confluentinc/cp-kafka:latest --rm -i --restart=Never -- \
kafka-console-producer --bootstrap-server my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092 \
--topic realtime-review-collection-topic
S3 저장 확인
aws s3 ls s3://hihypipe-raw-data/topics/realtime-review-collection-topic/partition=0/
4. Trouble Shooting
실제 구축 과정에서 여러 문제가 발생했고, 다음과 같이 해결하였다.
4.1 Kafka Connect 빌드 이슈
- 문제: Confluent S3 Sink 10.5.7 버전 사용 시 NoSuchMethodError 발생
- 원인: Kafka 3.9.0과 플러그인 간 버전 불일치
- Confluent용이 아닌 Aiven용 설정과 혼합 사용해 문제 발생
- 해결: 버전을 여러 차례 바꿔 테스트 후 10.6.5 버전에서 안정적으로 동작
4.2 S3 Sink Connector 연동 실패
- 문제: 멀티파트 업로드 권한 누락으로 AccessDeniedException 발생
- 원인: S3 정책 부족 + IRSA 동기화 이슈
- Strimzi가 계속 동기화 되는지 모르고 계속 IRSA에 수동으로 연결시켜주고 왜 안되는거야 하고 있었음
- 해결: S3 정책 확장 및 EKS Pod Identity로 전환
4.3 EKS Pod Identity 권한 문제
- 문제: IAM Role 신뢰정책이 잘못되어 AccessDeniedException 발생
- 해결: AWS 공식 문서 기준 신뢰정책 적용 (Sid 필드 포함)
4.4 AWS 자격증명 문제
- 문제: WebIdentityTokenCredentialsProvider 관련 오류
- 원인: Confluent S3 Sink가 EKS Pod Identity를 인식하지 못함
- 해결: 자격증명 제공자를 EC2ContainerCredentialsProviderWrapper로 명시
4.5 참고 자료
'Data Engineering > Kafka' 카테고리의 다른 글
[Kafka] Kubernetes환경에서 Kafka 구성하기 (with Strimzi) (4) | 2025.08.10 |
---|---|
[Kafka] Kafka 설치 및 KRaft로 Cluster 구성하기 (0) | 2025.07.23 |
[Kafka] Kafka 기본 개념 이해하기 (2) | 2025.07.22 |
[Kafka] 스트리밍 데이터 처리와 Event-Driven Architecture 이해하기 (2) | 2025.07.22 |