[Kafka] EKS에서 Strimzi Kafka Connect와 S3 Sink Connector 설정 및 트러블슈팅

1. 기본 환경 설정

최종 구성

Kafka 클러스터는 EKS 위에 Strimzi Operator를 이용해 구성하였고, 여기에 Kafka Connect와 Confluent S3 Sink Connector를 배포하여 데이터를 실시간으로 S3로 적재하는 구조를 만들었다. 구성은 크게 세 부분으로 나눌 수 있다.

  1. Kafka Connect 설정
  2. S3 Sink Connector 설정
  3. EKS Pod Identity를 통한 권한 관리

1. 1 Kafka Connect 설정

Kafka Connect는 Strimzi KafkaConnect CRD로 배포하였다. Confluent S3 Sink 플러그인을 직접 포함한 이미지를 빌드하고, DockerHub에 푸시하여 사용했다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.9.0
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
  config:
    group.id: my-connect-cluster
    config.storage.topic: my-connect-configs
    offset.storage.topic: my-connect-offsets
    status.storage.topic: my-connect-status
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: "false"
  build:
    output:
      type: docker
      image: hahxowns/kafka-connect-s3:latest
      pushSecret: dockerhub-secret
    plugins:
      - name: confluent-s3-sink
        artifacts:
          - type: maven
            repository: <https://packages.confluent.io/maven/>
            group: io.confluent
            artifact: kafka-connect-s3
            version: 10.6.5
 

1.2 S3 Sink Connector 설정

실시간 리뷰 데이터를 S3로 수집하는 KafkaConnector 리소스를 별도로 생성했다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: sink-s3-realtime-collection
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 2
  config:
    topics: realtime-review-collection-topic
    s3.bucket.name: hihypipe-raw-data
    s3.region: ap-northeast-2
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    storage.class: io.confluent.connect.s3.storage.S3Storage
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    flush.size: "1"
    aws.credentials.provider.class: com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: "false"
 

1.3 EKS Pod Identity 설정

S3 접근을 위해 EKS Pod Identity를 사용했다. IRSA 기반으로 ServiceAccount에 직접 IAM Role을 연결하려 했으나, Strimzi가 주기적으로 ServiceAccount를 덮어쓰는 문제가 있어 Pod Identity 방식을 적용하였다.

# Service Account와 IAM Role 연결
aws eks create-pod-identity-association \\
  --cluster-name hihypipe-eks-cluster \\
  --role-arn arn:aws:iam::150297826798:role/hihypipe-kafka-connect-pod-identity \\
  --namespace kafka \\
  --service-account my-connect-connect

 

 

IAM Role 신뢰 정책은 다음과 같이 설정하였다.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowEksAuthToAssumeRoleForPodIdentity",
      "Effect": "Allow",
      "Principal": {
        "Service": "pods.eks.amazonaws.com"
      },
      "Action": [
        "sts:AssumeRole",
        "sts:TagSession"
      ]
    }
  ]
}

1.4 S3 IAM 정책

아래의 내용으로 IAM정책을 생성 후  hihypipe-kafka-connect-pod-identity이라는 role에 연결시켜 주었다.

이 Role은 EKS Pod Identity Association을 통해 kafka 네임스페이스의 ServiceAccount my-connect-connect에 매핑되어, 최종적으로 my-connect-connect-0 Kafka Connect Pod가 S3에 접근할 때 사용된다.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket",
        "s3:GetBucketLocation",
        "s3:ListBucketMultipartUploads",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts"
      ],
      "Resource": [
        "arn:aws:s3:::hihypipe-raw-data",
        "arn:aws:s3:::hihypipe-raw-data/*"
      ]
    }
  ]
}

2. 배포

# Kafka Connect 배포
kubectl apply -f kubernetes/namespaces/kafka/base/kafka-connect.yaml

# S3 Sink Connector 배포
kubectl apply -f kubernetes/namespaces/kafka/base/kafka-s3-sink-connector.yaml

 


3. Test

테스트 메시지 전송

echo '{"review_id": "test-001", "user_id": "user-123", "rating": 5, "comment": "테스트 리뷰입니다", "timestamp": "2025-09-09T10:50:00Z"}' \
| kubectl -n kafka run kafka-producer --image=confluentinc/cp-kafka:latest --rm -i --restart=Never -- \
kafka-console-producer --bootstrap-server my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092 \
--topic realtime-review-collection-topic

 

S3 저장 확인

aws s3 ls s3://hihypipe-raw-data/topics/realtime-review-collection-topic/partition=0/

4. Trouble Shooting

실제 구축 과정에서 여러 문제가 발생했고, 다음과 같이 해결하였다.

4.1 Kafka Connect 빌드 이슈

  • 문제: Confluent S3 Sink 10.5.7 버전 사용 시 NoSuchMethodError 발생
  • 원인: Kafka 3.9.0과 플러그인 간 버전 불일치
    • Confluent용이 아닌 Aiven용 설정과 혼합 사용해 문제 발생
  • 해결: 버전을 여러 차례 바꿔 테스트 후 10.6.5 버전에서 안정적으로 동작

 

4.2 S3 Sink Connector 연동 실패

  • 문제: 멀티파트 업로드 권한 누락으로 AccessDeniedException 발생
  • 원인: S3 정책 부족 + IRSA 동기화 이슈
    • Strimzi가 계속 동기화 되는지 모르고 계속 IRSA에 수동으로 연결시켜주고 왜 안되는거야 하고 있었음
  • 해결: S3 정책 확장 및 EKS Pod Identity로 전환

 

4.3 EKS Pod Identity 권한 문제

  • 문제: IAM Role 신뢰정책이 잘못되어 AccessDeniedException 발생
  • 해결: AWS 공식 문서 기준 신뢰정책 적용 (Sid 필드 포함)

 

4.4 AWS 자격증명 문제

  • 문제: WebIdentityTokenCredentialsProvider 관련 오류
  • 원인: Confluent S3 Sink가 EKS Pod Identity를 인식하지 못함
  • 해결: 자격증명 제공자를 EC2ContainerCredentialsProviderWrapper로 명시

 

4.5 참고 자료