이벤트 데이터 파이프라인 제작하기
최근에 이벤트 관련 데이터 파이프라인을 분리하는 작업을 시작했다. 기존에는 서비스를 운영하면서 나오는 모든 이벤트들을 DB에 저장하는 것을 보고, 언젠가는 꼭 해결한다고 했는데 그걸 이제야 시작한다. 해당 로직을 구상하기 위해서 여러 방법들을 구상한 결과, 기존 AWS 도큐먼트에 있는 시스템을 이용해 보는 것이 좋겠다고 생각했다.
구상하기
현재 시스템은 특정 이벤트에 대한 내용을 클라이언트에서 4곳의 서버 (사용자 분석 툴)로 데이터를 보내주고 있다. 이 중 하나는 DB로 이벤트를 보내게 되는데, 이 시스템 자체가 상당히 비효율적이라는 생각이 들었다. DB는 주로 사용자의 데이터 및 프로덕트의 가변 정보들을 저장하는 곳인데, 이벤트에 대한 데이터를 DB에 저장하기에는 목적에도 맞지 않고 DB 및 WAS에도 영향을 주기 때문에 이를 개선할 필요가 있다고 생각이 들었다.
기본 플로우는 아래의 BI System 도큐먼트를 참고했고, 여기서 클라이언트에서 Kinesis로 바로 접속할 수 있도록 APi Gateway를 연결해 주려고 한다. 그리고 Elastic Search로 데이터를 업로드 하는 로직은 나중에 만들 예정이다. 아직 우리의 환경에서는 이벤트를 Elastic Search 같은 서치 엔진 까지는 도입할 필요가 없다고 생각이 되기 때문이다.
플로우는 다음과 같다.
1. 디바이스에서 API Gateway로 데이터를 전송한다.
2. API Gateway에서 받은 데이터는 정해진 라우팅 규칙에 따라서 Kineis Data Streams로 보내진다.
3. 해당 데이터는 Kinesis Data Firehose로 보낸다. 여기로 보내야 S3로 데이터를 전송할 수가 있다.
4. Kinesis Data Firehose로 받은 데이터는 설계 시 설정한 매핑 규칙으로 S3 버킷에 저장이 된다.
5. S3에 저장된 데이터를 기반으로 Athena에서 Database처럼 사용할 수 있다. SQL문도 사용할 수 있다.
설계하기
Kinesis Data Streams 부터 Athena까지는 아래 링크를 참고했다.
그리고 클라이언트에서 API Gateway로 데이터를 전송하고, API Gateway에서 Kinesis로 데이터를 전달하는 과정은 다음 링크를 참고했다. 여기에 더해서 API Key를 추가해서 보안의 강도를 높였다. 헤더에 x-api-key를 입력하면 데이터 요청이 잘 들어간다.
사용하기
이벤트에 대한 데이터를 입력한다. 아래 코드는 axios에 대한 사용 예시이다.
await axios.put(
"URL앤드포인트",
{
Data: {
createdAt: new Date(),
name: "name",
userId: 123,
},
PartitionKey: "some key",
},
{
headers: {
"x-api-key": "Key value",
},
}
);
위의 요청을 보내고 나면 정상적인 경우에 다음과 같은 응답이 올 것이다. (예시 데이터)
{
SequenceNumber: 'value',
ShardId: 'shardId-000000000000'
}
해당 요청을 보내면 몇 분 뒤에 S3 버킷에 데이터가 저장이 된다.
---- 작성중 ----