티스토리 뷰

Intro

Delta format에 대해서 알아봐야겠다는 생각을 하던 참에, 뿌리를 뽑아두면 좋을 것 같아서 delta가 풀고자하는 기본 개념과, 어떻게 구현되었는지 감을 잡기 위해서 간단한 동작을 체크해보았습니다.

Delta Lake의 탄색 배경 (feat. ChatGPT)

빅데이터 환경이 발전하면서 데이터 레이크(Data Lake) 가 대규모 데이터를 저장하는 핵심 기술로 자리 잡았습니다. 하지만 기존의 데이터 레이크에는 다음과 같은 문제점들이 있었습니다.

1. 데이터 무결성 부족

데이터 레이크는 주로 Apache Parquet, ORC, Avro 같은 파일 포맷을 사용하여 데이터를 저장하지만, 이들은 ACID(원자성, 일관성, 격리성, 지속성) 트랜잭션을 지원하지 않습니다. 따라서 다음과 같은 문제가 발생할 수 있습니다.

  • 데이터가 부분적으로 쓰이거나 손상될 가능성
  • 동일한 데이터를 중복 저장할 위험
  • 데이터 정합성 문제로 인한 비즈니스 로직 오류

2 스키마 변경이 어려움

데이터는 지속적으로 변화하지만, 기존 데이터 레이크는 스키마 진화(Schema Evolution) 를 효율적으로 지원하지 못했습니다. 예를 들어, 새로운 컬럼이 추가되거나 데이터 타입이 변경될 경우 데이터 손실 없이 적용하기 어려웠습니다.

3 성능 문제

데이터 레이크는 대량의 데이터를 저장할 수 있지만, 성능 측면에서는 단점이 있었습니다.

  • 파일 리스트 스캔 비용이 높음: Parquet 같은 파일 기반 데이터 저장소에서는 테이블의 변경 이력을 쉽게 추적할 수 없어 쿼리 성능이 저하됨.
  • 조각화 문제(Fragmentation Issue): 소규모 파일이 다수 존재하면 성능이 저하되고, 관리도 어려워짐.

4 실시간 데이터 처리가 어려움

데이터 웨어하우스와 다르게, 데이터 레이크는 실시간 분석 및 스트리밍 처리를 직접적으로 지원하지 않았습니다. 대부분의 데이터 레이크 시스템은 배치 처리(Batch Processing)에 최적화되어 있어 스트리밍 데이터와 배치 데이터를 효율적으로 결합하는 것이 어려웠습니다.

Delta Lake의 주요 기능

1. ACID 트랜잭션 지원

Delta Lake는 트랜잭션 로그(Transaction Log) 를 기반으로 ACID 보장을 제공합니다. 이를 통해 여러 사용자가 동시에 데이터를 읽고 쓰더라도 데이터 정합성을 유지할 수 있습니다.

  • 원자성(Atomicity): 작업이 완전히 수행되거나 전혀 수행되지 않음.
  • 일관성(Consistency): 데이터가 항상 유효한 상태를 유지.
  • 격리성(Isolation): 여러 트랜잭션이 동시에 수행되더라도 서로 영향을 주지 않음.
  • 지속성(Durability): 커밋된 데이터는 영구적으로 유지됨.

2. 시간여행(Time Travel)

Delta Lake는 데이터 변경 이력을 유지하여 과거의 데이터 상태를 조회할 수 있는 기능을 제공합니다. 이를 통해 데이터 복구 및 디버깅이 용이해집니다.

3 스키마 진화(Schema Evolution)

Delta Lake는 데이터의 구조가 변할 때 자동으로 스키마를 조정할 수 있습니다.

4 데이터 정리(Optimize & Vacuum)

Delta Lake는 소규모 파일을 병합하여 성능을 최적화하는 기능을 제공합니다.

  • OPTIMIZE: 작은 파일을 병합하여 쿼리 성능을 향상.
  • VACUUM: 오래된 파일을 정리하여 저장소 공간을 절약.

5 스트리밍 & 배치 통합

Delta Lake는 배치 처리와 스트리밍 처리를 동일한 방식으로 사용할 수 있도록 지원합니다. 

Delta format으로 table 생성, 추가, 삭제, 업데이트 해보기

Delta format으로 table을 간단하게 조작해보면서 기능들이 실제로 어떻게 작동하는지 확인해보려고 합니다.

Common

spark에서 delta format을 사용하기 위해서는 spark을 두개의 dependency를 설치해야 한다.

pyspark==3.5.4
delta-spark==3.3.0

그리고 spark session을 만들 때 아래와 같은 작업을 해주어야 delta를 사용해서 spark을 사용할 수 있다.

import pyspark
from delta import configure_spark_with_delta_pip

builder = pyspark.sql.SparkSession.builder. \\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \\
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

configure_spark_with_delta에서 delta 관련된 package를 주입하는 것을 볼 수 있다.

