diff --git a/csharp/src/Apache.Arrow.Acero/CLib.cs b/csharp/src/Apache.Arrow.Acero/CLib.cs index 098c4a4b11c5a..9811a976ffbbd 100644 --- a/csharp/src/Apache.Arrow.Acero/CLib.cs +++ b/csharp/src/Apache.Arrow.Acero/CLib.cs @@ -44,6 +44,8 @@ public struct GArrowStringScalar { } public struct GArrowInt8Scalar { } public struct GArrowSortOptions { } public struct GArrowSortKey { } + public struct GArrowProjectNodeOptions { } + public struct GArrowInt32Scalar { } public enum GArrowJoinType { @@ -66,16 +68,16 @@ public enum GArrowSortOrder public const string DllName = "libarrow-glib-1300.dll"; [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_new")] - public static extern unsafe GArrowExecutePlan* garrow_execute_plan_new(GError** error); + public static extern unsafe GArrowExecutePlan* garrow_execute_plan_new(out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_source_node_options_new_record_batch")] public static extern unsafe GArrowSourceNodeOptions* garrow_source_node_options_new_record_batch(GArrowRecordBatch* record_batch); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_record_batch_import")] - public static extern unsafe GArrowRecordBatch* garrow_record_batch_import(CArrowArray* c_abi_array, GArrowSchema* schema, GError** error); + public static extern unsafe GArrowRecordBatch* garrow_record_batch_import(CArrowArray* c_abi_array, GArrowSchema* schema, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_schema_import")] - public static extern unsafe GArrowSchema* garrow_schema_import(CArrowSchema* c_abi_schema, GError** error); + public static extern unsafe GArrowSchema* garrow_schema_import(CArrowSchema* c_abi_schema, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_schema_get_field")] public static extern unsafe GArrowField* garrow_schema_get_field(GArrowSchema* schema, uint i); @@ -84,22 +86,22 @@ public enum GArrowSortOrder public static extern unsafe bool garrow_field_is_nullable(GArrowField* field); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_hash_join_node_options_new")] - public static extern unsafe GArrowHashJoinNodeOptions* garrow_hash_join_node_options_new(GArrowJoinType type, IntPtr left_keys, uint n_left_keys, IntPtr right_keys, uint n_right_keys, GError** error); + public static extern unsafe GArrowHashJoinNodeOptions* garrow_hash_join_node_options_new(GArrowJoinType type, IntPtr left_keys, uint n_left_keys, IntPtr right_keys, uint n_right_keys, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_hash_join_node")] - public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_hash_join_node(GArrowExecutePlan* plan, GArrowExecuteNode* left, GArrowExecuteNode* right, GArrowHashJoinNodeOptions* options, GError** error); + public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_hash_join_node(GArrowExecutePlan* plan, GArrowExecuteNode* left, GArrowExecuteNode* right, GArrowHashJoinNodeOptions* options, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_source_node")] - public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_source_node(GArrowExecutePlan* plan, GArrowSourceNodeOptions* options, GError** error); + public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_source_node(GArrowExecutePlan* plan, GArrowSourceNodeOptions* options, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_validate")] - public static extern unsafe bool garrow_execute_plan_validate(GArrowExecutePlan* plan, GError** error); + public static extern unsafe bool garrow_execute_plan_validate(GArrowExecutePlan* plan, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_sink_node_options_new")] public static extern unsafe GArrowSinkNodeOptions* garrow_sink_node_options_new(); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_sink_node")] - public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_sink_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowSinkNodeOptions* options, GError** error); + public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_sink_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowSinkNodeOptions* options, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_start")] public static extern unsafe void garrow_execute_plan_start(GArrowExecutePlan* plan); @@ -111,16 +113,16 @@ public enum GArrowSortOrder public static extern unsafe GArrowRecordBatchReader* garrow_sink_node_options_get_reader(GArrowSinkNodeOptions* options, GArrowSchema* schema); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_record_batch_reader_export")] - public static extern unsafe CArrowArrayStream* garrow_record_batch_reader_export(GArrowRecordBatchReader* reader, GError** error); + public static extern unsafe CArrowArrayStream* garrow_record_batch_reader_export(GArrowRecordBatchReader* reader, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_record_batch_reader_import")] - public static extern unsafe GArrowRecordBatchReader* garrow_record_batch_reader_import(CArrowArrayStream* c_abi_array_stream, GError** error); + public static extern unsafe GArrowRecordBatchReader* garrow_record_batch_reader_import(CArrowArrayStream* c_abi_array_stream, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_source_node_options_new_record_batch_reader")] public static extern unsafe GArrowSourceNodeOptions* garrow_source_node_options_new_record_batch_reader(GArrowRecordBatchReader* reader); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_filter_node")] - public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_filter_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowFilterNodeOptions* options, GError** error); + public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_filter_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowFilterNodeOptions* options, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_filter_node_options_new")] public static extern unsafe GArrowFilterNodeOptions* garrow_filter_node_options_new(IntPtr expression); @@ -129,7 +131,7 @@ public enum GArrowSortOrder public static extern unsafe GArrowCallExpression* garrow_call_expression_new(IntPtr function, IntPtr arguments, GArrowFunctionOptions* options); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_field_expression_new")] - public static extern unsafe GArrowFieldExpression* garrow_field_expression_new(IntPtr reference, GError** error); + public static extern unsafe GArrowFieldExpression* garrow_field_expression_new(IntPtr reference, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_literal_expression_new")] public static extern unsafe GArrowFieldExpression* garrow_literal_expression_new(GArrowDatum* datum); @@ -156,10 +158,20 @@ public enum GArrowSortOrder public static extern unsafe GArrowSortOptions* garrow_sort_options_new(GList* sort_keys); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_sort_key_new")] - public static extern unsafe GArrowSortKey* garrow_sort_key_new(IntPtr target, GArrowSortOrder order, GError** error); + public static extern unsafe GArrowSortKey* garrow_sort_key_new(IntPtr target, GArrowSortOrder order, out GError** error); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_node")] public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_node(GArrowExecutePlan* plan, IntPtr factory_name, IntPtr inputs, IntPtr options, out GError **error); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_project_node_options_new")] + public static extern unsafe GArrowProjectNodeOptions* garrow_project_node_options_new(GList* expressions, IntPtr names, int n_names); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_project_node")] + public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_project_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowProjectNodeOptions* options, out GError** error); + + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_int32_scalar_new")] + public static extern unsafe GArrowInt32Scalar* garrow_int32_scalar_new(int value); } [StructLayout(LayoutKind.Sequential)] diff --git a/csharp/src/Apache.Arrow.Acero/Declaration.cs b/csharp/src/Apache.Arrow.Acero/Declaration.cs index 4e427a30a15b5..d1610981a47d8 100644 --- a/csharp/src/Apache.Arrow.Acero/Declaration.cs +++ b/csharp/src/Apache.Arrow.Acero/Declaration.cs @@ -71,7 +71,7 @@ public static IArrowArrayStream ToRecordBatchReader(Declaration declaration, Sch new List { declaration, - new Declaration("table_sink", sinkNodeOptions) + new Declaration("sink", sinkNodeOptions) }); sinkNode.AddToPlan(plan); @@ -89,15 +89,15 @@ private ExecNode AddToPlan(ExecPlan plan) { var nodes = new List(); - foreach (var input in Inputs) + foreach (Declaration input in Inputs) { - var node = input.AddToPlan(plan); + ExecNode node = input.AddToPlan(plan); nodes.Add(node); } switch (_factoryName) { - case "table_sink": + case "sink": return new SinkNode(_options as SinkNodeOptions, plan, nodes); case "record_batch_source": @@ -115,6 +115,9 @@ private ExecNode AddToPlan(ExecPlan plan) case "union": return new UnionNode(plan, nodes); + case "project": + return new ProjectNode(_options as ProjectNodeOptions, plan, nodes); + default: throw new Exception($"Unknown factory {_factoryName}"); } diff --git a/csharp/src/Apache.Arrow.Acero/ExceptionUtil.cs b/csharp/src/Apache.Arrow.Acero/ExceptionUtil.cs new file mode 100644 index 0000000000000..5ea3dfde5cb43 --- /dev/null +++ b/csharp/src/Apache.Arrow.Acero/ExceptionUtil.cs @@ -0,0 +1,14 @@ +using System; +using static Apache.Arrow.Acero.CLib; + +namespace Apache.Arrow.Acero +{ + public static class ExceptionUtil + { + public static unsafe void ThrowOnError(GError** error) + { + if ((IntPtr)error != IntPtr.Zero) + throw new GLib.GException((IntPtr)error); + } + } +} diff --git a/csharp/src/Apache.Arrow.Acero/ExecNode.cs b/csharp/src/Apache.Arrow.Acero/ExecNode.cs index 95fe1b1d3cfee..0bd5f8d546088 100644 --- a/csharp/src/Apache.Arrow.Acero/ExecNode.cs +++ b/csharp/src/Apache.Arrow.Acero/ExecNode.cs @@ -20,6 +20,5 @@ namespace Apache.Arrow.Acero public abstract class ExecNode { public abstract unsafe GArrowExecuteNode* GetPtr(); - } } diff --git a/csharp/src/Apache.Arrow.Acero/ExecPlan.cs b/csharp/src/Apache.Arrow.Acero/ExecPlan.cs index a5822b53204f7..16a778519746f 100644 --- a/csharp/src/Apache.Arrow.Acero/ExecPlan.cs +++ b/csharp/src/Apache.Arrow.Acero/ExecPlan.cs @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +using static Apache.Arrow.Acero.CLib; + namespace Apache.Arrow.Acero { public class ExecPlan @@ -21,12 +23,22 @@ public class ExecPlan public unsafe ExecPlan() { - _planPtr = CLib.garrow_execute_plan_new(null); + GError** error; + + _planPtr = CLib.garrow_execute_plan_new(out error); + + ExceptionUtil.ThrowOnError(error); } public unsafe bool Validate() { - return CLib.garrow_execute_plan_validate(_planPtr, null); + GError** error; + + var valid = CLib.garrow_execute_plan_validate(_planPtr, out error); + + ExceptionUtil.ThrowOnError(error); + + return valid; } public unsafe void StartProducing() diff --git a/csharp/src/Apache.Arrow.Acero/ExportUtil.cs b/csharp/src/Apache.Arrow.Acero/ExportUtil.cs index 61a091e09267e..59f7a81e910c7 100644 --- a/csharp/src/Apache.Arrow.Acero/ExportUtil.cs +++ b/csharp/src/Apache.Arrow.Acero/ExportUtil.cs @@ -15,6 +15,7 @@ using Apache.Arrow.C; using Apache.Arrow.Ipc; +using static Apache.Arrow.Acero.CLib; namespace Apache.Arrow.Acero { @@ -29,7 +30,10 @@ internal static class ExportUtil CArrowSchemaExporter.ExportSchema(schema, cSchema); // Import CArrowSchema into a GArrowSchema - var gSchema = CLib.garrow_schema_import(cSchema, null); + GError** error; + var gSchema = CLib.garrow_schema_import(cSchema, out error); + + ExceptionUtil.ThrowOnError(error); return gSchema; } @@ -45,7 +49,10 @@ internal static class ExportUtil CArrowArrayExporter.ExportRecordBatch(recordBatch, cArray); // import the CArrowArray into a gArrowRecordBatch - var gRecordBatch = CLib.garrow_record_batch_import(cArray, schemaPtr, null); + GError** error; + var gRecordBatch = CLib.garrow_record_batch_import(cArray, schemaPtr, out error); + + ExceptionUtil.ThrowOnError(error); return gRecordBatch; } @@ -59,7 +66,10 @@ internal static class ExportUtil CArrowArrayStreamExporter.ExportArrayStream(recordBatchReader, cArrayStream); // next import the c arrow as a gArrowRecordBatch - var gRecordBatchReader = CLib.garrow_record_batch_reader_import(cArrayStream, null); + GError** error; + var gRecordBatchReader = CLib.garrow_record_batch_reader_import(cArrayStream, out error); + + ExceptionUtil.ThrowOnError(error); return gRecordBatchReader; } diff --git a/csharp/src/Apache.Arrow.Acero/Expression.cs b/csharp/src/Apache.Arrow.Acero/Expression.cs index fc2e25ecff77b..df420e0623e22 100644 --- a/csharp/src/Apache.Arrow.Acero/Expression.cs +++ b/csharp/src/Apache.Arrow.Acero/Expression.cs @@ -20,5 +20,11 @@ namespace Apache.Arrow.Acero public abstract class Expression { public abstract IntPtr GetPtr(); + + public override unsafe string ToString() + { + var strPtr = CLib.garrow_expression_to_string(GetPtr()); + return StringUtil.PtrToStringUtf8((byte*)strPtr); + } } } diff --git a/csharp/src/Apache.Arrow.Acero/FieldExpression.cs b/csharp/src/Apache.Arrow.Acero/FieldExpression.cs index d380768889f14..b792bee32eebb 100644 --- a/csharp/src/Apache.Arrow.Acero/FieldExpression.cs +++ b/csharp/src/Apache.Arrow.Acero/FieldExpression.cs @@ -14,6 +14,7 @@ // limitations under the License. using System; +using static Apache.Arrow.Acero.CLib; namespace Apache.Arrow.Acero { @@ -25,7 +26,11 @@ public unsafe FieldExpression(string field) { var reference = (nint)StringUtil.ToCStringUtf8(field); - _ptr = (nint)CLib.garrow_field_expression_new(reference, null); + GError** error; + + _ptr = (IntPtr)CLib.garrow_field_expression_new(reference, out error); + + ExceptionUtil.ThrowOnError(error); } public override IntPtr GetPtr() diff --git a/csharp/src/Apache.Arrow.Acero/FilterNode.cs b/csharp/src/Apache.Arrow.Acero/FilterNode.cs index 294855f61632e..ec7e098654818 100644 --- a/csharp/src/Apache.Arrow.Acero/FilterNode.cs +++ b/csharp/src/Apache.Arrow.Acero/FilterNode.cs @@ -14,6 +14,7 @@ // limitations under the License. using System.Collections.Generic; +using static Apache.Arrow.Acero.CLib; namespace Apache.Arrow.Acero { @@ -22,8 +23,12 @@ public class FilterNode : ExecNode private unsafe CLib.GArrowExecuteNode* _nodePtr; public unsafe FilterNode(FilterNodeOptions options, ExecPlan plan, List nodes) - { - _nodePtr = CLib.garrow_execute_plan_build_filter_node(plan.GetPtr(), nodes[0].GetPtr(), options.GetPtr(), null); + { + GError** error; + + _nodePtr = CLib.garrow_execute_plan_build_filter_node(plan.GetPtr(), nodes[0].GetPtr(), options.GetPtr(), out error); + + ExceptionUtil.ThrowOnError(error); } public override unsafe CLib.GArrowExecuteNode* GetPtr() diff --git a/csharp/src/Apache.Arrow.Acero/FilterNodeOptions.cs b/csharp/src/Apache.Arrow.Acero/FilterNodeOptions.cs index 7545524136ffd..ae7eecbdc1a4b 100644 --- a/csharp/src/Apache.Arrow.Acero/FilterNodeOptions.cs +++ b/csharp/src/Apache.Arrow.Acero/FilterNodeOptions.cs @@ -13,7 +13,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using System; using static Apache.Arrow.Acero.CLib; namespace Apache.Arrow.Acero @@ -25,11 +24,6 @@ public class FilterNodeOptions : ExecNodeOptions public unsafe FilterNodeOptions(Expression expr) { _optionsPtr = CLib.garrow_filter_node_options_new(expr.GetPtr()); - - var strPtr = CLib.garrow_expression_to_string(expr.GetPtr()); - var str = StringUtil.PtrToStringUtf8((byte*)strPtr); - - Console.WriteLine(str); } internal unsafe GArrowFilterNodeOptions* GetPtr() diff --git a/csharp/src/Apache.Arrow.Acero/Function.cs b/csharp/src/Apache.Arrow.Acero/Function.cs index 7f3c0d3d53fde..1418c8b458724 100644 --- a/csharp/src/Apache.Arrow.Acero/Function.cs +++ b/csharp/src/Apache.Arrow.Acero/Function.cs @@ -14,7 +14,6 @@ // limitations under the License. using System; -using System.Runtime.InteropServices; namespace Apache.Arrow.Acero { @@ -23,24 +22,20 @@ public class Function : Expression private IntPtr _ptr; public unsafe Function(string functionName, Expression lhs, Expression rhs) + : this(functionName, new[] { lhs, rhs }) { - var functionNamePtr = StringUtil.ToCStringUtf8(functionName); + } - var rhsItem = new GList - { - data = rhs.GetPtr() - }; + public unsafe Function(string functionName, params Expression[] args) + { + var functionNamePtr = StringUtil.ToCStringUtf8(functionName); - var lhsItem = new GList - { - data = lhs.GetPtr(), - next = &rhsItem - }; + var list = new GLib.List(IntPtr.Zero); - IntPtr glistPtr = Marshal.AllocHGlobal(Marshal.SizeOf()); - Marshal.StructureToPtr(lhsItem, glistPtr, false); + foreach (var arg in args) + list.Append(arg.GetPtr()); - _ptr = (nint)CLib.garrow_call_expression_new((IntPtr)functionNamePtr, glistPtr, null); + _ptr = (IntPtr)CLib.garrow_call_expression_new((IntPtr)functionNamePtr, list.Handle, null); } public override IntPtr GetPtr() diff --git a/csharp/src/Apache.Arrow.Acero/HashJoinNode.cs b/csharp/src/Apache.Arrow.Acero/HashJoinNode.cs index cd5e0f1fe458e..56dfc814a2253 100644 --- a/csharp/src/Apache.Arrow.Acero/HashJoinNode.cs +++ b/csharp/src/Apache.Arrow.Acero/HashJoinNode.cs @@ -14,6 +14,7 @@ // limitations under the License. using System.Collections.Generic; +using static Apache.Arrow.Acero.CLib; namespace Apache.Arrow.Acero { @@ -23,10 +24,14 @@ public class HashJoinNode : ExecNode public unsafe HashJoinNode(HashJoinNodeOptions options, ExecPlan plan, List inputs) { - var left = inputs[0]; - var right = inputs[1]; + ExecNode left = inputs[0]; + ExecNode right = inputs[1]; - _nodePtr = CLib.garrow_execute_plan_build_hash_join_node(plan.GetPtr(), left.GetPtr(), right.GetPtr(), options.GetPtr(), null); + GError** error; + + _nodePtr = CLib.garrow_execute_plan_build_hash_join_node(plan.GetPtr(), left.GetPtr(), right.GetPtr(), options.GetPtr(), out error); + + ExceptionUtil.ThrowOnError(error); } public override unsafe CLib.GArrowExecuteNode* GetPtr() diff --git a/csharp/src/Apache.Arrow.Acero/HashJoinNodeOptions.cs b/csharp/src/Apache.Arrow.Acero/HashJoinNodeOptions.cs index 61647213de24a..4884543905617 100644 --- a/csharp/src/Apache.Arrow.Acero/HashJoinNodeOptions.cs +++ b/csharp/src/Apache.Arrow.Acero/HashJoinNodeOptions.cs @@ -13,9 +13,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using System; -using System.Collections.Generic; -using System.Runtime.InteropServices; using static Apache.Arrow.Acero.CLib; namespace Apache.Arrow.Acero @@ -26,27 +23,14 @@ public class HashJoinNodeOptions : ExecNodeOptions public unsafe HashJoinNodeOptions(GArrowJoinType joinType, string[] leftKeys, string[] rightKeys) { - var leftKeysPtr = GetStringArrayPtr(leftKeys); - var rightKeysPtr = GetStringArrayPtr(rightKeys); + var leftKeysPtr = StringUtil.GetStringArrayPtr(leftKeys); + var rightKeysPtr = StringUtil.GetStringArrayPtr(rightKeys); - _optionsPtr = garrow_hash_join_node_options_new(joinType, leftKeysPtr, (uint)leftKeys.Length, rightKeysPtr, (uint)rightKeys.Length, null); - } - - public IntPtr GetStringArrayPtr(params string[] arguments) - { - List allocatedMemory = new List(); - - int sizeOfIntPtr = Marshal.SizeOf(typeof(IntPtr)); - IntPtr pointersToArguments = Marshal.AllocHGlobal(sizeOfIntPtr * arguments.Length); + GError** error; - for (int i = 0; i < arguments.Length; ++i) - { - IntPtr pointerToArgument = Marshal.StringToHGlobalAnsi(arguments[i]); - allocatedMemory.Add(pointerToArgument); - Marshal.WriteIntPtr(pointersToArguments, i * sizeOfIntPtr, pointerToArgument); - } + _optionsPtr = garrow_hash_join_node_options_new(joinType, leftKeysPtr, (uint)leftKeys.Length, rightKeysPtr, (uint)rightKeys.Length, out error); - return pointersToArguments; + ExceptionUtil.ThrowOnError(error); } internal unsafe GArrowHashJoinNodeOptions* GetPtr() diff --git a/csharp/src/Apache.Arrow.Acero/LiteralExpression.cs b/csharp/src/Apache.Arrow.Acero/LiteralExpression.cs index 436fdfc881a13..080cd59145a96 100644 --- a/csharp/src/Apache.Arrow.Acero/LiteralExpression.cs +++ b/csharp/src/Apache.Arrow.Acero/LiteralExpression.cs @@ -24,12 +24,20 @@ public class LiteralExpression : Expression public unsafe LiteralExpression(string literal) { - var dataPtr = (nint)StringUtil.ToCStringUtf8(literal); + var dataPtr = (IntPtr)StringUtil.ToCStringUtf8(literal); var bufferPtr = CLib.garrow_buffer_new(dataPtr, literal.Length); var scalarPtr = CLib.garrow_string_scalar_new(bufferPtr); - var datumPtr = (nint)CLib.garrow_scalar_datum_new((nint)scalarPtr); + var datumPtr = (IntPtr)CLib.garrow_scalar_datum_new((IntPtr)scalarPtr); - _ptr = (nint)CLib.garrow_literal_expression_new((GArrowDatum*)datumPtr); + _ptr = (IntPtr)CLib.garrow_literal_expression_new((GArrowDatum*)datumPtr); + } + + public unsafe LiteralExpression(int literal) + { + var scalarPtr = CLib.garrow_int32_scalar_new(literal); + var datumPtr = (IntPtr)CLib.garrow_scalar_datum_new((IntPtr)scalarPtr); + + _ptr = (IntPtr)CLib.garrow_literal_expression_new((GArrowDatum*)datumPtr); } public override IntPtr GetPtr() diff --git a/csharp/src/Apache.Arrow.Acero/ProjectNode.cs b/csharp/src/Apache.Arrow.Acero/ProjectNode.cs new file mode 100644 index 0000000000000..7e95b8ea787c8 --- /dev/null +++ b/csharp/src/Apache.Arrow.Acero/ProjectNode.cs @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Collections.Generic; +using static Apache.Arrow.Acero.CLib; + +namespace Apache.Arrow.Acero +{ + public class ProjectNode : ExecNode + { + private unsafe CLib.GArrowExecuteNode* _nodePtr; + + public unsafe ProjectNode(ProjectNodeOptions options, ExecPlan plan, List nodes) + { + GError** error; + + _nodePtr = CLib.garrow_execute_plan_build_project_node(plan.GetPtr(), nodes[0].GetPtr(), options.GetPtr(), out error); + + ExceptionUtil.ThrowOnError(error); + } + + public override unsafe CLib.GArrowExecuteNode* GetPtr() + { + return _nodePtr; + } + } +} diff --git a/csharp/src/Apache.Arrow.Acero/ProjectNodeOptions.cs b/csharp/src/Apache.Arrow.Acero/ProjectNodeOptions.cs new file mode 100644 index 0000000000000..36607ca9d8c16 --- /dev/null +++ b/csharp/src/Apache.Arrow.Acero/ProjectNodeOptions.cs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using static Apache.Arrow.Acero.CLib; + +namespace Apache.Arrow.Acero +{ + public class ProjectNodeOptions : ExecNodeOptions + { + private unsafe GArrowProjectNodeOptions* _optionsPtr; + + public unsafe ProjectNodeOptions(List expressions, List names) + { + var list = new GLib.List(IntPtr.Zero); + + foreach (var expression in expressions) + list.Append(expression.GetPtr()); + + var namesPtr = StringUtil.GetStringArrayPtr(names.ToArray()); + + _optionsPtr = CLib.garrow_project_node_options_new((GList*)list.Handle, namesPtr, names.Count); + } + + internal unsafe GArrowProjectNodeOptions* GetPtr() + { + return _optionsPtr; + } + } +} diff --git a/csharp/src/Apache.Arrow.Acero/RecordBatchReaderSourceNode.cs b/csharp/src/Apache.Arrow.Acero/RecordBatchReaderSourceNode.cs index e070c06b0fbdb..fa9302695e69a 100644 --- a/csharp/src/Apache.Arrow.Acero/RecordBatchReaderSourceNode.cs +++ b/csharp/src/Apache.Arrow.Acero/RecordBatchReaderSourceNode.cs @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +using static Apache.Arrow.Acero.CLib; + namespace Apache.Arrow.Acero { public class RecordBatchReaderSourceNode : ExecNode @@ -21,7 +23,11 @@ public class RecordBatchReaderSourceNode : ExecNode public unsafe RecordBatchReaderSourceNode(RecordBatchReaderSourceNodeOptions options, ExecPlan plan) { - _nodePtr = CLib.garrow_execute_plan_build_source_node(plan.GetPtr(), options.GetPtr(), null); + GError** error; + + _nodePtr = CLib.garrow_execute_plan_build_source_node(plan.GetPtr(), options.GetPtr(), out error); + + ExceptionUtil.ThrowOnError(error); } public override unsafe CLib.GArrowExecuteNode* GetPtr() diff --git a/csharp/src/Apache.Arrow.Acero/RecordBatchSourceNode.cs b/csharp/src/Apache.Arrow.Acero/RecordBatchSourceNode.cs index 7418af51a4218..0dc6797c83cbf 100644 --- a/csharp/src/Apache.Arrow.Acero/RecordBatchSourceNode.cs +++ b/csharp/src/Apache.Arrow.Acero/RecordBatchSourceNode.cs @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +using static Apache.Arrow.Acero.CLib; + namespace Apache.Arrow.Acero { public class RecordBatchSourceNode : ExecNode @@ -21,7 +23,11 @@ public class RecordBatchSourceNode : ExecNode public unsafe RecordBatchSourceNode(RecordBatchSourceNodeOptions options, ExecPlan plan) { - _nodePtr = CLib.garrow_execute_plan_build_source_node(plan.GetPtr(), options.GetPtr(), null); + GError** error; + + _nodePtr = CLib.garrow_execute_plan_build_source_node(plan.GetPtr(), options.GetPtr(), out error); + + ExceptionUtil.ThrowOnError(error); } public override unsafe CLib.GArrowExecuteNode* GetPtr() diff --git a/csharp/src/Apache.Arrow.Acero/SinkNode.cs b/csharp/src/Apache.Arrow.Acero/SinkNode.cs index 4a11473d51858..b9084d7b9632f 100644 --- a/csharp/src/Apache.Arrow.Acero/SinkNode.cs +++ b/csharp/src/Apache.Arrow.Acero/SinkNode.cs @@ -14,6 +14,7 @@ // limitations under the License. using System.Collections.Generic; +using static Apache.Arrow.Acero.CLib; namespace Apache.Arrow.Acero { @@ -23,7 +24,11 @@ public class SinkNode : ExecNode public unsafe SinkNode(SinkNodeOptions _options, ExecPlan plan, List inputs) { - _nodePtr = CLib.garrow_execute_plan_build_sink_node(plan.GetPtr(), inputs[0].GetPtr(), _options.GetPtr(), null); + GError** error; + + _nodePtr = CLib.garrow_execute_plan_build_sink_node(plan.GetPtr(), inputs[0].GetPtr(), _options.GetPtr(), out error); + + ExceptionUtil.ThrowOnError(error); } public unsafe override CLib.GArrowExecuteNode* GetPtr() diff --git a/csharp/src/Apache.Arrow.Acero/SinkNodeOptions.cs b/csharp/src/Apache.Arrow.Acero/SinkNodeOptions.cs index 4eb46fb71febc..6c9b060bcdf7d 100644 --- a/csharp/src/Apache.Arrow.Acero/SinkNodeOptions.cs +++ b/csharp/src/Apache.Arrow.Acero/SinkNodeOptions.cs @@ -15,6 +15,7 @@ using Apache.Arrow.C; using Apache.Arrow.Ipc; +using static Apache.Arrow.Acero.CLib; namespace Apache.Arrow.Acero { @@ -33,7 +34,13 @@ public unsafe IArrowArrayStream GetRecordBatchReader() { var schemaPtr = ExportUtil.ExportAndGetSchemaPtr(_schema); var recordBatchReaderPtr = CLib.garrow_sink_node_options_get_reader(_optionsPtr, schemaPtr); - var arrayStreamPtr = CLib.garrow_record_batch_reader_export(recordBatchReaderPtr, null); + + GError** error; + + var arrayStreamPtr = CLib.garrow_record_batch_reader_export(recordBatchReaderPtr, out error); + + ExceptionUtil.ThrowOnError(error); + var arrayStream = CArrowArrayStreamImporter.ImportArrayStream(arrayStreamPtr); return arrayStream; diff --git a/csharp/src/Apache.Arrow.Acero/StringUtil.cs b/csharp/src/Apache.Arrow.Acero/StringUtil.cs index 29154a494b8b6..bfce48dc96db2 100644 --- a/csharp/src/Apache.Arrow.Acero/StringUtil.cs +++ b/csharp/src/Apache.Arrow.Acero/StringUtil.cs @@ -1,9 +1,28 @@ -using System.Runtime.InteropServices; +using System.Collections.Generic; +using System; +using System.Runtime.InteropServices; namespace Apache.Arrow.Acero { internal static class StringUtil { + public static IntPtr GetStringArrayPtr(params string[] arguments) + { + List allocatedMemory = new List(); + + int sizeOfIntPtr = Marshal.SizeOf(typeof(IntPtr)); + IntPtr pointersToArguments = Marshal.AllocHGlobal(sizeOfIntPtr * arguments.Length); + + for (int i = 0; i < arguments.Length; ++i) + { + IntPtr pointerToArgument = Marshal.StringToHGlobalAnsi(arguments[i]); + allocatedMemory.Add(pointerToArgument); + Marshal.WriteIntPtr(pointersToArguments, i * sizeOfIntPtr, pointerToArgument); + } + + return pointersToArguments; + } + public static unsafe byte* ToCStringUtf8(string str) { var utf8 = System.Text.Encoding.UTF8; diff --git a/csharp/src/Apache.Arrow.Acero/UnionNode.cs b/csharp/src/Apache.Arrow.Acero/UnionNode.cs index 7dad911fc2785..a2382e3c7bd11 100644 --- a/csharp/src/Apache.Arrow.Acero/UnionNode.cs +++ b/csharp/src/Apache.Arrow.Acero/UnionNode.cs @@ -33,10 +33,7 @@ public unsafe UnionNode(ExecPlan plan, List nodes) _nodePtr = garrow_execute_plan_build_node(plan.GetPtr(), factoryNamePtr, list.Handle, (IntPtr)optionsPtr2, out error); - if ((IntPtr)error != IntPtr.Zero) - { - throw new GLib.GException((IntPtr)error); - } + ExceptionUtil.ThrowOnError(error); } public override unsafe GArrowExecuteNode* GetPtr() => _nodePtr; diff --git a/csharp/test/Apache.Arrow.Acero.Tests/DeclarationTests.cs b/csharp/test/Apache.Arrow.Acero.Tests/DeclarationTests.cs index a6a32806ff100..db941e437060e 100644 --- a/csharp/test/Apache.Arrow.Acero.Tests/DeclarationTests.cs +++ b/csharp/test/Apache.Arrow.Acero.Tests/DeclarationTests.cs @@ -15,6 +15,7 @@ using Apache.Arrow.Ipc; using Apache.Arrow.Types; +using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; @@ -58,9 +59,9 @@ public async Task TestRecordBatchSource() """, rowCount: 3); } - private void AssertTable(Table table, string expected, int? rowCount = null) + private void AssertTable(Table table, string expected, int? rowCount = null, int columnPadding = 10) { - var actual = PrintPrintTable(table); + var actual = PrintPrintTable(table, columnPadding); _output.WriteLine(actual); @@ -89,8 +90,15 @@ public async Task TestHashJoin() options: hashJoinOptions, inputs: new List { left, right }); // act - var schema = GetOutputSchema(); - var result = hashJoin.ToRecordBatchReader(schema); + var result = hashJoin.ToRecordBatchReader( + new Schema.Builder() + .Field(new Field("customerId", StringType.Default, true)) + .Field(new Field("firstName", StringType.Default, true)) + .Field(new Field("lastName", StringType.Default, true)) + .Field(new Field("orderId", StringType.Default, true)) + .Field(new Field("customerId", StringType.Default, true)) + .Field(new Field("productId", StringType.Default, true)) + .Build()); // assert var table = await ConvertStreamToTable(result); @@ -171,8 +179,6 @@ public async Task TestMultiFilter() public async Task TestUnion() { // arrange - var schema = TestData.GetCustomersSchema(); - var left = new Declaration("record_batch_source", new RecordBatchSourceNodeOptions(TestData.GetCustomersRecordBatch())); @@ -182,6 +188,7 @@ public async Task TestUnion() var union = new Declaration("union", inputs: new List { left, right }); // act + var schema = TestData.GetCustomersSchema(); var result = union.ToRecordBatchReader(schema); // assert @@ -199,6 +206,45 @@ public async Task TestUnion() """, rowCount: 6); } + [Fact] + public async Task TestProjection() + { + // arrange + var customers = new Declaration("record_batch_source", + new RecordBatchSourceNodeOptions(TestData.GetCustomersRecordBatch())); + + var project = Declaration.FromSequence(new List { + customers, + new Declaration("project", new ProjectNodeOptions( + new List { + new Function( + "binary_join_element_wise", + new FieldExpression("lastName"), + new FieldExpression("firstName"), + new LiteralExpression(", ") + ) + }, + new List { "fullName" })) + }); + + // act + var result = project.ToRecordBatchReader( + new Schema.Builder() + .Field(new Field("fullName", StringType.Default, true)) + .Build()); + + // assert + var table = await ConvertStreamToTable(result); + + AssertTable(table, + """ + fullName | + Skywalker, Luke | + Leia, Princess | + Kenobi, Obi-Wan | + """, rowCount: 3, columnPadding: 15); + } + private async Task ConvertStreamToTable(IArrowArrayStream result) { Schema schema = null; @@ -219,13 +265,13 @@ private async Task
ConvertStreamToTable(IArrowArrayStream result) return Table.TableFromRecordBatches(schema, recordBatches); } - public static string PrintPrintTable(Table table) + public static string PrintPrintTable(Table table, int columnPadding = 10) { var sb = new StringBuilder(); for (var i = 0; i < table.ColumnCount; i++) { - sb.Append(table.Column(i).Name.PadRight(10) + " | "); + sb.Append(table.Column(i).Name.PadRight(columnPadding) + " | "); } sb.AppendLine(); @@ -241,8 +287,24 @@ public static string PrintPrintTable(Table table) if (sliced.Data.Array(k).Length == 0) continue; - var data = sliced.Data.Array(k) as StringArray; - sb.Append(data.GetString(0).PadRight(10) + " | "); + var data = sliced.Data.Array(k); + string value; + + switch (data) + { + case StringArray stringArray: + value = stringArray.GetString(0); + break; + + case Int32Array int32Array: + value = int32Array.GetValue(0).ToString(); + break; + + default: + throw new Exception("Array type not supported"); + } + + sb.Append(value.PadRight(columnPadding) + " | "); } } sb.AppendLine(); @@ -256,17 +318,5 @@ public class TestResult public string TableAsString { get; set; } public int RowCount { get; set; } } - - public Schema GetOutputSchema() - { - return new Schema.Builder() - .Field(new Field("customerId", StringType.Default, true)) - .Field(new Field("firstName", StringType.Default, true)) - .Field(new Field("lastName", StringType.Default, true)) - .Field(new Field("orderId", StringType.Default, true)) - .Field(new Field("customerId", StringType.Default, true)) - .Field(new Field("productId", StringType.Default, true)) - .Build(); - } } }