arrowパッケージの基礎

この文書は2022年12月17日に開催されたR研究集会2022での発表「Rによる大規模データの処理」の補足資料です。 発表の中で扱った{arrow}パッケージの使い方を紹介します。

スライド: https://speakerdeck.com/s_uryu/introduction-to-r-arrow

ベンチマーク編

パッケージの読み込み

{arrow}パッケージの他にいくつかのパッケージを利用します。

library(arrow) # 10.0.1
library(dplyr) # 1.0.10
library(data.table) # 1.14.6

{arrow}パッケージを使った処理の流れ

{arrow}パッケージを使った処理は次の手順で行うことになります。

  1. データ(.csv.parquet.arrow)をArrow Tableオブジェクトに読み込む
  2. dplyrパッケージのデータ関数による操作・集計
  3. collect()関数によるデータフレームへの変換

全体像を理解するために簡単な例を示します。

まず、データをArrowでのインメモリの処理を行うために読み込む必要があります。 {arrow}ではcsvやparquetなどの形式のファイルを読みこみ、Arrow Tableオブジェクトとします。 単一または複数のファイルを読み込む関数が用意されていますが、ここでは単一のparquetファイルを読み込むための関数read_parquet()を使います。

d <- 
  read_parquet(
  here::here("data/zoo.parquet"),
  # as_dat_frame = FALSEでArrow Tableオブジェクトになる
  as_data_frame = FALSE)

オブジェクトのクラスを確認します。

class(d)
[1] "Table"        "ArrowTabular" "ArrowObject"  "R6"          

この状態ではデータの値を参照することはできません。 データの大きさ、データがどのような列を持っているのか(列名とデータタイプ)の情報が表示されるだけです。

d
Table
22 rows x 4 columns
$taxon <string>
$name <string>
$body_length_cm <double>
$weight_kg <double>

See $metadata for additional Schema metadata

Arrow Tableオブジェクトに対して、{dplyr}の関数を使ったデータ操作が適用できます。{arrow}では{dplyr}の他に{stringr}や{lubridate}、組み込みのRの関数をラップした関数が提供されており、データフレームを操作するようにデータへの処理を行うことができます。

# dplyrのデータ操作を行う
result <- 
  d |> 
  select(name, taxon, body_length_cm) |> 
  filter(taxon %in% c("霊長類", "齧歯類", "鳥類")) |>
  group_by(taxon) |> 
  summarise(body_length_mean = mean(body_length_cm))

result
Table (query)
taxon: string
body_length_mean: double

See $.data for the source Arrow object

この状態でもデータの実の値は表示されません。 しかしデータに処理を加える前の状態とは異なることがわかります。 具体的には一行目の出力がTableからTable (query)と変化した点、 列名の表示が絞り込まれている点です。

では、記述した処理をデータに適用するにはどうするのでしょうか。 {arrow}ではデータベース上のデータを{dplyr}の関数を使って操作する際や{dplyr}をバックエンドにdata.tableオブジェクトを操作する{dtplyr}のように、collect()関数によって処理を適用します。 これにより{arrow}のオブジェクトからデータフレームを得ることができます。

result |> 
  # データフレームとして返り値を得る
  collect()
# A tibble: 3 × 2
  taxon  body_length_mean
  <chr>             <dbl>
1 鳥類               82.0
2 霊長類             66.5
3 齧歯類             87  

{duckdb}パッケージとの連携

{arrow}ではArrow C++へのインターフェイスを提供し、list_compute_functions()によってその関数の一覧を確認することができます。またArrow query engineと呼ばれる{dplyr}などの関数は?aceroにより確認できます。 一方、その他の多くのRの関数は{arrow}で直接利用できません。 その場合、{arrow}は一度データをデータフレームに変換してから処理を継続するか、エラーで処理を停止させることになります。

例を示すために別のデータ(徳島県の2022年10月の断面交通量)を用意します。

d <-
  read_parquet(
    here::here("data/typeB/36_tokushima/year=2022/month=10/part-0.parquet"),
    as_data_frame = FALSE)

まずはデータフレームに変換する場合です。この場合、警告が出るものの処理は行われます。大きなデータを扱う場合には、Arrowの処理の恩恵を受けることができないために予想以上に時間がかかることがあるかもしれません。

d |> 
  select(datetime) |>
  mutate(is_jholiday = zipangu::is_jholiday(datetime))
Warning: Expression zipangu::is_jholiday(datetime) not supported in Arrow;
pulling data into R
                    datetime is_jholiday
      1: 2022-10-01 00:00:00       FALSE
      2: 2022-10-01 00:00:00       FALSE
      3: 2022-10-01 00:00:00       FALSE
      4: 2022-10-01 00:00:00       FALSE
      5: 2022-10-01 00:00:00       FALSE
     ---                                
2384769: 2022-10-27 02:00:00       FALSE
2384770: 2022-10-27 02:00:00       FALSE
2384771: 2022-10-27 02:00:00       FALSE
2384772: 2022-10-27 02:00:00       FALSE
2384773: 2022-10-27 02:00:00       FALSE

続いて処理が停止される例です。

d |> 
  group_by(location_no) |> 
  slice_min(order_by = traffic, n = 1)
Error: Slicing grouped data not supported in Arrow

このような時にはArrowオブジェクトを{duckbd}パッケージが提供する仮想的なDuckDBオブジェクトに変換することで処理を継続できることがあります。

d |> 
  group_by(location_no) |> 
  # {duckdb}へデータを渡す
  to_duckdb() |> 
  slice_min(order_by = traffic, n = 1) |> 
  collect()