이번에는 local에서 저장이 어떻게 되는지 체크할 것이기 때문에 delta table path도 정의해두고 시작하겠다

delta_table_path = "tmp/delta-table-partitioned"

Table 생성

# 예시 DataFrame 생성
data = [
    (1, "Alice", "Seoul", 3000),
    (2, "Bob",   "Seoul", 4000),
    (3, "Chris", "Busan", 3500),
    (4, "David", "Busan", 2000),
    (5, "Eve",   "Daegu", 2500),
]
columns = ["id", "name", "city", "salary"]

df = spark.createDataFrame(data, columns)

# city 컬럼으로 파티셔닝하여 Delta 포맷으로 저장
df.write.format("delta") \\
    .partitionBy("city") \\
    .mode("overwrite") \\
    .save(delta_table_path)

Delta 테이블을 생성하면 _delta_log 디렉터리가 생성되며, 이곳에 메타데이터와 트랜잭션 로그가 저장됩니다.

├── _delta_log
│   ├── 00000000000000000000.crc
│   ├── 00000000000000000000.json
│   └── _commits
├── city=Busan
│   ├── part-00009-17d3573e-1c83-446d-a461-666dd0ddeeb7.c000.snappy.parquet
│   └── part-00012-afc8579b-a01a-4932-bf33-4f1428ddd93a.c000.snappy.parquet
├── city=Daegu
│   └── part-00015-4b3fc160-9c12-4404-ad17-d6966ce5983c.c000.snappy.parquet
└── city=Seoul
    ├── part-00003-f0689dc8-d84a-42b7-8403-204ed7982e7a.c000.snappy.parquet
    └── part-00006-e4611934-6c08-4ff5-9abd-41b3062b845a.c000.snappy.parquet
  • .crc 파일 (spec): 테이블의 무결성을 보장하는 체크섬 파일입니다. tableSizeBytes, numFiles, metadata, allFiles 목록을 포함합니다.
  • .json 파일(spec): Delta 로그 항목이 저장되는 파일입니다. 테이블의 메타데이터(metaData), 프로토콜(protocol), 데이터 파일 추가(add) 등의 작업이 기록됩니다.
// 00000000000000000000.crc
{
  "txnId": "f68f1422-3f2e-4b6c-a4a3-bc34e70aeeb2",
  "tableSizeBytes": 4841,
  "numFiles": 5,
  "numMetadata": 1,
  "numProtocol": 1,
  "setTransactions": [],
  "domainMetadata": [],
  "metadata": {
    "id": "89bb32d1-fe8d-45fa-85c8-05a49c160978",
    "format": {
      "provider": "parquet",
      "options": {}
    },
    "schemaString": "{\\"type\\":\\"struct\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"name\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"city\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"salary\\",\\"type\\":\\"long\\",\\"nullable\\":true,\\"metadata\\":{}}]}",
    "partitionColumns": [
      "city"
    ],
    "configuration": {},
    "createdTime": 1740915285394
  },
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 2
  },
  "allFiles": [
    {
      "path": "city=Busan/part-00009-17d3573e-1c83-446d-a461-666dd0ddeeb7.c000.snappy.parquet",
      "partitionValues": {
        "city": "Busan"
      },
      "size": 974,
      "modificationTime": 1740915287382,
      "dataChange": false,
      "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":3,\\"name\\":\\"Chris\\",\\"salary\\":3500},\\"maxValues\\":{\\"id\\":3,\\"name\\":\\"Chris\\",\\"salary\\":3500},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
    },
    {
      "path": "city=Busan/part-00012-afc8579b-a01a-4932-bf33-4f1428ddd93a.c000.snappy.parquet",
      "partitionValues": {
        "city": "Busan"
      },
      "size": 974,
      "modificationTime": 1740915287382,
      "dataChange": false,
      "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":4,\\"name\\":\\"David\\",\\"salary\\":2000},\\"maxValues\\":{\\"id\\":4,\\"name\\":\\"David\\",\\"salary\\":2000},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
    },
    {
      "path": "city=Seoul/part-00006-e4611934-6c08-4ff5-9abd-41b3062b845a.c000.snappy.parquet",
      "partitionValues": {
        "city": "Seoul"
      },
      "size": 960,
      "modificationTime": 1740915287382,
      "dataChange": false,
      "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":2,\\"name\\":\\"Bob\\",\\"salary\\":4000},\\"maxValues\\":{\\"id\\":2,\\"name\\":\\"Bob\\",\\"salary\\":4000},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
    },
    {
      "path": "city=Daegu/part-00015-4b3fc160-9c12-4404-ad17-d6966ce5983c.c000.snappy.parquet",
      "partitionValues": {
        "city": "Daegu"
      },
      "size": 959,
      "modificationTime": 1740915287382,
      "dataChange": false,
      "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":5,\\"name\\":\\"Eve\\",\\"salary\\":2500},\\"maxValues\\":{\\"id\\":5,\\"name\\":\\"Eve\\",\\"salary\\":2500},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
    },
    {
      "path": "city=Seoul/part-00003-f0689dc8-d84a-42b7-8403-204ed7982e7a.c000.snappy.parquet",
      "partitionValues": {
        "city": "Seoul"
      },
      "size": 974,
      "modificationTime": 1740915287382,
      "dataChange": false,
      "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":1,\\"name\\":\\"Alice\\",\\"salary\\":3000},\\"maxValues\\":{\\"id\\":1,\\"name\\":\\"Alice\\",\\"salary\\":3000},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
    }
  ]
}

 

