Skip to content

Commit

Permalink
Eliminate reference cycles to improve Ctx cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ihnorton committed Dec 11, 2019
1 parent c5157dc commit e597cd0
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 51 deletions.
2 changes: 1 addition & 1 deletion tiledb/indexing.pxd
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .libtiledb cimport Array, ArraySchema, Query

cdef class DomainIndexer:
cdef Array array
cdef object array_ref
cdef ArraySchema schema
cdef Query query
15 changes: 11 additions & 4 deletions tiledb/indexing.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ IF TILEDBPY_MODULAR:

import numpy as np
from .array import DenseArray, SparseArray
import weakref

def _index_as_tuple(idx):
"""Forces scalar index objects to a tuple representation"""
Expand All @@ -25,14 +26,21 @@ cdef class DomainIndexer(object):
return indexer

def __init__(self, Array array, query = None):
self.array = array
self.array_ref = weakref.ref(array)
self.schema = array.schema
self.query = query

@property
def array(self):
    """
    Return the array this indexer refers to.

    The indexer stores only a weak reference (``array_ref``), so the
    array must still be alive when this property is read.

    :return: the object referenced by ``self.array_ref``
    :raises AssertionError: if the weak reference is dead
    """
    # Dereference exactly once and keep the resulting strong reference:
    # calling array_ref() a second time for the return value could yield
    # None if the referent is collected after the check has passed.
    array = self.array_ref()
    assert array is not None, \
        "Internal error: invariant violation (index[] with dead array_ref)"
    return array

def __getitem__(self, object idx):
# implements domain-based indexing: slice by domain coordinates, not 0-based python indexing

cdef Domain dom = self.schema.domain
cdef ArraySchema schema = self.array.schema
cdef Domain dom = schema.domain
cdef ndim = dom.ndim
cdef list attr_names = list()

Expand All @@ -59,7 +67,7 @@ cdef class DomainIndexer(object):
assert isinstance(subidx, slice)
subarray[i] = subidx.start, subidx.stop

attr_names = list(self.schema.attr(i).name for i in range(self.schema.nattr))
attr_names = list(schema.attr(i).name for i in range(schema.nattr))

order = None
# TODO make coords optional for array.domain_index. there are no kwargs in slicing[], so
Expand All @@ -85,7 +93,6 @@ cdef class DomainIndexer(object):
else:
raise ValueError("order must be 'C' (TILEDB_ROW_MAJOR), 'F' (TILEDB_COL_MAJOR), or 'G' (TILEDB_GLOBAL_ORDER)")


if isinstance(self.array, SparseArray):
return (<SparseArrayImpl>self.array)._read_sparse_subarray(subarray, attr_names, layout)
elif isinstance(self.array, DenseArray):
Expand Down
9 changes: 4 additions & 5 deletions tiledb/libtiledb.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,7 @@ cdef class ArraySchema(object):
cdef _attr_idx(self, int idx)

cdef class Array(object):
cdef object __weakref__
cdef Ctx ctx
cdef tiledb_array_t* ptr
cdef unicode uri
Expand All @@ -1106,13 +1107,11 @@ cdef class Array(object):
cdef object view_attr # can be None
cdef object key # can be None
cdef object schema
cdef DomainIndexer domain_index

# TODO make this a cython type TBD
cdef DomainIndexer domain_index
cdef object multi_index

cdef object last_fragment_info
cdef Metadata meta
cdef object last_fragment_info

cdef _ndarray_is_varlen(self, np.ndarray array)
cdef _unpack_varlen_query(self, ReadQuery read, unicode name)
Expand Down Expand Up @@ -1149,7 +1148,7 @@ cdef class ReadQuery(object):
cdef object _offsets

cdef class Metadata(object):
cdef Array array
cdef object array_ref

cdef class TileDBError(Exception):
pass
Expand Down
81 changes: 47 additions & 34 deletions tiledb/metadata.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ IF TILEDBPY_MODULAR:
from .libtiledb import *
from .libtiledb cimport *

import weakref

from cython.operator cimport dereference as deref
from cpython.long cimport PyLong_AsLongLong
from cpython.float cimport PyFloat_AsDouble
Expand All @@ -15,8 +17,6 @@ cdef extern from "<utility>" namespace "std" nogil:
cdef unique_ptr[vector[char]] move(unique_ptr[vector[char]])
#cdef unique_ptr[vector[char]] make_unique(vector[char])



