Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable packet size maximum #234

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions lib/protocol/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var reply = require('./reply');
var createExecuteTask = require('./ExecuteTask').create;
var ReplySegment = reply.Segment;
var part = require('./part');
const { constants } = require('buffer');
var MessageType = common.MessageType;
var MessageTypeName = common.MessageTypeName;
var SegmentKind = common.SegmentKind;
Expand All @@ -36,12 +37,10 @@ var debug = util.debuglog('hdb');
var trace = util.tracelog();

var EMPTY_BUFFER = common.EMPTY_BUFFER;
var MAX_PACKET_SIZE = common.MAX_PACKET_SIZE;
var DEFAULT_PACKET_SIZE = common.DEFAULT_PACKET_SIZE;
var PACKET_HEADER_LENGTH = common.PACKET_HEADER_LENGTH;
var SEGMENT_HEADER_LENGTH = common.SEGMENT_HEADER_LENGTH;
var PART_HEADER_LENGTH = common.PART_HEADER_LENGTH;
var MAX_AVAILABLE_SIZE = MAX_PACKET_SIZE -
PACKET_HEADER_LENGTH - SEGMENT_HEADER_LENGTH - PART_HEADER_LENGTH;

module.exports = Connection;

Expand Down Expand Up @@ -161,6 +160,17 @@ Object.defineProperties(Connection.prototype, {
get: function shouldUseCesu8() {
return (this._settings.useCesu8 === true);
}
},
packetSize: {
get: function getPacketSize() {
return this._settings.packetSize || DEFAULT_PACKET_SIZE;
}
},
packetSizeLimit: {
get: function getPacketSizeLimit() {
let limit = this._settings.packetSizeLimit || this.packetSize;
return Math.max(limit, this.packetSize);
}
}
});

Expand Down Expand Up @@ -301,8 +311,11 @@ Connection.prototype.send = function send(message, receive) {
debug('send', message);
trace('REQUEST', message);

var size = MAX_PACKET_SIZE - PACKET_HEADER_LENGTH;
var size = this.packetSizeLimit - PACKET_HEADER_LENGTH;
var buffer = message.toBuffer(size);
if(buffer.length > size) {
return receive(new Error('Packet size limit exceeded'));
}
var packet = new Buffer(PACKET_HEADER_LENGTH + buffer.length);
buffer.copy(packet, PACKET_HEADER_LENGTH);

Expand Down Expand Up @@ -346,8 +359,9 @@ Connection.prototype.setStatementContext = function setStatementContext(options)
}
};

Connection.prototype.getAvailableSize = function getAvailableSize() {
var availableSize = MAX_AVAILABLE_SIZE;
Connection.prototype.getAvailableSize = function getAvailableSize(forLobs = false) {
var totalSize = forLobs ? this.packetSize : this.packetSizeLimit;
var availableSize = totalSize - PACKET_HEADER_LENGTH - SEGMENT_HEADER_LENGTH - PART_HEADER_LENGTH;
if (this._statementContext) {
availableSize -= this._statementContext.size;
}
Expand Down
19 changes: 12 additions & 7 deletions lib/protocol/ExecuteTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ ExecuteTask.prototype.pushReply = function pushReply(reply) {
}
};

ExecuteTask.prototype.getParameters = function getParameters(availableSize, cb) {
ExecuteTask.prototype.getParameters = function getParameters(availableSize, availableSizeForLOBs, cb) {
var self = this;
var bytesWritten = 0;
var row = 0;
Expand Down Expand Up @@ -176,24 +176,29 @@ ExecuteTask.prototype.getParameters = function getParameters(availableSize, cb)
}
}
var remainingSize = availableSize - bytesWritten;
var remainingSizeForLOBs = availableSizeForLOBs - bytesWritten;
if (self.writer.length > remainingSize) {
if (self.autoCommit) {
self.needFinializeTransaction = true;
self.autoCommit = false;
}
return cb(null, args);
if(args.length === 0) {
return cb(new Error('Failed to set parameters, maximum packet size exceeded.'));
} else {
return cb(null, args);
}
}
self.writer.getParameters(remainingSize, handleParameters);
self.writer.getParameters(remainingSizeForLOBS, handleParameters);
}

next();
};