실제 생성된 crc 파일을 내용중에 눈에 띄는 내용을 적어보면

  • tableSizeBytes: 4841 bytes → 테이블 크기를 나타냅니다.
  • numFiles: 5개의 데이터 파일이 등록되었습니다.
  • allFiles: 추가된 데이터 파일 목록과 해당 크기가 기록되었습니다.
// 00000000000000000000.json
{
  "commitInfo": {
    "timestamp": 1740915287984,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Overwrite",
      "partitionBy": "[\\"city\\"]"
    },
    "isolationLevel": "Serializable",
    "isBlindAppend": false,
    "operationMetrics": {
      "numFiles": "5",
      "numOutputRows": "5",
      "numOutputBytes": "4841"
    },
    "engineInfo": "Apache-Spark/3.5.5 Delta-Lake/3.3.0",
    "txnId": "f68f1422-3f2e-4b6c-a4a3-bc34e70aeeb2"
  }
}
{
  "metaData": {
    "id": "89bb32d1-fe8d-45fa-85c8-05a49c160978",
    "format": {
      "provider": "parquet",
      "options": {}
    },
    "schemaString": "{\\"type\\":\\"struct\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"name\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"city\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"salary\\",\\"type\\":\\"long\\",\\"nullable\\":true,\\"metadata\\":{}}]}",
    "partitionColumns": [
      "city"
    ],
    "configuration": {},
    "createdTime": 1740915285394
  }
}
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 2
  }
}
{
  "add": {
    "path": "city=Seoul/part-00003-f0689dc8-d84a-42b7-8403-204ed7982e7a.c000.snappy.parquet",
    "partitionValues": {
      "city": "Seoul"
    },
    "size": 974,
    "modificationTime": 1740915287382,
    "dataChange": true,
    "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":1,\\"name\\":\\"Alice\\",\\"salary\\":3000},\\"maxValues\\":{\\"id\\":1,\\"name\\":\\"Alice\\",\\"salary\\":3000},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
  }
}
{
  "add": {
    "path": "city=Seoul/part-00006-e4611934-6c08-4ff5-9abd-41b3062b845a.c000.snappy.parquet",
    "partitionValues": {
      "city": "Seoul"
    },
    "size": 960,
    "modificationTime": 1740915287382,
    "dataChange": true,
    "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":2,\\"name\\":\\"Bob\\",\\"salary\\":4000},\\"maxValues\\":{\\"id\\":2,\\"name\\":\\"Bob\\",\\"salary\\":4000},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
  }
}
{
  "add": {
    "path": "city=Busan/part-00009-17d3573e-1c83-446d-a461-666dd0ddeeb7.c000.snappy.parquet",
    "partitionValues": {
      "city": "Busan"
    },
    "size": 974,
    "modificationTime": 1740915287382,
    "dataChange": true,
    "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":3,\\"name\\":\\"Chris\\",\\"salary\\":3500},\\"maxValues\\":{\\"id\\":3,\\"name\\":\\"Chris\\",\\"salary\\":3500},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
  }
}
{
  "add": {
    "path": "city=Busan/part-00012-afc8579b-a01a-4932-bf33-4f1428ddd93a.c000.snappy.parquet",
    "partitionValues": {
      "city": "Busan"
    },
    "size": 974,
    "modificationTime": 1740915287382,
    "dataChange": true,
    "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":4,\\"name\\":\\"David\\",\\"salary\\":2000},\\"maxValues\\":{\\"id\\":4,\\"name\\":\\"David\\",\\"salary\\":2000},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
  }
}
{
  "add": {
    "path": "city=Daegu/part-00015-4b3fc160-9c12-4404-ad17-d6966ce5983c.c000.snappy.parquet",
    "partitionValues": {
      "city": "Daegu"
    },
    "size": 959,
    "modificationTime": 1740915287382,
    "dataChange": true,
    "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":5,\\"name\\":\\"Eve\\",\\"salary\\":2500},\\"maxValues\\":{\\"id\\":5,\\"name\\":\\"Eve\\",\\"salary\\":2500},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
  }
}

 

 

