<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="HandheldFriendly" content="True" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta name="robots" content="" />
<link href="https://fonts.googleapis.com/css?family=Source+Code+Pro|Source+Sans+Pro:300,400,400i,700" rel="stylesheet">
<link rel="stylesheet" type="text/css" href="/theme/stylesheet/style.min.css">
<link rel="stylesheet" type="text/css" href="/theme/pygments/github.min.css">
<link rel="stylesheet" type="text/css" href="/theme/font-awesome/css/font-awesome.min.css">
<link rel="shortcut icon" href="/images/favicon.ico" type="image/x-icon">
<link rel="icon" href="/images/favicon.ico" type="image/x-icon">
<meta name="author" content="Sephi Berry" />
<meta name="description" content="The reality when using pipelines infrastructure" />
<meta name="keywords" content="python pytorch GCP">
<meta property="og:site_name" content="Geo Berry"/>
<meta property="og:title" content="When Datapipes Meets GCS"/>
<meta property="og:description" content="The reality when using pipelines infrastructure"/>
<meta property="og:locale" content="en_US"/>
<meta property="og:url" content="/when-datapipes-meets-gcs.html"/>
<meta property="og:type" content="article"/>
<meta property="article:published_time" content="2022-07-06 00:00:00+03:00"/>
<meta property="article:modified_time" content=""/>
<meta property="article:author" content="/author/sephi-berry.html">
<meta property="article:section" content="posts pipeline"/>
<meta property="article:tag" content="python pytorch GCP"/>
<meta property="og:image" content="/images/avatar_osnx.png">
<title>Geo Berry – When Datapipes Meets GCS</title>
</head>
<body>
<aside>
<div>
<a href="">
<img src="/images/avatar_osnx.png" alt="Sephi's Blog" title="Sephi's Blog">
</a>
<h1><a href="">Sephi's Blog</a></h1>
<p>Data Engineer | Project Manager | Geo-Spatial Specialist</p>
<nav>
<ul class="list">
<li><a href="/pages/about.html#about">About</a></li>
</ul>
</nav>
<ul class="social">
<li><a class="sc-linkedin" href="https://www.linkedin.com/in/berrygis" target="_blank"><i class="fa fa-linkedin"></i></a></li>
<li><a class="sc-github" href="https://github.com/sephib" target="_blank"><i class="fa fa-github"></i></a></li>
<li><a class="sc-twitter" href="https://twitter.com/geosephi" target="_blank"><i class="fa fa-twitter"></i></a></li>
</ul>
</div>
</aside>
<main>
<article class="single">
<header>
<h1 id="when-datapipes-meets-gcs">When Datapipes Meets GCS</h1>
<p>
Posted on Wed 06 July 2022 in <a href="/category/posts-pipeline.html">posts pipeline</a>
</p>
</header>
<div>
<!-- status: draft -->
<p><a rel="pytorch_gcs_logo"><img src="images/Pytorch_GCS_logo.png" width=700 height=300 /></a> </p>
<h1>Background</h1>
<p>Working in the ML arena requires smart use of data, together with maximum flexibility when manipulating datasets. A common way to achieve this is to use <code>pipeline</code>s, which provide a structured way to manage these processes.<br>
Lately we wanted to run some image manipulation using the <a href="https://pytorch.org">pytorch framework</a>. Since our data is stored on GCP in buckets (GCS), we thought we would be able to use <a href="https://pytorch.org/data/main/index.html">pytorch datapipes</a> as our pipeline framework. Off the bat it seemed simple, since the <a href="https://pytorch.org/data/main/torchdata.datapipes.iter.html#io-datapipes">IO datapipes</a> appear comprehensive; however, as usual, once the implementation started we ran into some issues. </p>
<h1>Our Use Case</h1>
<p>We had some images that we needed to convert into embeddings and save into a bucket. So, inspired by <a href="https://pytorch.org/data/main/examples.html">the documented examples</a>, we built our <code>datapipe</code> as follows: </p>
<div class="highlight"><pre><span></span><code><span class="k">def</span> <span class="nf">image_datapipe</span><span class="p">(</span><span class="n">root_dir</span><span class="p">):</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">FSSpecFileLister</span><span class="p">(</span><span class="n">root</span><span class="o">=</span><span class="n">root_dir</span><span class="p">)</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">datapipe</span><span class="o">.</span><span class="n">open_file_by_fsspec</span><span class="p">(</span><span class="n">mode</span><span class="o">=</span><span class="s2">"rb"</span><span class="p">)</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">datapipe</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">PIL_open</span><span class="p">)</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">datapipe</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">row_emb_processor</span><span class="p">)</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">datapipe</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">post_process</span><span class="p">)</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">datapipe</span><span class="o">.</span><span class="n">save_by_fsspec</span><span class="p">(</span><span class="n">filepath_fn</span><span class="o">=</span><span class="n">filepath_fn</span><span class="p">,</span> <span class="n">mode</span><span class="o">=</span><span class="s2">"wb"</span><span class="p">)</span>
<span class="k">return</span> <span class="n">datapipe</span>
</code></pre></div>
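<p>The chained <code>map</code> calls above can be mimicked with plain Python, which is a handy way to unit-test step functions before wiring them into a datapipe. A minimal sketch, where the toy lambdas are hypothetical stand-ins for <code>PIL_open</code>, <code>row_emb_processor</code> and <code>post_process</code>:</p>

```python
def chain_maps(items, *steps):
    """Apply each step function in order to every item, like chained .map() calls."""
    for item in items:
        for step in steps:
            item = step(item)
        yield item

# toy stand-ins for the real pipeline steps
results = list(chain_maps([1, 2, 3], lambda x: x * 10, lambda x: x + 1))
```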
<h2>Security - overlooked (as usual?)</h2>
<ol>
<li><code>FSSpecFileLister</code> is an object that lists the files in a filesystem.</li>
<li>
<p>In order to access the GCS filesystem, pytorch's <code>open_file_by_fsspec</code> function uses the <a href="https://filesystem-spec.readthedocs.io/en/latest/">fsspec</a> library with the <a href="https://github.com/pytorch/data/blob/cd38927904836f6f67ce33bfaee094fff4078402/torchdata/datapipes/iter/load/fsspec.py#L128">following code</a>:</p>
<div class="highlight"><pre><span></span><code>fs, path = fsspec.core.url_to_fs(root, **self.kwargs)
</code></pre></div>
</li>
</ol>
<p>The problem with this function is that it does not take into account the possibility of passing credentials for authentication - <a href="https://discuss.pytorch.org/t/using-a-google-cloud-storage-bucket-for-dataset/146253">see this thread on the pytorch discuss forum</a>. </p>
<ol>
<li>To solve this issue we wrapped the <code>fsspec.core.url_to_fs</code> function in an internal function that passes a token for authentication: </li>
</ol>
<div class="highlight"><pre><span></span><code>def _url_to_fs(self, root, token):
    fs, path = fsspec.core.url_to_fs(root, token=token, **self.kwargs)
    # hotfix - since the GCS fsspec implementation can return ('gcs', 'gs') as protocol
    if isinstance(fs.protocol, tuple) and len(fs.protocol) > 1:
        fs.protocol = fs.protocol[1]
    return fs, path
</code></pre></div>
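<p>The protocol hotfix can be exercised in isolation. A small sketch, assuming only that <code>gcsfs</code> reports its protocol as the tuple <code>('gcs', 'gs')</code> while most filesystems report a plain string:</p>

```python
def normalize_protocol(protocol):
    # gcsfs can report ('gcs', 'gs'); keep the 'gs' alias used in bucket URLs
    if isinstance(protocol, tuple) and len(protocol) > 1:
        return protocol[1]
    return protocol
```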
<ul>
<li>BTW, for <code>AWS S3</code> there is a dedicated <a href="https://pytorch.org/data/main/generated/torchdata.datapipes.iter.S3FileLister.html#torchdata.datapipes.iter.S3FileLister">S3FileLister</a> handler</li>
</ul>
<h2>Now for the rest of the pipeline</h2>
<ol>
<li>Now we can add additional functionality to our pipeline.<br>
First we open our image as a stream using <code>PIL.Image</code>: </li>
</ol>
<div class="highlight"><pre><span></span><code>def PIL_open(data):
    # data[0] is the object path; return a mutable list so the next
    # steps can replace the stream with the computed embedding
    return [data[0], Image.open(gfs.open(data[0]))]
</code></pre></div>
<ul>
<li>
<p><code>gfs</code> is the <code>fsspec</code> implementation for GCS - <a href="https://gcsfs.readthedocs.io/en/latest/">gcsfs</a></p>
</li>
<li>
<p>In the next step we create embeddings from our image stream, using a wrapped <a href="https://github.com/openai/CLIP">CLIP model</a> (<code>image_processor</code>). Wrapping the model is good practice, since it makes it easy to replace the model in the future. </p>
</li>
</ul>
<div class="highlight"><pre><span></span><code><span class="k">def</span> <span class="nf">row_emb_processor</span><span class="p">(</span><span class="n">data</span><span class="p">):</span>
<span class="n">data</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="o">=</span> <span class="n">image_processor</span><span class="o">.</span><span class="n">embed</span><span class="p">(</span><span class="n">data</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span>
<span class="n">data</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">squeeze</span><span class="p">()</span><span class="o">.</span><span class="n">numpy</span><span class="p">()</span><span class="o">.</span><span class="n">tolist</span><span class="p">()</span>
<span class="k">return</span> <span class="n">data</span>
</code></pre></div>
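<p>The <code>squeeze().numpy().tolist()</code> chain drops the batch dimension and converts the tensor to a plain Python list. The same shape manipulation, shown with <code>numpy</code> standing in for the torch tensor:</p>

```python
import numpy as np

emb = np.array([[0.25, 0.5, 0.75, 1.0]])  # stand-in for a (1, 4) model output
flat = emb.squeeze().tolist()             # drop the batch dim -> plain list
```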
<ol>
<li>Since we want to save the embeddings in <code>parquet</code> format, we use the <code>post_process</code> step for this purpose, so that the embeddings can later be consumed as <code>pandas</code> DataFrames. </li>
</ol>
<div class="highlight"><pre><span></span><code><span class="k">def</span> <span class="nf">post_process</span><span class="p">(</span><span class="n">data</span><span class="p">):</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">([[</span><span class="n">data</span><span class="p">[</span><span class="mi">1</span><span class="p">]]],</span>
<span class="n">index</span><span class="o">=</span><span class="p">[</span><span class="n">data</span><span class="p">[</span><span class="mi">0</span><span class="p">]],</span>
<span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s2">"emb_CLIP"</span><span class="p">],</span>
<span class="p">)</span>
<span class="k">return</span> <span class="p">(</span><span class="n">data</span><span class="p">,</span> <span class="n">df</span><span class="o">.</span><span class="n">to_parquet</span><span class="p">())</span>
</code></pre></div>
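<p>The resulting one-row frame keeps the image path as the index and the whole embedding list in a single cell. A minimal sketch with hypothetical values (the <code>to_parquet()</code> call is omitted here so the example does not require a parquet engine):</p>

```python
import pandas as pd

path = "gs://bucket/images/cat.png"  # hypothetical object path
emb = [0.1, 0.2, 0.3]                # hypothetical embedding

# one row per image: path as index, embedding list in one cell
df = pd.DataFrame([[emb]], index=[path], columns=["emb_CLIP"])
```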
<ol>
<li>Lastly, in our final step we use the <code>save_by_fsspec</code> method to save the embeddings back into a GCS bucket. Since we already fixed <code>url_to_fs</code>, accessing the bucket is straightforward; all we need to supply is the target file name. </li>
</ol>
<div class="highlight"><pre><span></span><code><span class="k">def</span> <span class="nf">filepath_fn</span><span class="p">(</span><span class="n">data</span><span class="p">):</span>
<span class="k">return</span> <span class="sa">f</span><span class="s2">"gs://bucket/folder/asset_</span><span class="si">{</span><span class="n">data</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="si">}</span><span class="s2">.parq"</span>
</code></pre></div>
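<p>In practice the target name usually needs to be derived from the source path rather than used verbatim. A hedged variant using only the file stem (the bucket and folder names are placeholders, as above):</p>

```python
from pathlib import PurePosixPath

def filepath_fn(data):
    stem = PurePosixPath(data[0]).stem  # "cat_001" from ".../cat_001.png"
    return f"gs://bucket/folder/asset_{stem}.parq"

target = filepath_fn(("gs://bucket/images/cat_001.png", None))
```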
<h1>Additional Thoughts</h1>
<ul>
<li>Moving data between the various pipe steps can be easier when defining a <code>dataclass</code> object - thus easily referencing the required property of the image at the various stages.<br>
e.g. instead of calling <code>data[1]</code> for the embedding - why not use <em><code>data.file_stream</code></em>? Hopefully this will be elaborated in a different post.</li>
</ul>
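<p>Such a carrier object could look like the following sketch (the class and attribute names here are hypothetical, not from the post's actual code):</p>

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class ImageAsset:
    """Carries one image through the pipeline with named attributes."""
    path_name: str
    file_stream: Any = None           # the opened PIL image stream
    embedding: Optional[list] = None  # filled in by the embedding step

asset = ImageAsset(path_name="gs://bucket/images/cat.png")
asset.embedding = [0.1, 0.2, 0.3]
```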
<h1>Summary</h1>
<p>As strong advocates for embracing <em>pipelines</em> wherever we can, we find that implementing the various stages can still be challenging. There is no room to accumulate technical debt in the security realm - solving secure access between <code>pytorch datapipes</code> and <code>GCS</code> will allow for reuse and agility in future projects. </p>
</div>
<div class="tag-cloud">
<p>
<a href="/tag/python-pytorch-gcp.html">python pytorch GCP</a>
</p>
</div>
</article>
<footer>
<p>© </p>
<p>Powered by <a href="http://getpelican.com" target="_blank">Pelican</a> - <a href="https://github.com/alexandrevicenzi/flex" target="_blank">Flex</a> theme by <a href="http://alexandrevicenzi.com" target="_blank">Alexandre Vicenzi</a></p> </footer>
</main>
<script type="application/ld+json">
{
"@context" : "http://schema.org",
"@type" : "Blog",
"name": " Geo Berry ",
"url" : "",
"image": "/images/avatar_osnx.png",
"description": "Sephi's Thoughts and Writings"
}
</script>
</body>
</html>