ExecuteTask.prototype.sendExecute = function sendExecute(cb) {
var self = this;
var availableSize = self.connection.getAvailableSize();
availableSize -= STATEMENT_ID_PART_LENGTH;
self.getParameters(availableSize, function send(err, parameters) {
var availableSize = self.connection.getAvailableSize(false) - STATEMENT_ID_PART_LENGTH;
var availableSizeForLOBs = self.connection.getAvailableSize(true) - STATEMENT_ID_PART_LENGTH;
self.getParameters(availableSize, availableSizeForLOBs, function send(err, parameters) {
if (err) {
return cb(err);
}
Expand All @@ -210,7 +215,7 @@ ExecuteTask.prototype.sendExecute = function sendExecute(cb) {

ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(cb) {
var self = this;
var availableSize = self.connection.getAvailableSize();
var availableSize = self.connection.getAvailableSize(true);
self.writer.getWriteLobRequest(availableSize, function send(err, buffer) {
if (err) {
return cb(err);
Expand Down
2 changes: 1 addition & 1 deletion lib/protocol/Statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,4 @@ function isInputParameter(metadata) {

function isOutputParameter(metadata) {
return metadata.ioType === IoType.OUTPUT || metadata.ioType === IoType.IN_OUT;
}
}
16 changes: 8 additions & 8 deletions lib/protocol/Writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ function storeErrorOnStream (err) {
}

Writer.prototype.finializeParameters = function finializeParameters(
bytesRemaining, cb) {
bytesRemainingForLOBs, cb) {
var self = this;
var stream, header;
this._streamErrorListeners = [];
Expand Down Expand Up @@ -155,14 +155,14 @@ Writer.prototype.finializeParameters = function finializeParameters(

function onreadable() {
/* jshint validthis:true */
var chunk = this.read(bytesRemaining);
var chunk = this.read(bytesRemainingForLOBs);
if (chunk === null) {
chunk = this.read();
}
if (chunk === null) {
return;
}
if (chunk.length > bytesRemaining) {
if (chunk.length > bytesRemainingForLOBs) {
cleanup();
return cb(createReadStreamError());
}
Expand All @@ -172,9 +172,9 @@ Writer.prototype.finializeParameters = function finializeParameters(
header.writeInt32LE(length, 2);
// push chunk
self.push(chunk);
bytesRemaining -= chunk.length;
bytesRemainingForLOBs -= chunk.length;
// stop appending if there is no remaining space
if (bytesRemaining === 0) {
if (bytesRemainingForLOBs === 0) {
cleanup();
// finalize lob if the stream has already ended
// because of cleanup we don't get end event in this case
Expand Down Expand Up @@ -210,7 +210,7 @@ Writer.prototype.finializeParameters = function finializeParameters(
next();
};

Writer.prototype.getParameters = function getParameters(bytesAvailable, cb) {
Writer.prototype.getParameters = function getParameters(bytesAvailableForLOBs, cb) {
var self = this;

function done(err) {
Expand All @@ -223,8 +223,8 @@ Writer.prototype.getParameters = function getParameters(bytesAvailable, cb) {
cb(null, buffer);
});
}
var bytesRemaining = bytesAvailable - this._bytesWritten;
this.finializeParameters(bytesRemaining, done);
var bytesRemainingForLOBs = bytesAvailableForLOBs - this._bytesWritten;
this.finializeParameters(bytesRemainingForLOBs, done);
};

Writer.prototype.finalizeWriteLobRequest = function finalizeWriteLobRequest(
Expand Down
2 changes: 1 addition & 1 deletion lib/protocol/common/Constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module.exports = {
PACKET_HEADER_LENGTH: 32,
SEGMENT_HEADER_LENGTH: 24,
PART_HEADER_LENGTH: 16,
MAX_PACKET_SIZE: Math.pow(2, 17),
DEFAULT_PACKET_SIZE: Math.pow(2, 17),
MAX_RESULT_SET_SIZE: Math.pow(2, 20),
EMPTY_BUFFER: new Buffer(0),
DATA_LENGTH_MAX1BYTE_LENGTH: 245,
Expand Down
4 changes: 2 additions & 2 deletions lib/protocol/reply/Segment.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ var common = require('../common');
var FunctionCode = common.FunctionCode;
var SegmentKind = common.SegmentKind;
var PartKindName = common.PartKindName;
var MAX_PACKET_SIZE = common.MAX_PACKET_SIZE;
var PACKET_HEADER_LENGTH = common.PACKET_HEADER_LENGTH;
var DEFAULT_SEGMENT_SIZE = common.DEFAULT_PACKET_SIZE - PACKET_HEADER_LENGTH;
var SEGMENT_HEADER_LENGTH = common.SEGMENT_HEADER_LENGTH;

module.exports = Segment;
Expand Down Expand Up @@ -82,7 +82,7 @@ function readSegment(buffer, offset) {
}

Segment.prototype.toBuffer = function toBuffer(size) {
size = size || (MAX_PACKET_SIZE - PACKET_HEADER_LENGTH);
size = size || DEFAULT_SEGMENT_SIZE;
var remainingSize = size - SEGMENT_HEADER_LENGTH;
var length = SEGMENT_HEADER_LENGTH;
var buffers = this.parts.map(function getPartBuffer(part) {
Expand Down
6 changes: 3 additions & 3 deletions lib/protocol/request/Segment.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var common = require('../common');
var Part = require('./Part');
var MessageType = common.MessageType;
var SegmentKind = common.SegmentKind;

var MAX_SEGMENT_SIZE = common.MAX_PACKET_SIZE - common.PACKET_HEADER_LENGTH;
var PACKET_HEADER_LENGTH = common.PACKET_HEADER_LENGTH;
var DEFAULT_SEGMENT_SIZE = common.DEFAULT_PACKET_SIZE - PACKET_HEADER_LENGTH;
var SEGMENT_HEADER_LENGTH = common.SEGMENT_HEADER_LENGTH;

module.exports = Segment;
Expand Down Expand Up @@ -73,7 +73,7 @@ Segment.prototype.add = function add(kind, args) {
};

Segment.prototype.toBuffer = function toBuffer(size) {
size = size || MAX_SEGMENT_SIZE;
size = size || DEFAULT_SEGMENT_SIZE;
var remainingSize = size - SEGMENT_HEADER_LENGTH;
var length = SEGMENT_HEADER_LENGTH;
var buffers = [];
Expand Down