json 파일에는 여러개의 operation이 개행되어서 적혀져 있는데, 이번 테이블 생성 작업에서 한 내용과 함께 첫 metadata도 적어주는 것을 볼 수 있다.

  • operation: "WRITE" → 새로운 테이블이 생성되었음을 나타냅니다.
  • partitionBy: "city" → 테이블이 city 컬럼을 기준으로 파티셔닝되었습니다.
  • numFiles: "5" → 5개의 파티션 파일이 저장되었음을 의미합니다.
  • txnId: "f68f1422-3f2e-4b6c-a4a3-bc34e70aeeb2" → 트랜잭션 ID가 부여되었습니다.

데이터 추가하기

아래 코드에서는 새로운 데이터를 생성한 후, 기존 Delta 테이블에 append 모드로 추가합니다.

columns = ["id", "name", "city", "salary"]
new_data = [
    (6, "Frank",  "Seoul", 4500),
    (7, "Grace",  "Busan", 2700),
]
df_new = spark.createDataFrame(new_data, columns)

df_new.write.format("delta") \\
    .mode("append") \\
    .save(delta_table_path)

디렉터리 구조 변경 확인

데이터를 추가한 후, _delta_log 내부에 새로운 트랜잭션 파일과 새로운 파일이 생성된 걸 확인할 수 있습니다.

├── _delta_log
│   ├── 00000000000000000000.crc
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.crc # 추가된 파일
│   ├── 00000000000000000001.json # 추가된 파일
│   └── _commits
├── city=Busan
│   ├── part-00009-17d3573e-1c83-446d-a461-666dd0ddeeb7.c000.snappy.parquet
│   ├── part-00012-afc8579b-a01a-4932-bf33-4f1428ddd93a.c000.snappy.parquet
│   └── part-00015-57f2b5e6-e5ec-4c84-aecc-59b58d6d99ae.c000.snappy.parquet # 추가된 파일
├── city=Daegu
│   └── part-00015-4b3fc160-9c12-4404-ad17-d6966ce5983c.c000.snappy.parquet
└── city=Seoul
    ├── part-00003-f0689dc8-d84a-42b7-8403-204ed7982e7a.c000.snappy.parquet
    ├── part-00006-e4611934-6c08-4ff5-9abd-41b3062b845a.c000.snappy.parquet
    └── part-00007-092eb046-a2d6-491b-957d-eee67c40bf16.c000.snappy.parquet # 추가된 파일

 

CRC 파일

{
  "txnId": "155a92eb-11b0-4ec0-886c-c7f477150dba",
  "tableSizeBytes": 6788,
  "numFiles": 7,
  ...
  "allFiles": [
    ...
    {
      "path": "city=Busan/part-00015-57f2b5e6-e5ec-4c84-aecc-59b58d6d99ae.c000.snappy.parquet",
      "partitionValues": {
        "city": "Busan"
      },
      "size": 974,
      "modificationTime": 1740915322822,
      "dataChange": false,
      "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":7,\\"name\\":\\"Grace\\",\\"salary\\":2700},\\"maxValues\\":{\\"id\\":7,\\"name\\":\\"Grace\\",\\"salary\\":2700},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
    },
    ...
    {
      "path": "city=Seoul/part-00007-092eb046-a2d6-491b-957d-eee67c40bf16.c000.snappy.parquet",
      "partitionValues": {
        "city": "Seoul"
      },
      "size": 973,
      "modificationTime": 1740915322821,
      "dataChange": false,
      "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":6,\\"name\\":\\"Frank\\",\\"salary\\":4500},\\"maxValues\\":{\\"id\\":6,\\"name\\":\\"Frank\\",\\"salary\\":4500},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
    }
  ]
}

변경된 사항

  • numFiles: 7 → 기존 5개에서 2개 증가하여 7개의 데이터 파일이 존재함.
  • tableSizeBytes: 6788 → 테이블 크기가 증가함.
  • allFiles에 새로운 데이터 파일 2개가 추가됨.

JSON 파일

{
  "commitInfo": {
    "timestamp": 1740915322878,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Append",
      "partitionBy": "[]"
    },
    "readVersion": 0,
    "isolationLevel": "Serializable",
    "isBlindAppend": true,
    "operationMetrics": {
      "numFiles": "2",
      "numOutputRows": "2",
      "numOutputBytes": "1947"
    },
    "engineInfo": "Apache-Spark/3.5.5 Delta-Lake/3.3.0",
    "txnId": "155a92eb-11b0-4ec0-886c-c7f477150dba"
  }
}
{
  "add": {
    "path": "city=Seoul/part-00007-092eb046-a2d6-491b-957d-eee67c40bf16.c000.snappy.parquet",
    "partitionValues": {
      "city": "Seoul"
    },
    "size": 973,
    "modificationTime": 1740915322821,
    "dataChange": true,
    "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":6,\\"name\\":\\"Frank\\",\\"salary\\":4500},\\"maxValues\\":{\\"id\\":6,\\"name\\":\\"Frank\\",\\"salary\\":4500},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
  }
}
{
  "add": {
    "path": "city=Busan/part-00015-57f2b5e6-e5ec-4c84-aecc-59b58d6d99ae.c000.snappy.parquet",
    "partitionValues": {
      "city": "Busan"
    },
    "size": 974,
    "modificationTime": 1740915322822,
    "dataChange": true,
    "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":7,\\"name\\":\\"Grace\\",\\"salary\\":2700},\\"maxValues\\":{\\"id\\":7,\\"name\\":\\"Grace\\",\\"salary\\":2700},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
  }
}

  • operation: "WRITE" → 새로운 데이터가 추가되었음을 나타냄.
  • mode: "Append" → 기존 데이터를 덮어쓰지 않고 추가됨.
  • numFiles: "2" → 두 개의 새로운 파일이 추가됨.
  • numOutputRows: "2" → 2개의 새로운 레코드가 삽입됨.
  • txnId: 트랜잭션 ID가 새롭게 생성됨.