# A tibble: 25,237 × 10
# Groups:   location_no [274]
   datetime            source_…¹ locat…² locat…³ meshc…⁴ link_…⁵ link_no traffic
   <dttm>              <chr>       <int> <chr>   <chr>     <int>   <int>   <int>
 1 2022-10-29 17:50:00 3028           54 西ノ丸… 513404        2     662       2
 2 2022-10-29 19:50:00 3028           54 西ノ丸… 513404        2     662       2
 3 2022-10-29 20:15:00 3028           54 西ノ丸… 513404        2     662       2
 4 2022-10-29 21:40:00 3028           54 西ノ丸… 513404        2     662       2
 5 2022-10-05 17:55:00 3028           54 西ノ丸… 513404        2     662       2
 6 2022-10-05 18:05:00 3028           54 西ノ丸… 513404        2     662       2
 7 2022-10-05 18:50:00 3028           54 西ノ丸… 513404        2     662       2
 8 2022-10-05 18:55:00 3028           54 西ノ丸… 513404        2     662       2
 9 2022-10-05 19:20:00 3028           54 西ノ丸… 513404        2     662       2
10 2022-10-05 20:15:00 3028           54 西ノ丸… 513404        2     662       2
# … with 25,227 more rows, 2 more variables: to_link_end_10m <chr>,
#   link_ver <int>, and abbreviated variable names ¹​source_code, ²​location_no,
#   ³​location_name, ⁴​meshcode10km, ⁵​link_type

複数のファイルを一度に読み込む

{arrow}では複数のcsv、parquetファイルを読み込むための関数、open_dataset()を用意しています。この関数の利用により、共通の列配列をもつデータを一度に読み込むことができます。

# 東京都の2021年のデータ(12ヶ月分)を読み込む
open_dataset(here::here("data/typeB/13_tokyo/year=2021/"))
FileSystemDataset with 12 Parquet files
datetime: timestamp[us, tz=Asia/Tokyo]
source_code: string
location_no: int32
location_name: string
meshcode10km: string
link_type: int32
link_no: int32
traffic: int32
to_link_end_10m: string
link_ver: int32
month: int32

See $metadata for additional Schema metadata
fs::dir_tree(
  here::here("data/typeB/13_tokyo/year=2021/"))
/Users/suryu/Documents/projects2022/talk_221217_rjpusers/data/typeB/13_tokyo/year=2021/
├── month=1
│   └── part-0.parquet
├── month=10
│   └── part-0.parquet
├── month=11
│   └── part-0.parquet
├── month=12
│   └── part-0.parquet
├── month=2
│   └── part-0.parquet
├── month=3
│   └── part-0.parquet
├── month=4
│   └── part-0.parquet
├── month=5
│   └── part-0.parquet
├── month=6
│   └── part-0.parquet
├── month=7
│   └── part-0.parquet
├── month=8
│   └── part-0.parquet
└── month=9
    └── part-0.parquet

複数のファイルを効率的に管理するための機構として、ArrowではHive形式(key=value)のパーティショニングを採用しています。ここでのパーティショニングとは、一つの大きなデータセットを複数の細かなファイルに分割する戦略を意味します。例えば4年間全国の断面交通量情報データの場合では、データを年や月、地域に分けることが考えられます。これにより、必要なデータに素早くアクセス、処理の負担が軽減できることが期待できます。

次のコードは、年と月を表すyearmonthの列をもつデータを年月でパーティショニングした状態でparquet形式で保存する例です。

# パーティショニングの例
# 
d |> 
  write_dataset(path = "data",
                partitioning = c("year", "month"))

これにより次のようなフォルダ構成ができあがります。

data/
  - year=2021
      - month=1
          - part-0.parquet
      - month=2
          - part-0.parquet
  ...
  - year=2022
      - month=1
          - part-0.parquet
      - month=2
          - part-0.parquet

スキーマの指定

Arrowではデータが列ごとに決まった型(まとまった配置)であることを前提とします。そのため列のデータタイプは細かく定義されることになります。 自動的にデータタイプが与えられますが、スキーマの指定によって任意のデータタイプを列に割り振ることができます。

source(here::here("R/schema.R")) # 断面交通量情報データのためのスキーマ定義

jartic_typeB_schema
Schema
datetime: timestamp[ms, tz=Asia/Tokyo]
source_code: string
location_no: int32
location_name: string
meshcode10km: string
link_type: int32
link_no: int32
traffic: int32
to_link_end_10m: string
link_ver: int32
year: int32
month: int32
open_dataset(here::here("data/typeB/13_tokyo/year=2021/month=1/"),
               schema = jartic_typeB_schema)
FileSystemDataset with 1 Parquet file
datetime: timestamp[ms, tz=Asia/Tokyo]
source_code: string
location_no: int32
location_name: string
meshcode10km: string
link_type: int32
link_no: int32
traffic: int32
to_link_end_10m: string
link_ver: int32
year: int32
month: int32

session information

sessioninfo::session_info(info = "platform")
─ Session info ───────────────────────────────────────────────────────────────
 setting  value
 version  R version 4.2.1 (2022-06-23)
 os       macOS Ventura 13.1
 system   aarch64, darwin20
 ui       X11
 language (EN)
 collate  en_US.UTF-8
 ctype    en_US.UTF-8
 tz       Asia/Tokyo
 date     2022-12-21
 pandoc   2.19.2 @ /Applications/RStudio.app/Contents/Resources/app/quarto/bin/tools/ (via rmarkdown)

──────────────────────────────────────────────────────────────────────────────