Support Vended Credentials for Azure Data Lake Store #1146

Open
c-thiel opened this issue Sep 8, 2024 · 5 comments

c-thiel (Contributor) commented Sep 8, 2024

Feature Request / Improvement

Vended credentials for Azure Data Lake Store are supported by the Java implementation. For the getTable / createTable endpoints, the catalog returns a "config" that looks like this:

    "config": {
        "adls.sas-token.<storage-account-name>.dfs.core.windows.net": "sv=2023-11-03&st=2024-09-08T11%3A34%3A08Z&....(rest of SAS Token)"
    }
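
For reference, here is a minimal, hypothetical sketch of what a client could do with such a vended token: pull the per-account SAS token out of the returned config and hand it to adlfs directly. The account name and token value are placeholders, and this is not PyIceberg's implementation.

    # Hypothetical sketch: wiring a vended SAS token into adlfs by hand.
    from adlfs import AzureBlobFileSystem

    account = "mystorageaccount"  # placeholder storage account name
    config = {
        f"adls.sas-token.{account}.dfs.core.windows.net": "sv=2023-11-03&st=...",  # truncated token
    }

    # Pull out the token vended for this account and pass it to adlfs, which
    # accepts account_name and sas_token keyword arguments.
    sas_token = config[f"adls.sas-token.{account}.dfs.core.windows.net"]
    fs = AzureBlobFileSystem(account_name=account, sas_token=sas_token)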

This is currently not respected by PyIceberg. Instead, we get the error:

ValueError: unable to connect to account for Must provide either a connection_string or account_name with credentials!!

Full Traceback:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:515, in AzureBlobFileSystem.do_connect(self)
    514     else:
--> 515         raise ValueError(
    516             "Must provide either a connection_string or account_name with credentials!!"
    517         )
    519 except RuntimeError:

ValueError: Must provide either a connection_string or account_name with credentials!!

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:509, in Transaction.append(self, df, snapshot_properties)
    506 data_files = _dataframe_to_data_files(
    507     table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
    508 )
--> 509 for data_file in data_files:
    510     append_files.append_data_file(data_file)

File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:2354, in _dataframe_to_data_files(table_metadata, df, io, write_uuid, counter)
   2353 if table_metadata.spec().is_unpartitioned():
-> 2354     yield from write_file(
   2355         io=io,
   2356         table_metadata=table_metadata,
   2357         tasks=iter([
   2358             WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
   2359             for batches in bin_pack_arrow_table(df, target_file_size)
   2360         ]),
   2361     )
   2362 else:

File /opt/conda/lib/python3.11/concurrent/futures/_base.py:619, in Executor.map.<locals>.result_iterator()
    618 if timeout is None:
--> 619     yield _result_or_cancel(fs.pop())
    620 else:

File /opt/conda/lib/python3.11/concurrent/futures/_base.py:317, in _result_or_cancel(***failed resolving arguments***)
    316 try:
--> 317     return fut.result(timeout)
    318 finally:

File /opt/conda/lib/python3.11/concurrent/futures/_base.py:456, in Future.result(self, timeout)
    455 elif self._state == FINISHED:
--> 456     return self.__get_result()
    457 else:

File /opt/conda/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    400 try:
--> 401     raise self._exception
    402 finally:
    403     # Break a reference cycle with the exception in self._exception

File /opt/conda/lib/python3.11/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:

File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:2173, in write_file.<locals>.write_parquet(task)
   2172 file_path = f'{table_metadata.location}/data/{task.generate_data_file_path("parquet")}'
-> 2173 fo = io.new_output(file_path)
   2174 with fo.create(overwrite=True) as fos:

File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:331, in FsspecFileIO.new_output(self, location)
    330 uri = urlparse(location)
--> 331 fs = self.get_fs(uri.scheme)
    332 return FsspecOutputFile(location=location, fs=fs)

File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:355, in FsspecFileIO._get_fs(self, scheme)
    354     raise ValueError(f"No registered filesystem for scheme: {scheme}")
--> 355 return self._scheme_to_fs[scheme](self.properties)

File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:179, in _adlfs(properties)
    177 from adlfs import AzureBlobFileSystem
--> 179 return AzureBlobFileSystem(
    180     connection_string=properties.get(ADLFS_CONNECTION_STRING),
    181     account_name=properties.get(ADLFS_ACCOUNT_NAME),
    182     account_key=properties.get(ADLFS_ACCOUNT_KEY),
    183     sas_token=properties.get(ADLFS_SAS_TOKEN),
    184     tenant_id=properties.get(ADLFS_TENANT_ID),
    185     client_id=properties.get(ADLFS_CLIENT_ID),
    186     client_secret=properties.get(ADLFS_ClIENT_SECRET),
    187 )

File /opt/conda/lib/python3.11/site-packages/fsspec/spec.py:80, in _Cached.__call__(cls, *args, **kwargs)
     79 else:
---> 80     obj = super().__call__(*args, **kwargs)
     81     # Setting _fs_token here causes some static linters to complain.

File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:344, in AzureBlobFileSystem.__init__(self, account_name, account_key, connection_string, credential, sas_token, request_session, socket_timeout, blocksize, client_id, client_secret, tenant_id, anon, location_mode, loop, asynchronous, default_fill_cache, default_cache_type, version_aware, assume_container_exists, max_concurrency, timeout, connection_timeout, read_timeout, account_host, **kwargs)
    339     (
    340         self.credential,
    341         self.sync_credential,
    342     ) = self._get_default_azure_credential(**kwargs)
--> 344 self.do_connect()
    345 weakref.finalize(self, sync, self.loop, close_service_client, self)

File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:525, in AzureBlobFileSystem.do_connect(self)
    524 except Exception as e:
--> 525     raise ValueError(f"unable to connect to account for {e}")

ValueError: unable to connect to account for Must provide either a connection_string or account_name with credentials!!

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:515, in AzureBlobFileSystem.do_connect(self)
    514     else:
--> 515         raise ValueError(
    516             "Must provide either a connection_string or account_name with credentials!!"
    517         )
    519 except RuntimeError:

ValueError: Must provide either a connection_string or account_name with credentials!!

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
Cell In[8], line 1
----> 1 table.append(pa_df)

File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:1578, in Table.append(self, df, snapshot_properties)
   1570 """
   1571 Shorthand API for appending a PyArrow table to the table.
   1572 
   (...)
   1575     snapshot_properties: Custom properties to be added to the snapshot summary
   1576 """
   1577 with self.transaction() as tx:
-> 1578     tx.append(df=df, snapshot_properties=snapshot_properties)

File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:503, in Transaction.append(self, df, snapshot_properties)
    500 update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
    501 append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
--> 503 with append_method() as append_files:
    504     # skip writing data files if the dataframe is empty
    505     if df.shape[0] > 0:
    506         data_files = _dataframe_to_data_files(
    507             table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
    508         )

File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:2094, in UpdateTableMetadata.__exit__(self, _, value, traceback)
   2092 def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
   2093     """Close and commit the change."""
-> 2094     self.commit()

File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:2090, in UpdateTableMetadata.commit(self)
   2089 def commit(self) -> None:
-> 2090     self._transaction._apply(*self._commit())

File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:3220, in _SnapshotProducer._commit(self)
   3210 summary = self._summary(self.snapshot_properties)
   3212 manifest_list_file_path = _generate_manifest_list_path(
   3213     location=self._transaction.table_metadata.location,
   3214     snapshot_id=self._snapshot_id,
   3215     attempt=0,
   3216     commit_uuid=self.commit_uuid,
   3217 )
   3218 with write_manifest_list(
   3219     format_version=self._transaction.table_metadata.format_version,
-> 3220     output_file=self._io.new_output(manifest_list_file_path),
   3221     snapshot_id=self._snapshot_id,
   3222     parent_snapshot_id=self._parent_snapshot_id,
   3223     sequence_number=next_sequence_number,
   3224 ) as writer:
   3225     writer.add_manifests(new_manifests)
   3227 snapshot = Snapshot(
   3228     snapshot_id=self._snapshot_id,
   3229     parent_snapshot_id=self._parent_snapshot_id,
   (...)
   3233     schema_id=self._transaction.table_metadata.current_schema_id,
   3234 )

File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:331, in FsspecFileIO.new_output(self, location)
    322 """Get an FsspecOutputFile instance to write bytes to the file at the given location.
    323 
    324 Args:
   (...)
    328     FsspecOutputFile: An FsspecOutputFile instance for the given location.
    329 """
    330 uri = urlparse(location)
--> 331 fs = self.get_fs(uri.scheme)
    332 return FsspecOutputFile(location=location, fs=fs)

File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:355, in FsspecFileIO._get_fs(self, scheme)
    353 if scheme not in self._scheme_to_fs:
    354     raise ValueError(f"No registered filesystem for scheme: {scheme}")
--> 355 return self._scheme_to_fs[scheme](self.properties)

File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:179, in _adlfs(properties)
    176 def _adlfs(properties: Properties) -> AbstractFileSystem:
    177     from adlfs import AzureBlobFileSystem
--> 179     return AzureBlobFileSystem(
    180         connection_string=properties.get(ADLFS_CONNECTION_STRING),
    181         account_name=properties.get(ADLFS_ACCOUNT_NAME),
    182         account_key=properties.get(ADLFS_ACCOUNT_KEY),
    183         sas_token=properties.get(ADLFS_SAS_TOKEN),
    184         tenant_id=properties.get(ADLFS_TENANT_ID),
    185         client_id=properties.get(ADLFS_CLIENT_ID),
    186         client_secret=properties.get(ADLFS_ClIENT_SECRET),
    187     )

File /opt/conda/lib/python3.11/site-packages/fsspec/spec.py:80, in _Cached.__call__(cls, *args, **kwargs)
     78     return cls._cache[token]
     79 else:
---> 80     obj = super().__call__(*args, **kwargs)
     81     # Setting _fs_token here causes some static linters to complain.
     82     obj._fs_token_ = token

File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:344, in AzureBlobFileSystem.__init__(self, account_name, account_key, connection_string, credential, sas_token, request_session, socket_timeout, blocksize, client_id, client_secret, tenant_id, anon, location_mode, loop, asynchronous, default_fill_cache, default_cache_type, version_aware, assume_container_exists, max_concurrency, timeout, connection_timeout, read_timeout, account_host, **kwargs)
    333 if (
    334     self.credential is None
    335     and self.anon is False
    336     and self.sas_token is None
    337     and self.account_key is None
    338 ):
    339     (
    340         self.credential,
    341         self.sync_credential,
    342     ) = self._get_default_azure_credential(**kwargs)
--> 344 self.do_connect()
    345 weakref.finalize(self, sync, self.loop, close_service_client, self)
    347 if self.credential is not None:

File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:525, in AzureBlobFileSystem.do_connect(self)
    522     self.do_connect()
    524 except Exception as e:
--> 525     raise ValueError(f"unable to connect to account for {e}")

ValueError: unable to connect to account for Must provide either a connection_string or account_name with credentials!!

sungwy (Collaborator) commented Sep 9, 2024

Hi @c-thiel, thank you for raising this issue. I'm not an expert in Azure Data Lake Store, but I can help look into this together.

adls.sas-token.<storage-account-name>.dfs.core.windows.net

It looks like the prefix for the property name you posted is "adls", whereas the prefix we expect is "adlfs":

ADLFS_SAS_TOKEN = "adlfs.sas-token"
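
To illustrate the mismatch, here is a toy example (not PyIceberg code; values are placeholders): the fsspec ADLS filesystem is built from "adlfs."-prefixed property lookups, so a property vended under the "adls." prefix is never seen.

    # Toy illustration of the prefix mismatch; values are placeholders.
    properties = {
        "adls.sas-token.mystorageaccount.dfs.core.windows.net": "sv=2023-11-03&st=...",
    }

    # PyIceberg's _adlfs() builds the filesystem from "adlfs."-prefixed keys
    # (see properties.get(ADLFS_SAS_TOKEN) in the traceback above), so a token
    # vended under the "adls." prefix is never picked up:
    print(properties.get("adlfs.sas-token"))  # -> None, adlfs gets no credentials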

Could you confirm whether this is the case? This could be a typo in your comment above, or on the REST catalog server side, in which case it should be fixed there.

c-thiel (Contributor, Author) commented Sep 10, 2024

@sungwy in my comment as well as in the catalog I am using "adls.sas-token", which is exactly what Java and Spark expect:
https://github.com/apache/iceberg/blob/4873b4b7534de0bcda2e1e0366ffcf83943dc906/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java#L33
We could easily send both prefixes from the catalog side, but it would be great if we didn't have to.
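
For illustration, a hypothetical catalog response vending the token under both prefixes might look like the sketch below (account name and token are placeholders); the point of this issue is that the duplication should not be necessary.

    # Hypothetical catalog response vending the same token under both prefixes;
    # account name and token are placeholders.
    account = "mystorageaccount"
    sas = "sv=2023-11-03&st=..."
    config = {
        f"adls.sas-token.{account}.dfs.core.windows.net": sas,  # key Java / Spark read
        "adlfs.sas-token": sas,  # key current PyIceberg reads
    }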

Is there a reason for PyIceberg not using the same prefix as Java?

ndrluis (Collaborator) commented Sep 10, 2024

The adlfs prefix is wrong; we already have a PR to fix the prefix in #961.

sungwy (Collaborator) commented Sep 10, 2024

> The adlfs prefix is wrong; we already have a PR to fix the prefix in #961.

Thanks for pointing that out, @ndrluis. Let me do a pass over it and get it merged to resolve this issue.

sungwy (Collaborator) commented Sep 10, 2024

> @sungwy in my comment as well as in the catalog I am using "adls.sas-token", which is exactly what Java and Spark expect: https://github.com/apache/iceberg/blob/4873b4b7534de0bcda2e1e0366ffcf83943dc906/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java#L33. We could easily send both prefixes from the catalog side, but it would be great if we didn't have to.
>
> Is there a reason for PyIceberg not using the same prefix as Java?

Thanks for pointing that out, @c-thiel! Like I mentioned, I'm not too familiar with the Azure Data Lake integration, but it looks like @ndrluis already has the right solution ready for this issue 🙂

The PR has been merged into main. Would you be able to confirm whether the fix in main resolves this issue, @c-thiel?
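
For anyone verifying, one possible check (a sketch; the repository URL and install step are assumptions, not taken from this thread) is to install PyIceberg from main and rerun the append that failed above:

    # Assumed verification steps (repo URL and install command are not from this thread):
    #   pip install --upgrade "git+https://github.com/apache/iceberg-python.git"
    # then re-run the call that failed in the traceback above:
    table.append(pa_df)  # should now pick up the vended adls.sas-token.* property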