삭제

Delta 테이블에서 특정 조건을 만족하는 데이터를 삭제시에 어떻게 바뀌는지 확인해봅니다.

# DeltaTable 객체 생성
from delta import DeltaTable

from common import delta_table_path
from common import spark

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.delete(condition="salary <= 2000")

spark.read.format("delta").load(delta_table_path).show()

디렉터리 구조 변경 확인

삭제 작업을 수행한 후, _delta_log 내부에 새로운 트랜잭션 파일이 생성되었습니다.

.
├── _delta_log
│   ├── 00000000000000000000.crc
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.crc
│   ├── 00000000000000000001.json
│   ├── 00000000000000000002.crc # 추가된 파일
│   ├── 00000000000000000002.json # 추가된 파일
│   └── _commits
├── city=Busan
│   ├── part-00009-17d3573e-1c83-446d-a461-666dd0ddeeb7.c000.snappy.parquet
│   ├── part-00012-afc8579b-a01a-4932-bf33-4f1428ddd93a.c000.snappy.parquet
│   └── part-00015-57f2b5e6-e5ec-4c84-aecc-59b58d6d99ae.c000.snappy.parquet
├── city=Daegu
│   └── part-00015-4b3fc160-9c12-4404-ad17-d6966ce5983c.c000.snappy.parquet
└── city=Seoul
    ├── part-00003-f0689dc8-d84a-42b7-8403-204ed7982e7a.c000.snappy.parquet
    ├── part-00006-e4611934-6c08-4ff5-9abd-41b3062b845a.c000.snappy.parquet
    └── part-00007-092eb046-a2d6-491b-957d-eee67c40bf16.c000.snappy.parquet

CRC 파일

 

{
  "txnId": "bf5e804f-f762-454a-b169-c75f690202d3",
  "tableSizeBytes": 5814,
  "numFiles": 6,
  ...
  "allFiles": [
  // part-00012-afc8579b-a01a-4932-bf33-4f1428ddd93a.c000.snappy.parquet 사라짐
    ...
  ]
}

변경된 사항

  • numFiles: 7 → 6 → 삭제된 파일 수 반영됨.
  • tableSizeBytes: 파일 크기 감소 → 삭제된 파일 크기만큼 테이블 크기가 줄어듦.
  • allFiles 목록에서 삭제된 파일이 제거됨.

JSON 파일

{
  "commitInfo": {
    "timestamp": 1740915358874,
    "operation": "DELETE",
    "operationParameters": {
      "predicate": "[\\"(salary#3L <= 2000)\\"]"
    },
    "readVersion": 1,
    "isolationLevel": "Serializable",
    "isBlindAppend": false,
    "operationMetrics": {
      "numRemovedFiles": "1",
      "numRemovedBytes": "974",
      "numCopiedRows": "0",
      "numDeletionVectorsAdded": "0",
      "numDeletionVectorsRemoved": "0",
      "numAddedChangeFiles": "0",
      "executionTimeMs": "3711",
      "numDeletionVectorsUpdated": "0",
      "numDeletedRows": "1",
      "scanTimeMs": "3531",
      "numAddedFiles": "0",
      "numAddedBytes": "0",
      "rewriteTimeMs": "180"
    },
    "engineInfo": "Apache-Spark/3.5.5 Delta-Lake/3.3.0",
    "txnId": "bf5e804f-f762-454a-b169-c75f690202d3"
  }
}
{
  "remove": {
    "path": "city=Busan/part-00012-afc8579b-a01a-4932-bf33-4f1428ddd93a.c000.snappy.parquet",
    "deletionTimestamp": 1740915358863,
    "dataChange": true,
    "extendedFileMetadata": true,
    "partitionValues": {
      "city": "Busan"
    },
    "size": 974
  }
}

  • operation: "DELETE" → 삭제 작업이 수행되었음을 나타냄.
  • predicate: "(salary#3L <= 2000)" → 삭제 조건이 명시됨.
  • numRemovedFiles: 1개 파일 삭제됨.
  • numRemovedBytes: 974 바이트 삭제됨.
  • remove 섹션에서 삭제된 파일 경로(path) 및 크기(size)가 기록됨.

Delta Lake에서 데이터를 삭제하면 실제 파일이 즉시 제거되지 않고, 삭제되었다는 정보만 기록됩니다. 이는 Delta Lake가 append-only(추가 전용) 방식으로 데이터를 관리하기 때문입니다.

데이터 업데이트 하기

Delta 테이블에서 특정 조건을 만족하는 데이터를 업데이트하는 과정을 테스트해보겠습니다.

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
    condition="city = 'Busan'",
    set={"salary": "salary + 500"}
)

