library(arrow) # 10.0.1
library(dplyr) # 1.0.10
library(data.table) # 1.14.6
arrowパッケージの基礎
この文書は2022年12月17日に開催されたR研究集会2022での発表「Rによる大規模データの処理」の補足資料です。 発表の中で扱った{arrow}
パッケージの使い方を紹介します。
スライド: https://speakerdeck.com/s_uryu/introduction-to-r-arrow
パッケージの読み込み
{arrow}
パッケージの他にいくつかのパッケージを利用します。
{arrow}パッケージを使った処理の流れ
{arrow}パッケージを使った処理は次の手順で行うことになります。
- データ(
.csv
や.parquet
、.arrow
)をArrow Tableオブジェクトに読み込む - dplyrパッケージのデータ関数による操作・集計
collect()
関数によるデータフレームへの変換
全体像を理解するために簡単な例を示します。
まず、データをArrowでのインメモリの処理を行うために読み込む必要があります。 {arrow}ではcsvやparquetなどの形式のファイルを読みこみ、Arrow Tableオブジェクトとします。 単一または複数のファイルを読み込む関数が用意されていますが、ここでは単一のparquetファイルを読み込むための関数read_parquet()
を使います。
<-
d read_parquet(
::here("data/zoo.parquet"),
here# as_dat_frame = FALSEでArrow Tableオブジェクトになる
as_data_frame = FALSE)
オブジェクトのクラスを確認します。
class(d)
[1] "Table" "ArrowTabular" "ArrowObject" "R6"
この状態ではデータの値を参照することはできません。 データの大きさ、データがどのような列を持っているのか(列名とデータタイプ)の情報が表示されるだけです。
d
Table
22 rows x 4 columns
$taxon <string>
$name <string>
$body_length_cm <double>
$weight_kg <double>
See $metadata for additional Schema metadata
Arrow Tableオブジェクトに対して、{dplyr}の関数を使ったデータ操作が適用できます。{arrow}では{dplyr}の他に{stringr}や{lubridate}、組み込みのRの関数をラップした関数が提供されており、データフレームを操作するようにデータへの処理を行うことができます。
# dplyrのデータ操作を行う
<-
result |>
d select(name, taxon, body_length_cm) |>
filter(taxon %in% c("霊長類", "齧歯類", "鳥類")) |>
group_by(taxon) |>
summarise(body_length_mean = mean(body_length_cm))
result
Table (query)
taxon: string
body_length_mean: double
See $.data for the source Arrow object
この状態でもデータの実の値は表示されません。 しかしデータに処理を加える前の状態とは異なることがわかります。 具体的には一行目の出力がTable
からTable (query)
と変化した点、 列名の表示が絞り込まれている点です。
では、記述した処理をデータに適用するにはどうするのでしょうか。 {arrow}ではデータベース上のデータを{dplyr}の関数を使って操作する際や{dplyr}をバックエンドにdata.tableオブジェクトを操作する{dtplyr}のように、collect()
関数によって処理を適用します。 これにより{arrow}のオブジェクトからデータフレームを得ることができます。
|>
result # データフレームとして返り値を得る
collect()
# A tibble: 3 × 2
taxon body_length_mean
<chr> <dbl>
1 鳥類 82.0
2 霊長類 66.5
3 齧歯類 87
{duckdb}パッケージとの連携
{arrow}ではArrow C++へのインターフェイスを提供し、list_compute_functions()
によってその関数の一覧を確認することができます。またArrow query engineと呼ばれる{dplyr}などの関数は?acero
により確認できます。 一方、その他の多くのRの関数は{arrow}で直接利用できません。 その場合、{arrow}は一度データをデータフレームに変換してから処理を継続するか、エラーで処理を停止させることになります。
例を示すために別のデータ(徳島県の2022年10月の断面交通量)を用意します。
<-
d read_parquet(
::here("data/typeB/36_tokushima/year=2022/month=10/part-0.parquet"),
hereas_data_frame = FALSE)
まずはデータフレームに変換する場合です。この場合、警告が出るものの処理は行われます。大きなデータを扱う場合には、Arrowの処理の恩恵を受けることができないために予想以上に時間がかかることがあるかもしれません。
|>
d select(datetime) |>
mutate(is_jholiday = zipangu::is_jholiday(datetime))
Warning: Expression zipangu::is_jholiday(datetime) not supported in Arrow;
pulling data into R
datetime is_jholiday
1: 2022-10-01 00:00:00 FALSE
2: 2022-10-01 00:00:00 FALSE
3: 2022-10-01 00:00:00 FALSE
4: 2022-10-01 00:00:00 FALSE
5: 2022-10-01 00:00:00 FALSE
---
2384769: 2022-10-27 02:00:00 FALSE
2384770: 2022-10-27 02:00:00 FALSE
2384771: 2022-10-27 02:00:00 FALSE
2384772: 2022-10-27 02:00:00 FALSE
2384773: 2022-10-27 02:00:00 FALSE
続いて処理が停止される例です。
|>
d group_by(location_no) |>
slice_min(order_by = traffic, n = 1)
Error: Slicing grouped data not supported in Arrow
このような時にはArrowオブジェクトを{duckbd}パッケージが提供する仮想的なDuckDBオブジェクトに変換することで処理を継続できることがあります。
|>
d group_by(location_no) |>
# {duckdb}へデータを渡す
to_duckdb() |>
slice_min(order_by = traffic, n = 1) |>
collect()
# A tibble: 25,237 × 10
# Groups: location_no [274]
datetime source_…¹ locat…² locat…³ meshc…⁴ link_…⁵ link_no traffic
<dttm> <chr> <int> <chr> <chr> <int> <int> <int>
1 2022-10-29 17:50:00 3028 54 西ノ丸… 513404 2 662 2
2 2022-10-29 19:50:00 3028 54 西ノ丸… 513404 2 662 2
3 2022-10-29 20:15:00 3028 54 西ノ丸… 513404 2 662 2
4 2022-10-29 21:40:00 3028 54 西ノ丸… 513404 2 662 2
5 2022-10-05 17:55:00 3028 54 西ノ丸… 513404 2 662 2
6 2022-10-05 18:05:00 3028 54 西ノ丸… 513404 2 662 2
7 2022-10-05 18:50:00 3028 54 西ノ丸… 513404 2 662 2
8 2022-10-05 18:55:00 3028 54 西ノ丸… 513404 2 662 2
9 2022-10-05 19:20:00 3028 54 西ノ丸… 513404 2 662 2
10 2022-10-05 20:15:00 3028 54 西ノ丸… 513404 2 662 2
# … with 25,227 more rows, 2 more variables: to_link_end_10m <chr>,
# link_ver <int>, and abbreviated variable names ¹source_code, ²location_no,
# ³location_name, ⁴meshcode10km, ⁵link_type
複数のファイルを一度に読み込む
{arrow}では複数のcsv、parquetファイルを読み込むための関数、open_dataset()
を用意しています。この関数の利用により、共通の列配列をもつデータを一度に読み込むことができます。
# 東京都の2021年のデータ(12ヶ月分)を読み込む
open_dataset(here::here("data/typeB/13_tokyo/year=2021/"))
FileSystemDataset with 12 Parquet files
datetime: timestamp[us, tz=Asia/Tokyo]
source_code: string
location_no: int32
location_name: string
meshcode10km: string
link_type: int32
link_no: int32
traffic: int32
to_link_end_10m: string
link_ver: int32
month: int32
See $metadata for additional Schema metadata
::dir_tree(
fs::here("data/typeB/13_tokyo/year=2021/")) here
/Users/suryu/Documents/projects2022/talk_221217_rjpusers/data/typeB/13_tokyo/year=2021/
├── month=1
│ └── part-0.parquet
├── month=10
│ └── part-0.parquet
├── month=11
│ └── part-0.parquet
├── month=12
│ └── part-0.parquet
├── month=2
│ └── part-0.parquet
├── month=3
│ └── part-0.parquet
├── month=4
│ └── part-0.parquet
├── month=5
│ └── part-0.parquet
├── month=6
│ └── part-0.parquet
├── month=7
│ └── part-0.parquet
├── month=8
│ └── part-0.parquet
└── month=9
└── part-0.parquet
複数のファイルを効率的に管理するための機構として、ArrowではHive形式(key=value
)のパーティショニングを採用しています。ここでのパーティショニングとは、一つの大きなデータセットを複数の細かなファイルに分割する戦略を意味します。例えば4年間全国の断面交通量情報データの場合では、データを年や月、地域に分けることが考えられます。これにより、必要なデータに素早くアクセス、処理の負担が軽減できることが期待できます。
次のコードは、年と月を表すyear
、month
の列をもつデータを年月でパーティショニングした状態でparquet形式で保存する例です。
# パーティショニングの例
#
|>
d write_dataset(path = "data",
partitioning = c("year", "month"))
これにより次のようなフォルダ構成ができあがります。
data/
- year=2021
- month=1
- part-0.parquet
- month=2
- part-0.parquet
...
- year=2022
- month=1
- part-0.parquet
- month=2
- part-0.parquet
スキーマの指定
Arrowではデータが列ごとに決まった型(まとまった配置)であることを前提とします。そのため列のデータタイプは細かく定義されることになります。 自動的にデータタイプが与えられますが、スキーマの指定によって任意のデータタイプを列に割り振ることができます。
source(here::here("R/schema.R")) # 断面交通量情報データのためのスキーマ定義
jartic_typeB_schema
Schema
datetime: timestamp[ms, tz=Asia/Tokyo]
source_code: string
location_no: int32
location_name: string
meshcode10km: string
link_type: int32
link_no: int32
traffic: int32
to_link_end_10m: string
link_ver: int32
year: int32
month: int32
open_dataset(here::here("data/typeB/13_tokyo/year=2021/month=1/"),
schema = jartic_typeB_schema)
FileSystemDataset with 1 Parquet file
datetime: timestamp[ms, tz=Asia/Tokyo]
source_code: string
location_no: int32
location_name: string
meshcode10km: string
link_type: int32
link_no: int32
traffic: int32
to_link_end_10m: string
link_ver: int32
year: int32
month: int32
session information
::session_info(info = "platform") sessioninfo
─ Session info ───────────────────────────────────────────────────────────────
setting value
version R version 4.2.1 (2022-06-23)
os macOS Ventura 13.1
system aarch64, darwin20
ui X11
language (EN)
collate en_US.UTF-8
ctype en_US.UTF-8
tz Asia/Tokyo
date 2022-12-21
pandoc 2.19.2 @ /Applications/RStudio.app/Contents/Resources/app/quarto/bin/tools/ (via rmarkdown)
──────────────────────────────────────────────────────────────────────────────