Skip to content

Commit

Permalink
Implement project node
Browse files Browse the repository at this point in the history
Implement project node. Improve error handling and general tidy up of code.
  • Loading branch information
davesearle committed Sep 7, 2023
1 parent c50892e commit 98cac51
Show file tree
Hide file tree
Showing 23 changed files with 328 additions and 104 deletions.
38 changes: 25 additions & 13 deletions csharp/src/Apache.Arrow.Acero/CLib.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public struct GArrowStringScalar { }
public struct GArrowInt8Scalar { }
public struct GArrowSortOptions { }
public struct GArrowSortKey { }
public struct GArrowProjectNodeOptions { }
public struct GArrowInt32Scalar { }

public enum GArrowJoinType
{
Expand All @@ -66,16 +68,16 @@ public enum GArrowSortOrder
public const string DllName = "libarrow-glib-1300.dll";

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_new")]
public static extern unsafe GArrowExecutePlan* garrow_execute_plan_new(GError** error);
public static extern unsafe GArrowExecutePlan* garrow_execute_plan_new(out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_source_node_options_new_record_batch")]
public static extern unsafe GArrowSourceNodeOptions* garrow_source_node_options_new_record_batch(GArrowRecordBatch* record_batch);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_record_batch_import")]
public static extern unsafe GArrowRecordBatch* garrow_record_batch_import(CArrowArray* c_abi_array, GArrowSchema* schema, GError** error);
public static extern unsafe GArrowRecordBatch* garrow_record_batch_import(CArrowArray* c_abi_array, GArrowSchema* schema, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_schema_import")]
public static extern unsafe GArrowSchema* garrow_schema_import(CArrowSchema* c_abi_schema, GError** error);
public static extern unsafe GArrowSchema* garrow_schema_import(CArrowSchema* c_abi_schema, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_schema_get_field")]
public static extern unsafe GArrowField* garrow_schema_get_field(GArrowSchema* schema, uint i);
Expand All @@ -84,22 +86,22 @@ public enum GArrowSortOrder
public static extern unsafe bool garrow_field_is_nullable(GArrowField* field);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_hash_join_node_options_new")]
public static extern unsafe GArrowHashJoinNodeOptions* garrow_hash_join_node_options_new(GArrowJoinType type, IntPtr left_keys, uint n_left_keys, IntPtr right_keys, uint n_right_keys, GError** error);
public static extern unsafe GArrowHashJoinNodeOptions* garrow_hash_join_node_options_new(GArrowJoinType type, IntPtr left_keys, uint n_left_keys, IntPtr right_keys, uint n_right_keys, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_hash_join_node")]
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_hash_join_node(GArrowExecutePlan* plan, GArrowExecuteNode* left, GArrowExecuteNode* right, GArrowHashJoinNodeOptions* options, GError** error);
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_hash_join_node(GArrowExecutePlan* plan, GArrowExecuteNode* left, GArrowExecuteNode* right, GArrowHashJoinNodeOptions* options, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_source_node")]
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_source_node(GArrowExecutePlan* plan, GArrowSourceNodeOptions* options, GError** error);
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_source_node(GArrowExecutePlan* plan, GArrowSourceNodeOptions* options, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_validate")]
public static extern unsafe bool garrow_execute_plan_validate(GArrowExecutePlan* plan, GError** error);
public static extern unsafe bool garrow_execute_plan_validate(GArrowExecutePlan* plan, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_sink_node_options_new")]
public static extern unsafe GArrowSinkNodeOptions* garrow_sink_node_options_new();

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_sink_node")]
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_sink_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowSinkNodeOptions* options, GError** error);
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_sink_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowSinkNodeOptions* options, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_start")]
public static extern unsafe void garrow_execute_plan_start(GArrowExecutePlan* plan);
Expand All @@ -111,16 +113,16 @@ public enum GArrowSortOrder
public static extern unsafe GArrowRecordBatchReader* garrow_sink_node_options_get_reader(GArrowSinkNodeOptions* options, GArrowSchema* schema);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_record_batch_reader_export")]
public static extern unsafe CArrowArrayStream* garrow_record_batch_reader_export(GArrowRecordBatchReader* reader, GError** error);
public static extern unsafe CArrowArrayStream* garrow_record_batch_reader_export(GArrowRecordBatchReader* reader, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_record_batch_reader_import")]
public static extern unsafe GArrowRecordBatchReader* garrow_record_batch_reader_import(CArrowArrayStream* c_abi_array_stream, GError** error);
public static extern unsafe GArrowRecordBatchReader* garrow_record_batch_reader_import(CArrowArrayStream* c_abi_array_stream, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_source_node_options_new_record_batch_reader")]
public static extern unsafe GArrowSourceNodeOptions* garrow_source_node_options_new_record_batch_reader(GArrowRecordBatchReader* reader);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_filter_node")]
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_filter_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowFilterNodeOptions* options, GError** error);
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_filter_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowFilterNodeOptions* options, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_filter_node_options_new")]
public static extern unsafe GArrowFilterNodeOptions* garrow_filter_node_options_new(IntPtr expression);
Expand All @@ -129,7 +131,7 @@ public enum GArrowSortOrder
public static extern unsafe GArrowCallExpression* garrow_call_expression_new(IntPtr function, IntPtr arguments, GArrowFunctionOptions* options);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_field_expression_new")]
public static extern unsafe GArrowFieldExpression* garrow_field_expression_new(IntPtr reference, GError** error);
public static extern unsafe GArrowFieldExpression* garrow_field_expression_new(IntPtr reference, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_literal_expression_new")]
public static extern unsafe GArrowFieldExpression* garrow_literal_expression_new(GArrowDatum* datum);
Expand All @@ -156,10 +158,20 @@ public enum GArrowSortOrder
public static extern unsafe GArrowSortOptions* garrow_sort_options_new(GList* sort_keys);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_sort_key_new")]
public static extern unsafe GArrowSortKey* garrow_sort_key_new(IntPtr target, GArrowSortOrder order, GError** error);
public static extern unsafe GArrowSortKey* garrow_sort_key_new(IntPtr target, GArrowSortOrder order, out GError** error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_node")]
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_node(GArrowExecutePlan* plan, IntPtr factory_name, IntPtr inputs, IntPtr options, out GError **error);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_project_node_options_new")]
public static extern unsafe GArrowProjectNodeOptions* garrow_project_node_options_new(GList* expressions, IntPtr names, int n_names);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_project_node")]
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_project_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowProjectNodeOptions* options, out GError** error);


[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_int32_scalar_new")]
public static extern unsafe GArrowInt32Scalar* garrow_int32_scalar_new(int value);
}

[StructLayout(LayoutKind.Sequential)]
Expand Down
11 changes: 7 additions & 4 deletions csharp/src/Apache.Arrow.Acero/Declaration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static IArrowArrayStream ToRecordBatchReader(Declaration declaration, Sch
new List<Declaration>
{
declaration,
new Declaration("table_sink", sinkNodeOptions)
new Declaration("sink", sinkNodeOptions)
});

sinkNode.AddToPlan(plan);
Expand All @@ -89,15 +89,15 @@ private ExecNode AddToPlan(ExecPlan plan)
{
var nodes = new List<ExecNode>();

foreach (var input in Inputs)
foreach (Declaration input in Inputs)
{
var node = input.AddToPlan(plan);
ExecNode node = input.AddToPlan(plan);
nodes.Add(node);
}

switch (_factoryName)
{
case "table_sink":
case "sink":
return new SinkNode(_options as SinkNodeOptions, plan, nodes);

case "record_batch_source":
Expand All @@ -115,6 +115,9 @@ private ExecNode AddToPlan(ExecPlan plan)
case "union":
return new UnionNode(plan, nodes);

case "project":
return new ProjectNode(_options as ProjectNodeOptions, plan, nodes);

default:
throw new Exception($"Unknown factory {_factoryName}");
}
Expand Down
14 changes: 14 additions & 0 deletions csharp/src/Apache.Arrow.Acero/ExceptionUtil.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using static Apache.Arrow.Acero.CLib;

namespace Apache.Arrow.Acero
{
public static class ExceptionUtil
{
public static unsafe void ThrowOnError(GError** error)
{
if ((IntPtr)error != IntPtr.Zero)
throw new GLib.GException((IntPtr)error);
}
}
}
1 change: 0 additions & 1 deletion csharp/src/Apache.Arrow.Acero/ExecNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,5 @@ namespace Apache.Arrow.Acero
public abstract class ExecNode
{
public abstract unsafe GArrowExecuteNode* GetPtr();

}
}
16 changes: 14 additions & 2 deletions csharp/src/Apache.Arrow.Acero/ExecPlan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using static Apache.Arrow.Acero.CLib;

namespace Apache.Arrow.Acero
{
public class ExecPlan
Expand All @@ -21,12 +23,22 @@ public class ExecPlan

public unsafe ExecPlan()
{
_planPtr = CLib.garrow_execute_plan_new(null);
GError** error;

_planPtr = CLib.garrow_execute_plan_new(out error);

ExceptionUtil.ThrowOnError(error);
}

public unsafe bool Validate()
{
return CLib.garrow_execute_plan_validate(_planPtr, null);
GError** error;

var valid = CLib.garrow_execute_plan_validate(_planPtr, out error);

ExceptionUtil.ThrowOnError(error);

return valid;
}

public unsafe void StartProducing()
Expand Down
16 changes: 13 additions & 3 deletions csharp/src/Apache.Arrow.Acero/ExportUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

using Apache.Arrow.C;
using Apache.Arrow.Ipc;
using static Apache.Arrow.Acero.CLib;

namespace Apache.Arrow.Acero
{
Expand All @@ -29,7 +30,10 @@ internal static class ExportUtil
CArrowSchemaExporter.ExportSchema(schema, cSchema);

// Import CArrowSchema into a GArrowSchema
var gSchema = CLib.garrow_schema_import(cSchema, null);
GError** error;
var gSchema = CLib.garrow_schema_import(cSchema, out error);

ExceptionUtil.ThrowOnError(error);

return gSchema;
}
Expand All @@ -45,7 +49,10 @@ internal static class ExportUtil
CArrowArrayExporter.ExportRecordBatch(recordBatch, cArray);

// import the CArrowArray into a gArrowRecordBatch
var gRecordBatch = CLib.garrow_record_batch_import(cArray, schemaPtr, null);
GError** error;
var gRecordBatch = CLib.garrow_record_batch_import(cArray, schemaPtr, out error);

ExceptionUtil.ThrowOnError(error);

return gRecordBatch;
}
Expand All @@ -59,7 +66,10 @@ internal static class ExportUtil
CArrowArrayStreamExporter.ExportArrayStream(recordBatchReader, cArrayStream);

// next import the c arrow as a gArrowRecordBatch
var gRecordBatchReader = CLib.garrow_record_batch_reader_import(cArrayStream, null);
GError** error;
var gRecordBatchReader = CLib.garrow_record_batch_reader_import(cArrayStream, out error);

ExceptionUtil.ThrowOnError(error);

return gRecordBatchReader;
}
Expand Down
6 changes: 6 additions & 0 deletions csharp/src/Apache.Arrow.Acero/Expression.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,11 @@ namespace Apache.Arrow.Acero
public abstract class Expression
{
public abstract IntPtr GetPtr();

public override unsafe string ToString()
{
var strPtr = CLib.garrow_expression_to_string(GetPtr());
return StringUtil.PtrToStringUtf8((byte*)strPtr);
}
}
}
7 changes: 6 additions & 1 deletion csharp/src/Apache.Arrow.Acero/FieldExpression.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

using System;
using static Apache.Arrow.Acero.CLib;

namespace Apache.Arrow.Acero
{
Expand All @@ -25,7 +26,11 @@ public unsafe FieldExpression(string field)
{
var reference = (nint)StringUtil.ToCStringUtf8(field);

_ptr = (nint)CLib.garrow_field_expression_new(reference, null);
GError** error;

_ptr = (IntPtr)CLib.garrow_field_expression_new(reference, out error);

ExceptionUtil.ThrowOnError(error);
}

public override IntPtr GetPtr()
Expand Down
9 changes: 7 additions & 2 deletions csharp/src/Apache.Arrow.Acero/FilterNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

using System.Collections.Generic;
using static Apache.Arrow.Acero.CLib;

namespace Apache.Arrow.Acero
{
Expand All @@ -22,8 +23,12 @@ public class FilterNode : ExecNode
private unsafe CLib.GArrowExecuteNode* _nodePtr;

public unsafe FilterNode(FilterNodeOptions options, ExecPlan plan, List<ExecNode> nodes)
{
_nodePtr = CLib.garrow_execute_plan_build_filter_node(plan.GetPtr(), nodes[0].GetPtr(), options.GetPtr(), null);
{
GError** error;

_nodePtr = CLib.garrow_execute_plan_build_filter_node(plan.GetPtr(), nodes[0].GetPtr(), options.GetPtr(), out error);

ExceptionUtil.ThrowOnError(error);
}

public override unsafe CLib.GArrowExecuteNode* GetPtr()
Expand Down
6 changes: 0 additions & 6 deletions csharp/src/Apache.Arrow.Acero/FilterNodeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using static Apache.Arrow.Acero.CLib;

namespace Apache.Arrow.Acero
Expand All @@ -25,11 +24,6 @@ public class FilterNodeOptions : ExecNodeOptions
public unsafe FilterNodeOptions(Expression expr)
{
_optionsPtr = CLib.garrow_filter_node_options_new(expr.GetPtr());

var strPtr = CLib.garrow_expression_to_string(expr.GetPtr());
var str = StringUtil.PtrToStringUtf8((byte*)strPtr);

Console.WriteLine(str);
}

internal unsafe GArrowFilterNodeOptions* GetPtr()
Expand Down
23 changes: 9 additions & 14 deletions csharp/src/Apache.Arrow.Acero/Function.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// limitations under the License.

using System;
using System.Runtime.InteropServices;

namespace Apache.Arrow.Acero
{
Expand All @@ -23,24 +22,20 @@ public class Function : Expression
private IntPtr _ptr;

public unsafe Function(string functionName, Expression lhs, Expression rhs)
: this(functionName, new[] { lhs, rhs })
{
var functionNamePtr = StringUtil.ToCStringUtf8(functionName);
}

var rhsItem = new GList
{
data = rhs.GetPtr()
};
public unsafe Function(string functionName, params Expression[] args)
{
var functionNamePtr = StringUtil.ToCStringUtf8(functionName);

var lhsItem = new GList
{
data = lhs.GetPtr(),
next = &rhsItem
};
var list = new GLib.List(IntPtr.Zero);

IntPtr glistPtr = Marshal.AllocHGlobal(Marshal.SizeOf<GList>());
Marshal.StructureToPtr<GList>(lhsItem, glistPtr, false);
foreach (var arg in args)
list.Append(arg.GetPtr());

_ptr = (nint)CLib.garrow_call_expression_new((IntPtr)functionNamePtr, glistPtr, null);
_ptr = (IntPtr)CLib.garrow_call_expression_new((IntPtr)functionNamePtr, list.Handle, null);
}

public override IntPtr GetPtr()
Expand Down
11 changes: 8 additions & 3 deletions csharp/src/Apache.Arrow.Acero/HashJoinNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

using System.Collections.Generic;
using static Apache.Arrow.Acero.CLib;

namespace Apache.Arrow.Acero
{
Expand All @@ -23,10 +24,14 @@ public class HashJoinNode : ExecNode

public unsafe HashJoinNode(HashJoinNodeOptions options, ExecPlan plan, List<ExecNode> inputs)
{
var left = inputs[0];
var right = inputs[1];
ExecNode left = inputs[0];
ExecNode right = inputs[1];

_nodePtr = CLib.garrow_execute_plan_build_hash_join_node(plan.GetPtr(), left.GetPtr(), right.GetPtr(), options.GetPtr(), null);
GError** error;

_nodePtr = CLib.garrow_execute_plan_build_hash_join_node(plan.GetPtr(), left.GetPtr(), right.GetPtr(), options.GetPtr(), out error);

ExceptionUtil.ThrowOnError(error);
}

public override unsafe CLib.GArrowExecuteNode* GetPtr()
Expand Down
Loading

0 comments on commit 98cac51

Please sign in to comment.