spark.read.format("delta").load(delta_table_path).show()

Delta 로그 파일 구조

업데이트 작업을 수행한 후 _delta_log 내부에 새로운 트랜잭션 파일이 생성됩니다.

.
├── _delta_log
│   ├── 00000000000000000000.crc
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.crc
│   ├── 00000000000000000001.json
│   ├── 00000000000000000002.crc
│   ├── 00000000000000000002.json
**│   ├── 00000000000000000003.crc
│   ├── 00000000000000000003.json**
│   └── _commits
├── city=Busan
**│   ├── part-00000-60aa7706-de3c-4065-97d9-e95c11bb326a.c000.snappy.parquet
│   ├── part-00001-7eddb677-d160-49cd-8c33-2edd16231f11.c000.snappy.parquet**
│   ├── part-00009-17d3573e-1c83-446d-a461-666dd0ddeeb7.c000.snappy.parquet
│   ├── part-00012-afc8579b-a01a-4932-bf33-4f1428ddd93a.c000.snappy.parquet
│   └── part-00015-57f2b5e6-e5ec-4c84-aecc-59b58d6d99ae.c000.snappy.parquet
├── city=Daegu
│   └── part-00015-4b3fc160-9c12-4404-ad17-d6966ce5983c.c000.snappy.parquet
└── city=Seoul
    ├── part-00003-f0689dc8-d84a-42b7-8403-204ed7982e7a.c000.snappy.parquet
    ├── part-00006-e4611934-6c08-4ff5-9abd-41b3062b845a.c000.snappy.parquet
    └── part-00007-092eb046-a2d6-491b-957d-eee67c40bf16.c000.snappy.parquet

.crc 파일 변경 내용

{
  "txnId": "e28a4de0-3bdf-4687-9f04-25d6f1c82051",
  "tableSizeBytes": 5813,
 ...
  "allFiles": [
    {
      "path": "city=Busan/part-00001-7eddb677-d160-49cd-8c33-2edd16231f11.c000.snappy.parquet",
      "partitionValues": {
        "city": "Busan"
      },
      "size": 973,
      "modificationTime": 1740915405422,
      "dataChange": false,
      "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":7,\\"name\\":\\"Grace\\",\\"salary\\":3200},\\"maxValues\\":{\\"id\\":7,\\"name\\":\\"Grace\\",\\"salary\\":3200},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
    },
    {
      "path": "city=Busan/part-00000-60aa7706-de3c-4065-97d9-e95c11bb326a.c000.snappy.parquet",
      "partitionValues": {
        "city": "Busan"
      },
      "size": 974,
      "modificationTime": 1740915405422,
      "dataChange": false,
      "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":3,\\"name\\":\\"Chris\\",\\"salary\\":4000},\\"maxValues\\":{\\"id\\":3,\\"name\\":\\"Chris\\",\\"salary\\":4000},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
    },
    // city=Busan/part-00009-17d3573e-1c83-446d-a461-666dd0ddeeb7.c000.snappy.parquet
    // city=Busan/part-00015-57f2b5e6-e5ec-4c84-aecc-59b58d6d99ae.c000.snappy.parquet
    ...
  ]
}

  • tableSizeBytes: 파일 크기 업데이트 → 변경된 데이터가 반영됨.
  • allFiles 목록에서 새로운 업데이트된 파일이 추가됨.

.json 파일 내용

