From 8c10a7ae5fc33a54adfe77483237a432b8fbab58 Mon Sep 17 00:00:00 2001 From: Amar Zavery Date: Mon, 17 Sep 2018 12:02:33 -0700 Subject: [PATCH] test updates and add overview --- processor/README.md | 3 +- processor/examples/singleEph.ts | 8 +- processor/overview.md | 136 +++++++ processor/tests/eph.spec.ts | 596 ++++++++++++++++--------------- processor/tests/iothub.spec.ts | 59 +-- processor/tests/negative.spec.ts | 57 +-- processor/tests/retry.spec.ts | 241 +++++++------ 7 files changed, 646 insertions(+), 454 deletions(-) create mode 100644 processor/overview.md diff --git a/processor/README.md b/processor/README.md index 5e9b226d67a6..d2b2a37ca0ee 100644 --- a/processor/README.md +++ b/processor/README.md @@ -9,7 +9,8 @@ Azure Storage Blob. This makes it easy to continue receiving messages from where #### Conceptual Overview ![overview](https://raw.githubusercontent.com/Azure/azure-event-hubs-node/master/processor/eph.png) -More information about Azure Event Processor Host can be found over [here](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host). +- More information about Azure Event Processor Host can be found over [here](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host). +- General overview of how the Event Processor Host SDK works internally can be found over [here](https://raw.githubusercontent.com/Azure/azure-event-hubs-node/master/processor/overview.md). ## Pre-requisite ## - **Node.js version: 8.x or higher.** We would encourage you to install the latest available LTS version at any given time from https://nodejs.org. **Please do not use older LTS versions of node.js.** diff --git a/processor/examples/singleEph.ts b/processor/examples/singleEph.ts index 826a19d35b52..f146915e707b 100644 --- a/processor/examples/singleEph.ts +++ b/processor/examples/singleEph.ts @@ -72,16 +72,16 @@ async function startEph(ephName: string): Promise { partionCount[context.partitionId], context.partitionId, data.offset); // Checkpointing every 100th event received for a given partition. if (partionCount[context.partitionId] % 100 === 0) { + const num = partionCount[context.partitionId]; try { console.log("***** [%s] Number of partitions: %O", ephName, eph.receivingFromPartitions.length); console.log("***** [%s] EPH is currently receiving messages from partitions: %s", ephName, eph.receivingFromPartitions.toString()); + console.log("$$$$ [%s] Attempting to checkpoint message number %d", ephName, num); await context.checkpoint(); - console.log("$$$$ [%s] Successfully checkpointed message number %d", ephName, - partionCount[context.partitionId]); + console.log("$$$$ [%s] Successfully checkpointed message number %d", ephName, num); } catch (err) { - console.log(">>>>>>> [%s] An error occurred while checkpointing msg number %d: %O", - ephName, partionCount[context.partitionId], err); + console.log(">>>>> [%s] An error occurred while checkpointing msg number %d: %O", ephName, num, err); } } }; diff --git a/processor/overview.md b/processor/overview.md new file mode 100644 index 000000000000..2a2f821d870a --- /dev/null +++ b/processor/overview.md @@ -0,0 +1,136 @@ +# General Overview of Microsoft Azure Event Processor Host (@azure/event-processor-host) for JS + +Event Processor Host is built on top of the Microsoft Azure Event Hubs Client `@azure/event-hubs` for JS and provides a number of features not present in that lower layer: + +1. Event Processor Host removes the need to think about partitions. By default, it creates one instance of the event + hub client for each partition. Each instance will only ever handle + events from one partition, further simplifying the processing code. +2. Event Processor Host allows easy load balancing. Utilizing a shared persistent store for leases on partitions + (by default based on Azure Storage), instances of Event Processor Host receiving from the same consumer group + of the same Event Hub can be spread across multiple machines and partitions will be distributed across those + machines as evenly as possible. These instances can be started and stopped at any time, and partitions will be + redistributed as needed. It is even allowed to have more instances than partitions as a form of hot standby. (Note that + partition distribution is based solely on the number of partitions per instance, not event flow rate or any other metric.) +3. Event Processor Host allows the event processor to create a persistent "checkpoint" that describes a position in + the partition's event stream, and if restarted it automatically begins receiving at the next event after the checkpoint. + Because checkpointing is usually an expensive operation, it is up to you to create + them from within the `onMessage` hander, at whatever interval is suitable for your application. For example, an application with relatively infrequent messages might checkpoint after processing each one, whereas an application that requires high performance in the processing code in order to keep up with event flow might checkpoint once every hundred messages, or once + per second. + +## Using Event Processor Host + +### Step 1: Instantiate the Event Processor Host and provide a general error notification handler + +Instantiate the EPH using one of the many static methods that is the best fit for you. You can also +provide a general error notification handler. It will let you know about internal errors that happen +while managing partitions. + +```js +import { + EventProcessorHost, OnReceivedError, OnReceivedMessage, EventData, PartitionContext, delay +} from "@azure/event-processor-host"; + +const path = process.env.EVENTHUB_NAME; +const storageCS = process.env.STORAGE_CONNECTION_STRING; // you can get this from https://portal.azure.com +const ehCS = process.env.EVENTHUB_CONNECTION_STRING; +// creates a unique storageContainer name for every run +// if you wish to keep the name same between different runs then use the following then that is fine as well. +const storageContainerName = EventProcessorHost.createHostName("test-container"); +const ephName = "my-eph"; + +// Create the Event Processo Host +const eph = EventProcessorHost.createFromConnectionString( + EventProcessorHost.createHostName(ephName), + storageCS!, + storageContainerName, + ehCS!, + { + eventHubPath: path, + onEphError: (error) => { + console.log(">>>>>>> [%s] Error: %O", ephName, error); + } + } +); +``` + +### Step 2: Implement the message handler and the error handler and start the EPH +The `onMessage` handler processes all the received events from different partitions. It provides, +the partition context and the EventData. PartitionContext provides the means to create a checkpoint for the partition. Please make sure to checkpoint within a `try/catch` block. + +```js +const onMessage: OnReceivedMessage = async (context: PartitionContext, data: EventData) => { + console.log("##### [%s] - Rx message from partition: '%s', offset: '%s'", ephName, context.partitionId, data.offset); + // Checkpointing every 100th event received for a given partition. + if (partionCount[context.partitionId] % 100 === 0) { + const num = partionCount[context.partitionId]; + try { + console.log("$$$$ [%s] Attempting to checkpoint message number %d", ephName, num); + await context.checkpoint(); + console.log("$$$$ [%s] Successfully checkpointed message number %d", ephName, num); + } catch (err) { + console.log(">>>>> [%s] An error occurred while checkpointing msg number %d: %O", ephName, num, err); + } + } +}; + +// Error handler +const onError: OnReceivedError = (error) => { + console.log(">>>>> [%s] Received Error: %O", ephName, error); +}; + +try { + await eph.start(onMessage, onError); +} catch (err) { + console.log("An error occurred while starting the EPH: %O", err); +} +``` +### Step 3: Graceful Shutdown + +```js +try { + await eph.stop(); + console.log(">>>>>> Successfully stopped the EPH - '%s'.", eph.hostName); +} catch (err) { + console.log("An error occurred while stopping the EPH: %O", err); +} +``` + +## Checkpointing, Partition Ownership, and Reprocessing Messages + +In a system using Event Processor Host, there are one or more hosts processing events from a particular event hub+consumer group combination, and ownership of the partitions of the event hub are split up between the hosts. When a host takes ownership of a partition, it starts a receiver on that partition, and when doing so it must specify the position in the stream of events at which the receiver will begin consuming. If there is a checkpoint for that event hub+consumer group+partition combination available via the checkpoint manager (by default, in Azure Storage), the receiver will begin consuming at the position indicated by the checkpoint. + +Any time a host takes ownership of a partition, reprocessing of events may occur. Exactly how many messages may be reprocessed depends on how often checkpoints are written. Writing a checkpoint with the default checkpoint manager is expensive, since it makes at least one HTTPS call to Azure Storage. The obvious strategy to minimize reprocessing of events is to checkpoint after processing each event, but we advise against this due to the performance hit. +In a low-throughput scenario it may be OK, but as the event rate goes up, checkpointing too often could prevent a processor from being able to keep up with the flow. Also, event checkpointing after each event cannot completely prevent event reprocessing, since there will always be some time between finishing +processing and writing the checkpoint, during which the processor could fail. Customer applications must be able to detect and handle some amount of reprocessing, and the customer needs to study their particular scenario and application to balance the cost of handling the reprocessing against the performance hit of checkpointing more frequently. + +What can cause ownership of a partition to change: +1. Bringing a host online: it will steal ownership of partitions from already-running hosts until the distribution of partitions among hosts is as even as possible. +2. A host crashing/losing power/losing network connection/going offline for any reason: the leases on the partitions that the downed host owned will expire and the remaining hosts will find the expired leases and take ownership. This may result in unbalanced distribution to start with which will cause additional ownership changes until the distribution is balanced. +3. Azure Storage latency or failures which result in a partition lease expiring because it cannot be renewed in time: other hosts (or even the same host) will find the expired lease and take ownership. Again, this can result in unbalanced distribution and additional ownership changes. This scenario can occur even if there is only one host. +4. Certain event hub client errors can cause the processor for a partition to shut down, with the same effects as case 3. This scenario can also occur even with only one host. + +## Internal working of Event Processor Host + +EventHubs supports creating receivers with an `epoch value`. Epoch is of type `number`. At any given point in time, the receiver with the `highest epoch` value can receive messages from an EventHub for a given partition. Already connected receivers with `lower epoch` value or receivers `without an epoch` value will be disconnected. This ensures, that at any given time, there is `only one` receiver receiving messages from a partition in an EventHub. EPH makes use of this key functionality to receive messages from an EventHub. + +### Lease Acquisition: +Whenever a lease is acquired for the specified leaseDuration (default 30 seconds), it reads the epoch value and the offset from the blob. It creates the receiver with a higher epoch value (+1) than what was read from the blob, and with an offset read from the blob as the starting point. If there is no offset in the blob, then it starts from the beginning (-1, default value). + +### Lease Renewal: +While it is receiving messages, it keeps on renewing the lease at a regular interval (default 10 seconds). If an error occurs while renewing the lease, then it simply logs the error. It does not disconnect the receiver, since the receiver will be automatically disconnected when the lease expires or someone steals the lease. + +The EPH will keep on scanning across all the partitions at some interval. If it was able to steal leases, in the previous scan then it will sleep for lesser time before scanning again. If it did not steal any leases in the previous scan then it sleeps for more time before scanning again. + +When a new instance of EPH comes online, it starts scanning partitions by reading the contents of the LeaseStore. This helps the EPH understand the state of things in it's world. + +During each scan: +- It tries to find the number of unique hosts. This helps the EPH determine the ideal number of leases (1 per partition) that it should aim for. +- If the number of leases that it owns is less than the desired count, then it attempts to acquire any expired leases (done concurrently, to make full use of async nature of node.js) + - If it is able to acquire the lease, then it starts the receiver as described above. +- If it still needs more leases, then it will try stealing leases + - It decides to steal leases only from those owners that own leases more than the desired count. + - It randomly picks one of the biggest owners and tries to steal the desired number of leases from that owner. +- While stealing leases (done concurrently, to make full use of async nature of node.js) + - If the lease is successfully stolen, then it starts the receiver as described above. This means that some other EPH instance's lease was lost. That EPH instance would have received a disconnect error from that receiver and the receiver would be closed. + - Else, logs an error that it was not able to steal the lease and does nothing about it +- If it does not steal any leases, it returns false. This drives the amount of time, the EPH will sleep before it starts scanning again. \ No newline at end of file diff --git a/processor/tests/eph.spec.ts b/processor/tests/eph.spec.ts index e6456c666d91..9549eaa41538 100644 --- a/processor/tests/eph.spec.ts +++ b/processor/tests/eph.spec.ts @@ -77,7 +77,7 @@ describe("EPH", function () { describe("single", function () { it("should checkpoint messages in order", function (done) { - const func = async () => { + const test = async () => { host = EventProcessorHost.createFromConnectionString( EventProcessorHost.createHostName(), storageConnString!, @@ -96,6 +96,7 @@ describe("EPH", function () { } const ehc = EventHubClient.createFromConnectionString(ehConnString!, hubName!); await ehc.sendBatch(datas, "0"); + await ehc.close(); debug("Sent batch message successfully"); let num = 0; let offset = "0"; @@ -137,7 +138,7 @@ describe("EPH", function () { content.sequenceNumber.should.equal(sequence); await host.stop(); } - func().then(() => { done(); }).catch((err) => { done(err); }); + test().then(() => { done(); }).catch((err) => { done(err); }); }); it("should checkpoint a single received event.", function (done) { @@ -191,293 +192,321 @@ describe("EPH", function () { }); }); - it("should be able to receive messages from the checkpointed offset.", async function () { - const msgId = uuid(); - const ehc = EventHubClient.createFromConnectionString(ehConnString!, hubName!); - const leasecontainerName = EventProcessorHost.createHostName("tc"); - debug(">>>>> Lease container name: %s", leasecontainerName); - async function sendAcrossAllPartitions(ehc: EventHubClient, ids: string[]): Promise> { - const result: Promise[] = []; - const idMessage: Dictionary = {}; - for (const id of ids) { - const data = { body: "Test Message - " + id, properties: { message_id: msgId } }; - idMessage[id] = data; - result.push(ehc.send(data, id)); + it("should be able to receive messages from the checkpointed offset.", function (done) { + const test = async () => { + const msgId = uuid(); + const ehc = EventHubClient.createFromConnectionString(ehConnString!, hubName!); + const leasecontainerName = EventProcessorHost.createHostName("tc"); + debug(">>>>> Lease container name: %s", leasecontainerName); + async function sendAcrossAllPartitions(ehc: EventHubClient, ids: string[]): Promise> { + const result: Promise[] = []; + const idMessage: Dictionary = {}; + for (const id of ids) { + const data = { body: "Test Message - " + id, properties: { message_id: msgId } }; + idMessage[id] = data; + result.push(ehc.send(data, id)); + } + await Promise.all(result); + debug(">>>> Successfully finished sending messages.. %O", idMessage); + return idMessage; } - await Promise.all(result); - debug(">>>> Successfully finished sending messages.. %O", idMessage); - return idMessage; - } - const ids = await ehc.getPartitionIds(); - debug(">>> Received partition ids: ", ids); - host = EventProcessorHost.createFromConnectionString( - "my-eph-1", - storageConnString!, - leasecontainerName, - ehConnString!, - { - eventHubPath: hubName!, - initialOffset: EventPosition.fromEnqueuedTime(Date.now()), - startupScanDelay: 15, - leaseRenewInterval: 5, - leaseDuration: 15 + const ids = await ehc.getPartitionIds(); + debug(">>> Received partition ids: ", ids); + host = EventProcessorHost.createFromConnectionString( + "my-eph-1", + storageConnString!, + leasecontainerName, + ehConnString!, + { + eventHubPath: hubName!, + initialOffset: EventPosition.fromEnqueuedTime(Date.now()), + startupScanDelay: 15, + leaseRenewInterval: 5, + leaseDuration: 15 + } + ); + await delay(1000); + debug(">>>>> Sending the first set of test messages..."); + const firstSend = await sendAcrossAllPartitions(ehc, ids); + let count = 0; + const onMessage: OnReceivedMessage = async (context: PartitionContext, data: EventData) => { + const partitionId = context.partitionId + debug(">>>>> Rx message from '%s': '%o'", partitionId, data); + if (data.properties!.message_id === firstSend[partitionId].properties!.message_id) { + debug(">>>> Checkpointing the received message..."); + await context.checkpoint(); + count++; + } else { + const msg = `Sent message id '${data.properties!.message_id}' did not match the ` + + `received message id '${firstSend[partitionId].properties!.message_id}' for ` + + `partitionId '${partitionId}'.` + throw new Error(msg); + } + }; + const onError: OnReceivedError = (err) => { + debug("An error occurred while receiving the message: %O", err); + throw err; + }; + debug(">>>> Starting my-eph-1"); + await host.start(onMessage, onError); + while (count < ids.length) { + await delay(10000); + debug(">>>> number of partitionIds: %d, count: %d", ids.length, count); } - ); - await delay(1000); - debug(">>>>> Sending the first set of test messages..."); - const firstSend = await sendAcrossAllPartitions(ehc, ids); - let count = 0; - const onMessage: OnReceivedMessage = async (context: PartitionContext, data: EventData) => { - const partitionId = context.partitionId - debug(">>>>> Rx message from '%s': '%o'", partitionId, data); - if (data.properties!.message_id === firstSend[partitionId].properties!.message_id) { - debug(">>>> Checkpointing the received message..."); - await context.checkpoint(); - count++; - } else { - const msg = `Sent message id '${data.properties!.message_id}' did not match the ` + - `received message id '${firstSend[partitionId].properties!.message_id}' for ` + - `partitionId '${partitionId}'.` - throw new Error(msg); + await host.stop(); + + debug(">>>> Restarting the same host. This time the initial offset should be ignored, and " + + "the EventPosition should be from the checkpointed offset.."); + debug(">>>>> Sending the second set of test messages..."); + const secondSend = await sendAcrossAllPartitions(ehc, ids); + let count2 = 0; + const onMessage2: OnReceivedMessage = async (context: PartitionContext, data: EventData) => { + const partitionId = context.partitionId + debug(">>>>> Rx message from '%s': '%s'", partitionId, data); + if (data.properties!.message_id === secondSend[partitionId].properties!.message_id) { + debug(">>>> Checkpointing the received message..."); + await context.checkpoint(); + count2++; + } else { + const msg = `Sent message id '${data.properties!.message_id}' did not match the ` + + `received message id '${secondSend[partitionId].properties!.message_id}' for ` + + `partitionId '${partitionId}'.` + throw new Error(msg); + } + }; + const onError2: OnReceivedError = (err) => { + debug("An error occurred while receiving the message: %O", err); + throw err; + }; + debug(">>>> Starting my-eph-2"); + await host.start(onMessage2, onError2); + while (count2 < ids.length) { + await delay(10000); + debug(">>>> number of partitionIds: %d, count: %d", ids.length, count); } - }; - const onError: OnReceivedError = (err) => { - debug("An error occurred while receiving the message: %O", err); - throw err; - }; - debug(">>>> Starting my-eph-1"); - await host.start(onMessage, onError); - while (count < ids.length) { + debug(">>>>>> sleeping for 10 more seconds...."); await delay(10000); - debug(">>>> number of partitionIds: %d, count: %d", ids.length, count); - } - await host.stop(); - - debug(">>>> Restarting the same host. This time the initial offset should be ignored, and " + - "the EventPosition should be from the checkpointed offset.."); - debug(">>>>> Sending the second set of test messages..."); - const secondSend = await sendAcrossAllPartitions(ehc, ids); - let count2 = 0; - const onMessage2: OnReceivedMessage = async (context: PartitionContext, data: EventData) => { - const partitionId = context.partitionId - debug(">>>>> Rx message from '%s': '%s'", partitionId, data); - if (data.properties!.message_id === secondSend[partitionId].properties!.message_id) { - debug(">>>> Checkpointing the received message..."); - await context.checkpoint(); - count2++; - } else { - const msg = `Sent message id '${data.properties!.message_id}' did not match the ` + - `received message id '${secondSend[partitionId].properties!.message_id}' for ` + - `partitionId '${partitionId}'.` - throw new Error(msg); + await host.stop(); + await ehc.close(); + if (count2 > ids.length) { + throw new Error("We received more messages than we were expecting..."); } }; - const onError2: OnReceivedError = (err) => { - debug("An error occurred while receiving the message: %O", err); - throw err; - }; - debug(">>>> Starting my-eph-2"); - await host.start(onMessage2, onError2); - while (count2 < ids.length) { - await delay(10000); - debug(">>>> number of partitionIds: %d, count: %d", ids.length, count); - } - debug(">>>>>> sleeping for 10 more seconds...."); - await delay(10000); - await host.stop(); - if (count2 > ids.length) { - throw new Error("We received more messages than we were expecting..."); - } + test().then(() => { done(); }).catch((err) => { done(err); }); }); }); describe("multiple", function () { - it("should be able to run multiple eph successfully.", async function () { - const ehc = EventHubClient.createFromConnectionString(ehConnString!, hubName!); - const containerName: string = `sharedhost-${uuid()}`; - const now = Date.now(); - const hostByName: Dictionary = {}; - const sendDataByPartition: Dictionary = {}; - const getReceivingFromPartitionsForAllEph = (): Dictionary => { - const receivingPartitionsByHost: Dictionary = {}; - for (const hostName in hostByName) { - receivingPartitionsByHost[hostName] = hostByName[hostName].receivingFromPartitions; - } - debug(">>> EPH -> Partitions: \n%O", receivingPartitionsByHost); - return receivingPartitionsByHost; - }; + it("should be able to run multiple eph successfully.", function (done) { + const test = async () => { + const ehc = EventHubClient.createFromConnectionString(ehConnString!, hubName!); + const containerName: string = `sharedhost-${uuid()}`; + const now = Date.now(); + const hostByName: Dictionary = {}; + const sendDataByPartition: Dictionary = {}; + const getReceivingFromPartitionsForAllEph = (): Dictionary => { + const receivingPartitionsByHost: Dictionary = {}; + for (const hostName in hostByName) { + receivingPartitionsByHost[hostName] = hostByName[hostName].receivingFromPartitions; + } + debug(">>> EPH -> Partitions: \n%O", receivingPartitionsByHost); + return receivingPartitionsByHost; + }; - const sendEvents = async (ids: string[]) => { - for (let i = 0; i < ids.length; i++) { - const data: EventData = { - body: `Hello World - ${ids[i]}!!` + const sendEvents = async (ids: string[]) => { + for (let i = 0; i < ids.length; i++) { + const data: EventData = { + body: `Hello World - ${ids[i]}!!` + } + sendDataByPartition[ids[i]] = data; + await ehc.send(data, ids[i]); + debug(">>> Sent data to partition: %s", ids[i]); } - sendDataByPartition[ids[i]] = data; - await ehc.send(data, ids[i]); - debug(">>> Sent data to partition: %s", ids[i]); + }; + + const ids = await ehc.getPartitionIds(); + for (let i = 0; i < ids.length; i++) { + const hostName = `host-${i}`; + hostByName[hostName] = EventProcessorHost.createFromConnectionString( + hostName, + storageConnString!, + containerName, + ehConnString!, + { + eventHubPath: hubName!, + initialOffset: EventPosition.fromEnqueuedTime(now), + } + ); + + const onError: OnReceivedError = (error: Error) => { + debug(`>>> [%s] Received error: %O`, hostName, error); + throw error; + }; + const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => { + debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data); + should.equal(sendDataByPartition[context.partitionId].body, data.body); + }; + hostByName[hostName].start(onMessage, onError); + debug(">>> Sleeping for 8 seconds after starting %s.", hostName); + await delay(8000); + debug(">>> [%s] currently receiving messages from partitions : %o", hostName, + hostByName[hostName].receivingFromPartitions); + } + debug(">>> Sleeping for another 15 seconds.") + await delay(15000); + const hostToPartition = getReceivingFromPartitionsForAllEph(); + for (const host in hostToPartition) { + should.equal(Array.isArray(hostToPartition[host]), true); + hostToPartition[host].length.should.eql(1); + } + await sendEvents(ids); + await delay(5000); + await ehc.close(); + for (const host in hostByName) { + await hostByName[host].stop(); } }; + test().then(() => { done(); }).catch((err) => { done(err); }); + }); + }); - const ids = await ehc.getPartitionIds(); - for (let i = 0; i < ids.length; i++) { - const hostName = `host-${i}`; - hostByName[hostName] = EventProcessorHost.createFromConnectionString( - hostName, + describe("runtimeInfo", function () { + it("should get hub runtime info correctly", function (done) { + const test = async () => { + host = EventProcessorHost.createFromConnectionString( + EventProcessorHost.createHostName(), storageConnString!, - containerName, + EventProcessorHost.createHostName("single"), ehConnString!, { eventHubPath: hubName!, - initialOffset: EventPosition.fromEnqueuedTime(now), + initialOffset: EventPosition.fromEnqueuedTime(Date.now()) } ); - - const onError: OnReceivedError = (error: Error) => { - debug(`>>> [%s] Received error: %O`, hostName, error); - throw error; - }; - const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => { - debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data); - should.equal(sendDataByPartition[context.partitionId].body, data.body); - }; - hostByName[hostName].start(onMessage, onError); - debug(">>> Sleeping for 8 seconds after starting %s.", hostName); - await delay(8000); - debug(">>> [%s] currently receiving messages from partitions : %o", hostName, - hostByName[hostName].receivingFromPartitions); - } - debug(">>> Sleeping for another 15 seconds.") - await delay(15000); - const hostToPartition = getReceivingFromPartitionsForAllEph(); - for (const host in hostToPartition) { - should.equal(Array.isArray(hostToPartition[host]), true); - hostToPartition[host].length.should.eql(1); - } - await sendEvents(ids); - await delay(5000); - await ehc.close(); - for (const host in hostByName) { - await hostByName[host].stop(); - } - }); - }); - - describe("runtimeInfo", function () { - it("should get hub runtime info correctly", async function () { - host = EventProcessorHost.createFromConnectionString( - EventProcessorHost.createHostName(), - storageConnString!, - EventProcessorHost.createHostName("single"), - ehConnString!, - { - eventHubPath: hubName!, - initialOffset: EventPosition.fromEnqueuedTime(Date.now()) - } - ); - const hubRuntimeInfo = await host.getHubRuntimeInformation(); - should.equal(Array.isArray(hubRuntimeInfo.partitionIds), true); - should.equal(typeof hubRuntimeInfo.partitionCount, "number"); + const hubRuntimeInfo = await host.getHubRuntimeInformation(); + should.equal(Array.isArray(hubRuntimeInfo.partitionIds), true); + should.equal(typeof hubRuntimeInfo.partitionCount, "number"); + await host.stop(); + }; + test().then(() => { done(); }).catch((err) => { done(err); }); }); - it("should get partition runtime info correctly with partitionId as string", async function () { - host = EventProcessorHost.createFromConnectionString( - EventProcessorHost.createHostName(), - storageConnString!, - EventProcessorHost.createHostName("single"), - ehConnString!, - { - eventHubPath: hubName!, - initialOffset: EventPosition.fromEnqueuedTime(Date.now()) - } - ); - const partitionInfo = await host.getPartitionInformation("0"); - debug(">>> partitionInfo: %o", partitionInfo); - partitionInfo.partitionId.should.equal("0"); - partitionInfo.type.should.equal("com.microsoft:partition"); - partitionInfo.hubPath.should.equal(hubName); - partitionInfo.lastEnqueuedTimeUtc.should.be.instanceof(Date); - should.exist(partitionInfo.lastSequenceNumber); - should.exist(partitionInfo.lastEnqueuedOffset); + it("should get partition runtime info correctly with partitionId as string", function (done) { + const test = async () => { + host = EventProcessorHost.createFromConnectionString( + EventProcessorHost.createHostName(), + storageConnString!, + EventProcessorHost.createHostName("single"), + ehConnString!, + { + eventHubPath: hubName!, + initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + } + ); + const partitionInfo = await host.getPartitionInformation("0"); + debug(">>> partitionInfo: %o", partitionInfo); + partitionInfo.partitionId.should.equal("0"); + partitionInfo.type.should.equal("com.microsoft:partition"); + partitionInfo.hubPath.should.equal(hubName); + partitionInfo.lastEnqueuedTimeUtc.should.be.instanceof(Date); + should.exist(partitionInfo.lastSequenceNumber); + should.exist(partitionInfo.lastEnqueuedOffset); + await host.stop(); + }; + test().then(() => { done(); }).catch((err) => { done(err); }); }); - it("should get partition runtime info correctly with partitionId as number", async function () { - host = EventProcessorHost.createFromConnectionString( - EventProcessorHost.createHostName(), - storageConnString!, - EventProcessorHost.createHostName("single"), - ehConnString!, - { - eventHubPath: hubName!, - initialOffset: EventPosition.fromEnqueuedTime(Date.now()) - } - ); - const partitionInfo = await host.getPartitionInformation(0); - partitionInfo.partitionId.should.equal("0"); - partitionInfo.type.should.equal("com.microsoft:partition"); - partitionInfo.hubPath.should.equal(hubName); - partitionInfo.lastEnqueuedTimeUtc.should.be.instanceof(Date); - should.exist(partitionInfo.lastSequenceNumber); - should.exist(partitionInfo.lastEnqueuedOffset); + it("should get partition runtime info correctly with partitionId as number", function (done) { + const test = async () => { + host = EventProcessorHost.createFromConnectionString( + EventProcessorHost.createHostName(), + storageConnString!, + EventProcessorHost.createHostName("single"), + ehConnString!, + { + eventHubPath: hubName!, + initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + } + ); + const partitionInfo = await host.getPartitionInformation(0); + partitionInfo.partitionId.should.equal("0"); + partitionInfo.type.should.equal("com.microsoft:partition"); + partitionInfo.hubPath.should.equal(hubName); + partitionInfo.lastEnqueuedTimeUtc.should.be.instanceof(Date); + should.exist(partitionInfo.lastSequenceNumber); + should.exist(partitionInfo.lastEnqueuedOffset); + await host.stop(); + }; + test().then(() => { done(); }).catch((err) => { done(err); }); }); - it("should fail getting partition information when partitionId is not a string or number", async function () { - host = EventProcessorHost.createFromConnectionString( - EventProcessorHost.createHostName(), - storageConnString!, - EventProcessorHost.createHostName("single"), - ehConnString!, - { - eventHubPath: hubName!, - initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + it("should fail getting partition information when partitionId is not a string or number", function (done) { + const test = async () => { + host = EventProcessorHost.createFromConnectionString( + EventProcessorHost.createHostName(), + storageConnString!, + EventProcessorHost.createHostName("single"), + ehConnString!, + { + eventHubPath: hubName!, + initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + } + ); + try { + await host.getPartitionInformation(false as any); + } catch (err) { + err.message.should.equal("'partitionId' is a required parameter and must be of type: 'string' | 'number'."); } - ); - try { - await host.getPartitionInformation(false as any); - } catch (err) { - err.message.should.equal("'partitionId' is a required parameter and must be of type: 'string' | 'number'."); - } + }; + test().then(() => { done(); }).catch((err) => { done(err); }); }); - it("should fail getting partition information when partitionId is empty string", async function () { - host = EventProcessorHost.createFromConnectionString( - EventProcessorHost.createHostName(), - storageConnString!, - EventProcessorHost.createHostName("single"), - ehConnString!, - { - eventHubPath: hubName!, - initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + it("should fail getting partition information when partitionId is empty string", function (done) { + const test = async () => { + host = EventProcessorHost.createFromConnectionString( + EventProcessorHost.createHostName(), + storageConnString!, + EventProcessorHost.createHostName("single"), + ehConnString!, + { + eventHubPath: hubName!, + initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + } + ); + try { + await host.getPartitionInformation(""); + } catch (err) { + err.message.should.match(/.*The specified partition is invalid for an EventHub partition sender or receiver.*/ig); } - ); - try { - await host.getPartitionInformation(""); - } catch (err) { - err.message.should.match(/.*The specified partition is invalid for an EventHub partition sender or receiver.*/ig); - } + }; + test().then(() => { done(); }).catch((err) => { done(err); }); }); - it("should fail getting partition information when partitionId is a negative number", async function () { - host = EventProcessorHost.createFromConnectionString( - EventProcessorHost.createHostName(), - storageConnString!, - EventProcessorHost.createHostName("single"), - ehConnString!, - { - eventHubPath: hubName!, - initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + it("should fail getting partition information when partitionId is a negative number", function (done) { + const test = async () => { + host = EventProcessorHost.createFromConnectionString( + EventProcessorHost.createHostName(), + storageConnString!, + EventProcessorHost.createHostName("single"), + ehConnString!, + { + eventHubPath: hubName!, + initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + } + ); + try { + await host.getPartitionInformation(-1); + } catch (err) { + err.message.should.match(/.*The specified partition is invalid for an EventHub partition sender or receiver.*/ig); } - ); - try { - await host.getPartitionInformation(-1); - } catch (err) { - err.message.should.match(/.*The specified partition is invalid for an EventHub partition sender or receiver.*/ig); - } + }; + test().then(() => { done(); }).catch((err) => { done(err); }); }); }); describe("options", function () { - it("should throw an error if the event hub name is neither provided in the connection string and nor in the options object", function () { + it("should throw an error if the event hub name is neither provided in the connection string and nor in the options object", function (done) { try { const ehc = "Endpoint=sb://foo.bar.baz.net/;SharedAccessKeyName=somekey;SharedAccessKey=somesecret" EventProcessorHost.createFromConnectionString( @@ -492,40 +521,49 @@ describe("EPH", function () { } catch (err) { should.exist(err); err.message.match(/.*Either provide "path" or the "connectionString": "Endpoint=sb:\/\/foo\.bar\.baz\.net\/;SharedAccessKeyName=somekey;SharedAccessKey=somesecret", must contain EntityPath=".*"/ig); + done(); } }); - it("should get hub runtime info correctly when eventhub name is present in connection string but not as an option in the options object.", async function () { - host = EventProcessorHost.createFromConnectionString( - EventProcessorHost.createHostName(), - storageConnString!, - EventProcessorHost.createHostName("single"), - `${ehConnString!};EntityPath=${hubName!}`, - { - initialOffset: EventPosition.fromEnqueuedTime(Date.now()) - } - ); - const hubRuntimeInfo = await host.getHubRuntimeInformation(); - hubRuntimeInfo.path.should.equal(hubName); - should.equal(Array.isArray(hubRuntimeInfo.partitionIds), true); - should.equal(typeof hubRuntimeInfo.partitionCount, "number"); + it("should get hub runtime info correctly when eventhub name is present in connection string but not as an option in the options object.", function (done) { + const test = async () => { + host = EventProcessorHost.createFromConnectionString( + EventProcessorHost.createHostName(), + storageConnString!, + EventProcessorHost.createHostName("single"), + `${ehConnString!};EntityPath=${hubName!}`, + { + initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + } + ); + const hubRuntimeInfo = await host.getHubRuntimeInformation(); + hubRuntimeInfo.path.should.equal(hubName); + should.equal(Array.isArray(hubRuntimeInfo.partitionIds), true); + should.equal(typeof hubRuntimeInfo.partitionCount, "number"); + await host.stop(); + }; + test().then(() => { done(); }).catch((err) => { done(err); }); }); - it("when eventhub name is present in connection string and in the options object, the one in options object is selected.", async function () { - host = EventProcessorHost.createFromConnectionString( - EventProcessorHost.createHostName(), - storageConnString!, - EventProcessorHost.createHostName("single"), - `${ehConnString!};EntityPath=foo`, - { - eventHubPath: hubName, - initialOffset: EventPosition.fromEnqueuedTime(Date.now()) - } - ); - const hubRuntimeInfo = await host.getHubRuntimeInformation(); - hubRuntimeInfo.path.should.equal(hubName); - should.equal(Array.isArray(hubRuntimeInfo.partitionIds), true); - should.equal(typeof hubRuntimeInfo.partitionCount, "number"); + it("when eventhub name is present in connection string and in the options object, the one in options object is selected.", function (done) { + const test = async () => { + host = EventProcessorHost.createFromConnectionString( + EventProcessorHost.createHostName(), + storageConnString!, + EventProcessorHost.createHostName("single"), + `${ehConnString!};EntityPath=foo`, + { + eventHubPath: hubName, + initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + } + ); + const hubRuntimeInfo = await host.getHubRuntimeInformation(); + hubRuntimeInfo.path.should.equal(hubName); + should.equal(Array.isArray(hubRuntimeInfo.partitionIds), true); + should.equal(typeof hubRuntimeInfo.partitionCount, "number"); + await host.stop(); + }; + test().then(() => { done(); }).catch((err) => { done(err); }); }); }); }); \ No newline at end of file diff --git a/processor/tests/iothub.spec.ts b/processor/tests/iothub.spec.ts index 06f77cb7b72d..a7730732feab 100644 --- a/processor/tests/iothub.spec.ts +++ b/processor/tests/iothub.spec.ts @@ -26,35 +26,38 @@ describe("EPH with iothub connection string", function () { "define IOTHUB_CONNECTION_STRING in your environment before running integration tests."); }); - it("should be able to receive messages from the event hub associated with an iothub.", async function () { - try { - host = await EventProcessorHost.createFromIotHubConnectionString( - hostName, - storageConnString!, - EventProcessorHost.createHostName("iot"), - iothubConnString!, - { - initialOffset: EventPosition.fromEnqueuedTime(Date.now()), - leaseDuration: 20, - leaseRenewInterval: 10 - } - ); - const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => { - debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data); - }; - const onError: OnReceivedError = (err) => { - debug("An error occurred while receiving the message: %O", err); + it("should be able to receive messages from the event hub associated with an iothub.", function (done) { + const test = async () => { + try { + host = await EventProcessorHost.createFromIotHubConnectionString( + hostName, + storageConnString!, + EventProcessorHost.createHostName("iot"), + iothubConnString!, + { + initialOffset: EventPosition.fromEnqueuedTime(Date.now()), + leaseDuration: 20, + leaseRenewInterval: 10 + } + ); + const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => { + debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data); + }; + const onError: OnReceivedError = (err) => { + debug("An error occurred while receiving the message: %O", err); + throw err; + }; + const runtimeInfo = await host.getHubRuntimeInformation(); + debug(">>>> runtimeInfo: %O", runtimeInfo); + runtimeInfo.createdAt.should.exist; + (typeof runtimeInfo.partitionCount).should.equal("number"); + await host.start(onMessage, onError); + await delay(15000); + await host.stop(); + } catch (err) { throw err; - }; - const runtimeInfo = await host.getHubRuntimeInformation(); - debug(">>>> runtimeInfo: %O", runtimeInfo); - runtimeInfo.createdAt.should.exist; - (typeof runtimeInfo.partitionCount).should.equal("number"); - await host.start(onMessage, onError); - await delay(15000); - await host.stop(); - } catch (err) { - throw err; + } } + test().then(() => { done(); }).catch((err) => { done(err); }); }); }); \ No newline at end of file diff --git a/processor/tests/negative.spec.ts b/processor/tests/negative.spec.ts index 42efbbc79eae..5be28436d16d 100644 --- a/processor/tests/negative.spec.ts +++ b/processor/tests/negative.spec.ts @@ -28,35 +28,38 @@ describe("negative", function () { const hubName = process.env.EVENTHUB_NAME; const hostName = EventProcessorHost.createHostName(); let host: EventProcessorHost; - it("should fail when trying to start an EPH that is already started.", async function () { - host = EventProcessorHost.createFromConnectionString( - hostName, - storageConnString!, - EventProcessorHost.createHostName("tc"), - ehConnString!, - { - eventHubPath: hubName!, - initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + it("should fail when trying to start an EPH that is already started.", function (done) { + const test = async () => { + host = EventProcessorHost.createFromConnectionString( + hostName, + storageConnString!, + EventProcessorHost.createHostName("tc"), + ehConnString!, + { + eventHubPath: hubName!, + initialOffset: EventPosition.fromEnqueuedTime(Date.now()) + } + ); + const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => { + debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data); + }; + const onError: OnReceivedError = (err) => { + debug("An error occurred while receiving the message: %O", err); + throw err; + }; + await host.start(onMessage, onError); + try { + debug(">>> [%s] Trying to start second time.", hostName); + await host.start(onMessage, onError); + throw new Error("The second call to start() should have failed."); + } catch (err) { + err.message.should.match(/A partition manager cannot be started multiple times/ig); + } finally { + await host.stop(); + should.equal(host["_context"]["partitionManager"]["_isCancelRequested"], true); } - ); - const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => { - debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data); - }; - const onError: OnReceivedError = (err) => { - debug("An error occurred while receiving the message: %O", err); - throw err; }; - await host.start(onMessage, onError); - try { - debug(">>> [%s] Trying to start second time.", hostName); - await host.start(onMessage, onError); - throw new Error("The second call to start() should have failed."); - } catch (err) { - err.message.should.match(/A partition manager cannot be started multiple times/ig); - } finally { - await host.stop(); - should.equal(host["_context"]["partitionManager"]["_isCancelRequested"], true); - } + test().then(() => { done(); }).catch((err) => { done(err); }); }); it("should fail when the eventhub name is incorrect.", function (done) { diff --git a/processor/tests/retry.spec.ts b/processor/tests/retry.spec.ts index 9c1934995866..5412a0eb2a36 100644 --- a/processor/tests/retry.spec.ts +++ b/processor/tests/retry.spec.ts @@ -11,126 +11,137 @@ const should = chai.should(); const debug = debugModule("azure:eph:retry-spec"); describe("retry function", function () { - it("should succeed if the operation succeeds.", async function () { - let counter = 0; - try { - const config: RetryConfig = { - operation: async () => { - debug("counter: %d", ++counter); - await delay(200); - return { - code: 200, - description: "OK" - } - }, - hostName: "eph-1", - action: "Succeed", - maxRetries: 5, - retryMessage: "Retry", - finalFailureMessage: "Out of retry attempts, still failing!!" - }; - const result = await retry(config); - result.code.should.equal(200); - result.description.should.equal("OK"); - counter.should.equal(1); - } catch (err) { - debug("An error occurred in a test that should have succeeded: %O", err); - throw err; - } - }); - - it("should succeed if the operation initially fails and then succeeds.", async function () { - let counter = 0; - try { - const config: RetryConfig = { - operation: async () => { - await delay(200); - debug("counter: %d", ++counter); - if (counter == 1) { - throw new Error("The server is busy right now. Retry later."); - } else { - return ["0", "1"]; - } - }, - hostName: "eph-1", - action: "Initially fail then suceed", - maxRetries: 5, - retryMessage: "Retry", - finalFailureMessage: "Out of retry attempts, still failing!!" - }; - const result = await retry(config); - should.equal(Array.isArray(result), true); - result.toString().should.equal("0,1"); - counter.should.equal(2); - } catch (err) { - debug("An error occurred in a test that should have succeeded: %O", err); - throw err; - } - }); - - it("should succeed in the last attempt.", async function () { - let counter = 0; - try { - const config: RetryConfig = { - operation: async () => { - await delay(200); - debug("counter: %d", ++counter); - if (counter == 1) { - const e = new Error("Error in attempt 1."); - throw e; - } else if (counter == 2) { - const e = new Error("Error in attempt 2."); - throw e; - } else { + it("should succeed if the operation succeeds.", function (done) { + const test = async () => { + let counter = 0; + try { + const config: RetryConfig = { + operation: async () => { + debug("counter: %d", ++counter); + await delay(200); return { code: 200, description: "OK" - }; - } - }, - hostName: "eph-1", - action: "Success in last attempt", - maxRetries: 3, - retryMessage: "Retry", - finalFailureMessage: "Out of retry attempts, still failing!!" - }; - const result = await retry(config); - result.code.should.equal(200); - result.description.should.equal("OK"); - counter.should.equal(3); - } catch (err) { - debug("An error occurred in a test that should have succeeded: %O", err); - throw err; - } + } + }, + hostName: "eph-1", + action: "Succeed", + maxRetries: 5, + retryMessage: "Retry", + finalFailureMessage: "Out of retry attempts, still failing!!" + }; + const result = await retry(config); + result.code.should.equal(200); + result.description.should.equal("OK"); + counter.should.equal(1); + } catch (err) { + debug("An error occurred in a test that should have succeeded: %O", err); + throw err; + } + }; + test().then(() => { done(); }).catch((err) => { done(err); }); }); - it("should fail if all attempts return an error", async function () { - let counter = 0; - try { - const config: RetryConfig = { - operation: async () => { - debug("counter: %d", ++counter); - await delay(200); - const e = new Error("I would always like to fail, keep retrying."); - throw e; - }, - hostName: "eph-1", - action: "Fail after 5 attempts", - maxRetries: 5, - retryMessage: "Retry", - partitionId: "1", - finalFailureMessage: "Out of retry attempts, still failing!!" - }; - await retry(config); - } catch (err) { - counter.should.equal(5); - should.exist(err); - err.action.should.equal("Fail after 5 attempts"); - err.hostName.should.equal("eph-1"); - err.partitionId.should.equal("1"); - should.exist(err.error); - err.error.message.should.match(/Out of retry attempts, still failing!! while performing the action "Fail after 5 attempts" due to Error\: I would always like to fail, keep retrying.*/ig); + it("should succeed if the operation initially fails and then succeeds.", function (done) { + const test = async () => { + let counter = 0; + try { + const config: RetryConfig = { + operation: async () => { + await delay(200); + debug("counter: %d", ++counter); + if (counter == 1) { + throw new Error("The server is busy right now. Retry later."); + } else { + return ["0", "1"]; + } + }, + hostName: "eph-1", + action: "Initially fail then suceed", + maxRetries: 5, + retryMessage: "Retry", + finalFailureMessage: "Out of retry attempts, still failing!!" + }; + const result = await retry(config); + should.equal(Array.isArray(result), true); + result.toString().should.equal("0,1"); + counter.should.equal(2); + } catch (err) { + debug("An error occurred in a test that should have succeeded: %O", err); + throw err; + } + }; + test().then(() => { done(); }).catch((err) => { done(err); }); + }); - } + it("should succeed in the last attempt.", function (done) { + const test = async () => { + let counter = 0; + try { + const config: RetryConfig = { + operation: async () => { + await delay(200); + debug("counter: %d", ++counter); + if (counter == 1) { + const e = new Error("Error in attempt 1."); + throw e; + } else if (counter == 2) { + const e = new Error("Error in attempt 2."); + throw e; + } else { + return { + code: 200, + description: "OK" + }; + } + }, + hostName: "eph-1", + action: "Success in last attempt", + maxRetries: 3, + retryMessage: "Retry", + finalFailureMessage: "Out of retry attempts, still failing!!" + }; + const result = await retry(config); + result.code.should.equal(200); + result.description.should.equal("OK"); + counter.should.equal(3); + } catch (err) { + debug("An error occurred in a test that should have succeeded: %O", err); + throw err; + } + }; + test().then(() => { done(); }).catch((err) => { done(err); }); + }); + + it("should fail if all attempts return an error", function (done) { + const test = async () => { + let counter = 0; + try { + const config: RetryConfig = { + operation: async () => { + debug("counter: %d", ++counter); + await delay(200); + const e = new Error("I would always like to fail, keep retrying."); + throw e; + }, + hostName: "eph-1", + action: "Fail after 5 attempts", + maxRetries: 5, + retryMessage: "Retry", + partitionId: "1", + finalFailureMessage: "Out of retry attempts, still failing!!" + }; + await retry(config); + } catch (err) { + counter.should.equal(5); + should.exist(err); + err.action.should.equal("Fail after 5 attempts"); + err.hostName.should.equal("eph-1"); + err.partitionId.should.equal("1"); + should.exist(err.error); + err.error.message.should.match(/Out of retry attempts, still failing!! while performing the action "Fail after 5 attempts" due to Error\: I would always like to fail, keep retrying.*/ig); + } + }; + test().then(() => { done(); }).catch((err) => { done(err); }); }); }); \ No newline at end of file