cdef object metadata_value_check(val):
if isinstance(val, int):
if val > numeric_limits[int64_t].max():
Expand Down Expand Up @@ -168,10 +168,12 @@ cdef object unpack_metadata_val(tiledb_datatype_t value_type,
return None


cdef object put_metadata(tiledb_ctx_t* ctx_ptr,
tiledb_array_t* array_ptr,
cdef object put_metadata(Array array,
key, val):

cdef tiledb_array_t* array_ptr = array.ptr
cdef tiledb_ctx_t* ctx_ptr = array.ctx.ptr

cdef int rc = TILEDB_OK
cdef vector[char] data_buf = vector[char]()
cdef const char* key_cstr
Expand Down Expand Up @@ -200,10 +202,12 @@ cdef object put_metadata(tiledb_ctx_t* ctx_ptr,

return None

cdef object get_metadata(tiledb_ctx_t* ctx_ptr,
tiledb_array_t* array_ptr,
cdef object get_metadata(array: Array,
key: unicode):

cdef tiledb_array_t* array_ptr = array.ptr
cdef tiledb_ctx_t* ctx_ptr = array.ctx.ptr

cdef:
const char* key_ptr
object key_utf8
Expand Down Expand Up @@ -231,12 +235,12 @@ cdef object get_metadata(tiledb_ctx_t* ctx_ptr,

return unpack_metadata_val(value_type, value_num, value)

cdef put_metadata_dict(Array array, kv):
    """
    Store every key/value pair of a mapping via ``put_metadata``.

    :param array: array handed to ``put_metadata`` for each entry
    :param kv: mapping of metadata key to metadata value
    """
    # Use .items() rather than the Python-2-only .iteritems(): on Python 3
    # a mapping that does not provide iteritems() would raise
    # AttributeError here.
    for k, v in kv.items():
        put_metadata(array, k, v)

cdef object load_metadata(tiledb_ctx_t* ctx_ptr, tiledb_array_t* array_ptr, unpack=True):
cdef object load_metadata(Array array, unpack=True):
"""
Load array metadata dict or keys
Expand All @@ -245,6 +249,8 @@ cdef object load_metadata(tiledb_ctx_t* ctx_ptr, tiledb_array_t* array_ptr, unpa
:param unpack: unpack the values into dictionary
:return: dict(k: v) if unpack else list(k)
"""
cdef tiledb_ctx_t* ctx_ptr = array.ctx.ptr
cdef tiledb_array_t* array_ptr = array.ptr

cdef uint64_t metadata_num
rc = tiledb_array_get_metadata_num(ctx_ptr, array_ptr, &metadata_num)
Expand Down Expand Up @@ -296,9 +302,14 @@ cdef object load_metadata(tiledb_ctx_t* ctx_ptr, tiledb_array_t* array_ptr, unpa


cdef class Metadata(object):

def __init__(self, array):
self.array = array
self.array_ref = weakref.ref(array)

@property
def array(self):
    """
    Return the array this metadata object refers to.

    Only a weak reference (``array_ref``) is stored, so the array must
    still be alive when this property is read.

    :return: the object referenced by ``self.array_ref``
    :raises AssertionError: if the weak reference is dead
    """
    # Dereference exactly once and keep the resulting strong reference:
    # calling array_ref() a second time for the return value could yield
    # None if the referent is collected after the check has passed.
    array = self.array_ref()
    assert array is not None, \
        "Internal error: invariant violation ([] from gc'd Array)"
    return array

def __setitem__(self, key, value):
"""
Expand All @@ -312,7 +323,7 @@ cdef class Metadata(object):
raise ValueError("Unexpected key type '{}': expected str "
"type".format(type(key)))

put_metadata(self.array.ctx.ptr, self.array.ptr, key, value)
put_metadata(self.array, key, value)

def __getitem__(self, key):
"""
Expand All @@ -326,7 +337,7 @@ cdef class Metadata(object):

# `get_metadata` expects unicode
key = ustring(key)
v = get_metadata(self.array.ctx.ptr, self.array.ptr, key)
v = get_metadata(self.array, key)

if v is None:
raise TileDBError("Failed to unpack value for key: '{}'".format(key))
Expand Down Expand Up @@ -359,7 +370,7 @@ cdef class Metadata(object):
# TODO: ensure that the array is not x-locked?

cdef uint32_t rc = 0

cdef tiledb_ctx_t* ctx_ptr = (<Array?>self.array).ctx.ptr
cdef:
tiledb_encryption_type_t key_type = TILEDB_NO_ENCRYPTION
void* key_ptr = NULL
Expand All @@ -378,15 +389,15 @@ cdef class Metadata(object):
key_len = <uint32_t> PyBytes_GET_SIZE(bkey)

rc = tiledb_array_consolidate_metadata_with_key(
self.array.ctx.ptr,
ctx_ptr,
buri,
key_type,
key_ptr,
key_len,
NULL)

if rc != TILEDB_OK:
_raise_ctx_err(self.array.ctx.ptr, rc)
_raise_ctx_err(ctx_ptr, rc)

def __delitem__(self, key):
"""
Expand All @@ -400,29 +411,30 @@ cdef class Metadata(object):
:param key:
:return:
"""
cdef:
tiledb_ctx_t* ctx_ptr = self.array.ctx.ptr
const char* key_ptr
object key_utf8
int32_t rc
cdef tiledb_ctx_t* ctx_ptr = (<Array>self.array).ctx.ptr
cdef tiledb_array_t* array_ptr = (<Array>self.array).ptr
cdef const char* key_ptr
cdef object key_utf8
cdef int32_t rc

key_utf8 = key.encode('UTF-8')
key_ptr = <const char*>key_utf8

rc = tiledb_array_delete_metadata(self.array.ctx.ptr, self.array.ptr, key_ptr)
rc = tiledb_array_delete_metadata(ctx_ptr, array_ptr, key_ptr)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

def __len__(self):
cdef:
int32_t rc
uint64_t num
cdef tiledb_ctx_t* ctx_ptr = (<Array>self.array).ctx.ptr
cdef tiledb_array_t* array_ptr = (<Array>self.array).ptr
cdef int32_t rc
cdef uint64_t num

rc = tiledb_array_get_metadata_num(
self.array.ctx.ptr, self.array.ptr, &num)
ctx_ptr, array_ptr, &num)

if rc != TILEDB_OK:
_raise_ctx_err(self.array.ctx.ptr, rc)
_raise_ctx_err(ctx_ptr, rc)

return <int>num

Expand All @@ -432,20 +444,19 @@ cdef class Metadata(object):
:return: List of keys
"""
return load_metadata(self.array.ctx.ptr, self.array.ptr, unpack=False)
return load_metadata(self.array, unpack=False)

def values(self):
    """
    Return the values of all metadata entries of the array.

    :return: the ``values()`` view of the dict produced by ``load_metadata``
    """
    # TODO this should be an iterator

    # unpack=True is required here: load_metadata returns a dict only when
    # unpacking. With unpack=False it returns a list of keys, and calling
    # .values() on that list raises AttributeError.
    data = load_metadata(self.array, unpack=True)
    return data.values()

def pop(self, key, default=None):
raise NotImplementedError("dict.pop requires read-write access to array")

def items(self):
# TODO this should be an iterator
data = load_metadata(self.array.ctx.ptr, self.array.ptr, unpack=True)
data = load_metadata(self.array, unpack=True)
return tuple( (k, data[k]) for k in data.keys() )

def _set_numpy(self, key, np.ndarray arr, datatype = None):
Expand All @@ -458,6 +469,8 @@ cdef class Metadata(object):
:param arr: 1d NumPy ndarray
:return:
"""
cdef tiledb_ctx_t* ctx_ptr = (<Array>self.array).ctx.ptr
cdef tiledb_array_t* array_ptr = (<Array>self.array).ptr

cdef:
int32_t rc = TILEDB_OK
Expand Down Expand Up @@ -489,12 +502,12 @@ cdef class Metadata(object):
cdef const char* data_ptr = <const char*>np.PyArray_DATA(arr)

rc = tiledb_array_put_metadata(
self.array.ctx.ptr,
self.array.ptr,
ctx_ptr,
array_ptr,
key_cstr,
tiledb_type,
value_num,
data_ptr)

if rc != TILEDB_OK:
_raise_ctx_err(self.array.ctx.ptr, rc)
_raise_ctx_err(ctx_ptr, rc)
14 changes: 9 additions & 5 deletions tiledb/multirange_indexing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import tiledb
from tiledb import Array, ArraySchema
import os, numpy as np
import sys
import sys, weakref

try:
from tiledb.libtiledb import multi_index
Expand Down Expand Up @@ -74,12 +74,16 @@ class MultiRangeIndexer(object):
#def __init__(self, array: Array, query = None):

def __init__(self, array, query = None):
self.array = array
# TODO remove
if hasattr(array, 'schema'):
self.schema = array.schema
self.array_ref = weakref.ref(array)
self.schema = array.schema
self.query = query

@property
def array(self):
    """
    Return the array this indexer refers to.

    The indexer stores only a weak reference (``array_ref``), so the
    array must still be alive when this property is read.

    :return: the object referenced by ``self.array_ref``
    :raises AssertionError: if the weak reference is dead
    """
    # Dereference exactly once and keep the resulting strong reference:
    # calling array_ref() a second time for the return value could yield
    # None if the referent is collected after the check has passed.
    array = self.array_ref()
    assert array is not None, \
        "Internal error: invariant violation (indexing call w/ dead array_ref)"
    return array

def getitem_ranges(self, idx):
dom = self.schema.domain
ndim = dom.ndim
Expand Down
8 changes: 6 additions & 2 deletions tiledb/tests/test_libtiledb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2419,7 +2419,7 @@ def test_open(self):
with tiledb.open(uri, config=config) as A:
self.assertEqual(A._ctx_().config(), config)

#@unittest.skipIf(platform.system() != 'Linux', "")
@unittest.skipIf(platform.system() == 'Windows', "")
def test_ctx_thread_cleanup(self):
from concurrent.futures import ThreadPoolExecutor
config = {
Expand All @@ -2434,7 +2434,11 @@ def test_ctx_thread_cleanup(self):

for n in range(0, 10):
if n > 0:
self.assertEqual(start_threads, len(thisproc.threads()))
try:
self.assertEqual(start_threads, len(thisproc.threads()))
except AssertionError as exc:
print("Failed on iteration: ", n)
raise exc
with tiledb.DenseArray(uri, ctx=tiledb.Ctx(config)) as A:
res = A[:]
if n == 0:
Expand Down

0 comments on commit e597cd0

Please sign in to comment.