{
  "commitInfo": {
    "timestamp": 1740915405456,
    "operation": "UPDATE",
    "operationParameters": {
      "predicate": "[\\"(city#2 = Busan)\\"]"
    },
    "readVersion": 2,
    "isolationLevel": "Serializable",
    "isBlindAppend": false,
    "operationMetrics": {
      "numRemovedFiles": "2",
      "numRemovedBytes": "1948",
      "numCopiedRows": "0",
      "numDeletionVectorsAdded": "0",
      "numDeletionVectorsRemoved": "0",
      "numAddedChangeFiles": "0",
      "executionTimeMs": "3555",
      "numDeletionVectorsUpdated": "0",
      "scanTimeMs": "2914",
      "numAddedFiles": "2",
      "numUpdatedRows": "2",
      "numAddedBytes": "1947",
      "rewriteTimeMs": "640"
    },
    "engineInfo": "Apache-Spark/3.5.5 Delta-Lake/3.3.0",
    "txnId": "e28a4de0-3bdf-4687-9f04-25d6f1c82051"
  }
}
{
  "add": {
    "path": "city=Busan/part-00000-60aa7706-de3c-4065-97d9-e95c11bb326a.c000.snappy.parquet",
    "partitionValues": {
      "city": "Busan"
    },
    "size": 974,
    "modificationTime": 1740915405422,
    "dataChange": true,
    "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":3,\\"name\\":\\"Chris\\",\\"salary\\":4000},\\"maxValues\\":{\\"id\\":3,\\"name\\":\\"Chris\\",\\"salary\\":4000},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
  }
}
{
  "add": {
    "path": "city=Busan/part-00001-7eddb677-d160-49cd-8c33-2edd16231f11.c000.snappy.parquet",
    "partitionValues": {
      "city": "Busan"
    },
    "size": 973,
    "modificationTime": 1740915405422,
    "dataChange": true,
    "stats": "{\\"numRecords\\":1,\\"minValues\\":{\\"id\\":7,\\"name\\":\\"Grace\\",\\"salary\\":3200},\\"maxValues\\":{\\"id\\":7,\\"name\\":\\"Grace\\",\\"salary\\":3200},\\"nullCount\\":{\\"id\\":0,\\"name\\":0,\\"salary\\":0}}"
  }
}
{
  "remove": {
    "path": "city=Busan/part-00009-17d3573e-1c83-446d-a461-666dd0ddeeb7.c000.snappy.parquet",
    "deletionTimestamp": 1740915405446,
    "dataChange": true,
    "extendedFileMetadata": true,
    "partitionValues": {
      "city": "Busan"
    },
    "size": 974
  }
}
{
  "remove": {
    "path": "city=Busan/part-00015-57f2b5e6-e5ec-4c84-aecc-59b58d6d99ae.c000.snappy.parquet",
    "deletionTimestamp": 1740915405446,
    "dataChange": true,
    "extendedFileMetadata": true,
    "partitionValues": {
      "city": "Busan"
    },
    "size": 974
  }
}

  • operation: UPDATE → 데이터가 업데이트되었음을 나타냄.
  • predicate: city = 'Busan' → 특정 조건을 만족하는 데이터가 변경됨.
  • numRemovedFiles: 2개 파일 삭제됨.
  • numAddedFiles: 2개 파일 추가됨.
  • numUpdatedRows: 2개의 행이 업데이트됨.

update도 마찬가지로, 데이터 파일 자체가 삭제되지는 않고 삭제된 기록만 추가된는 것을 볼 수 있다.

Time Travel 하기

마지막으로 time travel 기능을 어떻게 쓸 수 있는지 테스트 해봅시다.

from delta import DeltaTable

from common import delta_table_path
from common import spark

delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.history().show(truncate=False)
timestamps = delta_table.history().orderBy("timestamp", ascending=True).select("timestamp").collect()

print("version으로 특정 버전의 테이블을 읽을 수 있습니다.")
spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path).show()
print("version 1")
spark.read.format("delta").option("versionAsOf", 1).load(delta_table_path).show()
print("version 2")
spark.read.format("delta").option("versionAsOf", 2).load(delta_table_path).show()
print("version 3")
spark.read.format("delta").option("versionAsOf", 3).load(delta_table_path).show()

print("timestamp로 특정 시점의 테이블을 읽을 수 있습니다.")
spark.read.format("delta").option("timestampAsOf", timestamps[0][0]).load(delta_table_path).show()
spark.read.format("delta").option("timestampAsOf", timestamps[1][0]).load(delta_table_path).show()
spark.read.format("delta").option("timestampAsOf", timestamps[2][0]).load(delta_table_path).show()
spark.read.format("delta").option("timestampAsOf", timestamps[3][0]).load(delta_table_path).show()

Output

+-------+-----------------------+------+--------+---------+--------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                         |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                                                                                                                                                                                              |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|3      |2025-03-02 20:36:45.515|NULL  |NULL    |UPDATE   |{predicate -> ["(city#2 = Busan)"]}         |NULL|NULL    |NULL     |2          |Serializable  |false        |{numRemovedFiles -> 2, numRemovedBytes -> 1948, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 3555, numDeletionVectorsUpdated -> 0, scanTimeMs -> 2914, numAddedFiles -> 2, numUpdatedRows -> 2, numAddedBytes -> 1947, rewriteTimeMs -> 640}|NULL        |Apache-Spark/3.5.5 Delta-Lake/3.3.0|
|2      |2025-03-02 20:35:58.939|NULL  |NULL    |DELETE   |{predicate -> ["(salary#3L <= 2000)"]}      |NULL|NULL    |NULL     |1          |Serializable  |false        |{numRemovedFiles -> 1, numRemovedBytes -> 974, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 3711, numDeletionVectorsUpdated -> 0, numDeletedRows -> 1, scanTimeMs -> 3531, numAddedFiles -> 0, numAddedBytes -> 0, rewriteTimeMs -> 180}    |NULL        |Apache-Spark/3.5.5 Delta-Lake/3.3.0|
|1      |2025-03-02 20:35:22.939|NULL  |NULL    |WRITE    |{mode -> Append, partitionBy -> []}         |NULL|NULL    |NULL     |0          |Serializable  |true         |{numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 1947}                                                                                                                                                                                                                                                                   |NULL        |Apache-Spark/3.5.5 Delta-Lake/3.3.0|
|0      |2025-03-02 20:34:48.104|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> ["city"]}|NULL|NULL    |NULL     |NULL       |Serializable  |false        |{numFiles -> 5, numOutputRows -> 5, numOutputBytes -> 4841}                                                                                                                                                                                                                                                                   |NULL        |Apache-Spark/3.5.5 Delta-Lake/3.3.0|
+-------+-----------------------+------+--------+---------+--------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+

version으로 특정 버전의 테이블을 읽을 수 있습니다.
+---+-----+-----+------+
| id| name| city|salary|
+---+-----+-----+------+
|  3|Chris|Busan|  3500|
|  4|David|Busan|  2000|
|  1|Alice|Seoul|  3000|
|  2|  Bob|Seoul|  4000|
|  5|  Eve|Daegu|  2500|
+---+-----+-----+------+

+---+-----+-----+------+
| id| name| city|salary|
+---+-----+-----+------+
|  3|Chris|Busan|  3500|
|  7|Grace|Busan|  2700|
|  4|David|Busan|  2000|
|  1|Alice|Seoul|  3000|
|  6|Frank|Seoul|  4500|
|  2|  Bob|Seoul|  4000|
|  5|  Eve|Daegu|  2500|
+---+-----+-----+------+

+---+-----+-----+------+
| id| name| city|salary|
+---+-----+-----+------+
|  3|Chris|Busan|  3500|
|  7|Grace|Busan|  2700|
|  1|Alice|Seoul|  3000|
|  6|Frank|Seoul|  4500|
|  2|  Bob|Seoul|  4000|
|  5|  Eve|Daegu|  2500|
+---+-----+-----+------+

+---+-----+-----+------+
| id| name| city|salary|
+---+-----+-----+------+
|  3|Chris|Busan|  4000|
|  1|Alice|Seoul|  3000|
|  7|Grace|Busan|  3200|
|  6|Frank|Seoul|  4500|
|  2|  Bob|Seoul|  4000|
|  5|  Eve|Daegu|  2500|
+---+-----+-----+------+

timestamp로 특정 시점의 테이블을 읽을 수 있습니다.
+---+-----+-----+------+
| id| name| city|salary|
+---+-----+-----+------+
|  3|Chris|Busan|  3500|
|  4|David|Busan|  2000|
|  1|Alice|Seoul|  3000|
|  2|  Bob|Seoul|  4000|
|  5|  Eve|Daegu|  2500|
+---+-----+-----+------+

+---+-----+-----+------+
| id| name| city|salary|
+---+-----+-----+------+
|  3|Chris|Busan|  3500|
|  7|Grace|Busan|  2700|
|  4|David|Busan|  2000|
|  1|Alice|Seoul|  3000|
|  6|Frank|Seoul|  4500|
|  2|  Bob|Seoul|  4000|
|  5|  Eve|Daegu|  2500|
+---+-----+-----+------+

+---+-----+-----+------+
| id| name| city|salary|
+---+-----+-----+------+
|  3|Chris|Busan|  3500|
|  7|Grace|Busan|  2700|
|  1|Alice|Seoul|  3000|
|  6|Frank|Seoul|  4500|
|  2|  Bob|Seoul|  4000|
|  5|  Eve|Daegu|  2500|
+---+-----+-----+------+

+---+-----+-----+------+
| id| name| city|salary|
+---+-----+-----+------+
|  3|Chris|Busan|  4000|
|  1|Alice|Seoul|  3000|
|  7|Grace|Busan|  3200|
|  6|Frank|Seoul|  4500|
|  2|  Bob|Seoul|  4000|
|  5|  Eve|Daegu|  2500|
+---+-----+-----+------+

실제로 원하는 타이밍으로 옮겨갈 수 있다는 것을 확인할 수 있다.

Outro

이번 글에서는 Delta Lake의 기본 개념부터 테이블 생성, 데이터 추가, 삭제, 업데이트, 그리고 Time Travel 기능까지 다루며 Delta가 제공하는 핵심 기능을 실습해보았습니다. 생각보다 delta table을 사용하는게 일반적인 parquet로 table을 다루는 것보다 복잡하지 않고, spark에서 쉽게 사용할 수 있다는 것을 느꼈다. 이번에는 기본적인 기능들에 대해서만 확인을 해봤다. 이번에 다루지 못한 내용들이 있는데, PROTOCOL.MD를 보면서 더 챙겨보려고 합다.

반응형