diff --git a/docs/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx b/docs/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx index d25142ac27..11b6df492b 100644 --- a/docs/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx +++ b/docs/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx @@ -6,281 +6,326 @@ sidebar_position: 1 --- import Admonition from '@theme/Admonition'; -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; import CodeBlock from '@theme/CodeBlock'; -import LanguageSwitcher from "@site/src/components/LanguageSwitcher"; -import LanguageContent from "@site/src/components/LanguageContent"; +import Panel from "@site/src/components/Panel"; +import ContentFrame from "@site/src/components/ContentFrame"; # Queue Sink: Apache Kafka + -* **Apache Kafka** is a distributed, high-performance, transactional messaging - platform, that remains performant as the number of messages it needs to process - increases and the number of events it needs to stream climbs to the big-data zone. - -* RavenDB can harness the advantages presented by message brokers like Kafka - both as a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx)) - and as a **consumer** (using a sink task to consume enqueued messages). - -* To use RavenDB as a consumer, define an ongoing Sink task that will read batches - of enqueued JSON formatted messages from Kafka topics, construct documents using - user-defined scripts, and store the documents in RavenDB collections. 
- -* In this page: - * [The Queue Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#the-queue-sink-task) - * [Client API](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#client-api) - * [Add a Kafka Connection String](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) - * [Add a Kafka Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-sink-task) - * [Configuration Options](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options) +* **Apache Kafka** is a distributed, high-performance, transactional messaging + platform, that remains performant as the number of messages it needs to process + increases and the number of events it needs to stream climbs to the big-data zone. + +* RavenDB can harness the advantages presented by message brokers like Kafka + both as a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx)) + and as a **consumer** (using a sink task to consume enqueued messages). + +* To use RavenDB as a consumer, define an ongoing Sink task that will read batches + of enqueued JSON formatted messages from Kafka topics, construct documents using + user-defined scripts, and store the documents in RavenDB collections. 
+ +* In this page: + * [The Queue Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#the-queue-sink-task) + * [Connecting a Kafka broker](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#connecting-a-kafka-broker) + * [Retrieving enqueued messages from selected Kafka topics](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#retrieving-enqueued-messages-from-selected-kafka-topics) + * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#running-user-defined-scripts) + * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#storing-documents-in-ravendb-collections) + * [Client API](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#client-api) + * [Add a Kafka Connection String](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) + * [Add a Kafka Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-sink-task) + * [Configuration Options](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options) + -## The Queue Sink Task -Users of RavenDB 6.0 and on can create an ongoing Sink task that connects -a Kafka broker, retrieves enqueued messages from selected Kafka topics, -runs a user-defined script to manipulate data and construct documents, and -potentially stores the created documents in RavenDB collections. -#### Connecting a Kafka broker + + + + +## Connecting a Kafka broker + +Users of RavenDB 6.0 and on can create an ongoing Sink task that connects +a Kafka broker, retrieves enqueued messages from selected Kafka topics, +runs a user-defined script to manipulate data and construct documents, and +potentially stores the created documents in RavenDB collections. In the message broker architecture, RavenDB sinks take the role of data consumers. 
-A sink would connect a Kafka broker using a connection string, and retrieve messages -from the broker's Topics. +A sink would connect a Kafka broker using a connection string, and retrieve messages +from the broker's Topics. -Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) +Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) about adding a connection string via API. -Read [here](../../../studio/database/tasks/ongoing-tasks/kafka-queue-sink.mdx#define-a-kafka-sink-task) -about adding a connection string using Studio. +Read [here](../../../studio/database/tasks/ongoing-tasks/kafka-queue-sink.mdx#define-a-kafka-sink-task) +about adding a connection string using Studio. -Like all ongoing tasks, a sink task is operated by a +Like all ongoing tasks, a sink task is operated by a [responsible node](../../../server/clustering/distribution/highly-available-tasks.mdx#responsible-node). -When the responsibility for the task is moved from one node to another, -e.g. from node `A` to node `B` as a result of node `A` down time: +When the responsibility for the task is moved from one node to another, +e.g. from node `A` to node `B` as a result of node `A` down time: -* The consumer task will maintain the same consumer group id it had on the original node. -* Kafka brokers may **cease serving the sink task for some time** as the Kafka consumer - group rebalances (adapting to the leaving of one node and the joining of another, among - other changes). +* The consumer task will maintain the same consumer group id it had on the original node. +* Kafka brokers may **cease serving the sink task for some time** as the Kafka consumer + group rebalances (adapting to the leaving of one node and the joining of another, among + other changes). 
-#### Retrieving enqueued messages from selected Kafka topics - -When a message is sent to a Kafka broker by a producer, it is pushed to -the tail of a topic. As preceding messages are pulled, the message advances -up the queue until it reaches its head and can be consumed by RavenDB's sink. -#### Running user-defined scripts - -A sink task's script is a JavaScript segment. Its basic role is to retrieve -selected Kafka messages or message properties, and construct documents that -will then be stored in RavenDB. - -The script can simply store the whole message as a document, as in this -segment: - - -{`// Add the document a metadata \`@collection\` property to keep it in -// this collection, or do not set it to store the document in @empty). -this['@metadata']['@collection'] = 'Orders'; -// Store the message as is, using its Id property as its RavenDB Id as well. + + + +--- + + + +## Retrieving enqueued messages from selected Kafka topics + + +While a queue sink task is running, it prevents the host database from being unloaded due to idle operations. +This ensures that message consumption from Kafka topics continues uninterrupted, even if the database would otherwise go idle. + + +When a message is sent to a Kafka broker by a producer, it is pushed to +the tail of a topic. As preceding messages are pulled, the message advances +up the queue until it reaches its head and can be consumed by RavenDB's sink. + + + +--- + + + +## Running user-defined scripts + +A sink task's script is a JavaScript segment. Its basic role is to retrieve +selected Kafka messages or message properties, and construct documents that +will then be stored in RavenDB. + +The script can store the whole message as a document, as in this segment: + +```javascript +// Add the document a metadata `@collection` property to keep it in +// this collection, or do not set it to store the document in @empty). 
+this['@metadata']['@collection'] = 'Orders'; +// Store the message as is, using its Id property as its RavenDB Id as well. put(this.Id.toString(), this) -`} - - - -But the script can also retrieve some information from the read message -and construct a new document that doesn't resemble the original message. -Scripts often apply two sections: a section that creates a JSON object -that defines the document's structure and contents, and a second section -that stores the document. - -E.g., for Kafka messages of this format - - - -{`\{ +``` + +
+ +The script can also retrieve selected information from the read message and +construct a new document that does not resemble the original message. +Scripts often apply two sections: a section that creates a JSON object +that defines the document's structure and contents, and a second section +that stores the document. + +E.g., for Kafka messages of this format: + +```json +{ "Id" : 13, "FirstName" : "John", "LastName" : "Doe" -\} -`} -
-
- -We can create this script - - - -{`var item = \{ - Id : this.Id, - FirstName : this.FirstName, - LastName : this.LastName, - FullName : this.FirstName + ' ' + this.LastName, - "@metadata" : \{ +} +``` + +
+ +We can create this script: + +```javascript +var item = { + Id : this.Id, + FirstName : this.FirstName, + LastName : this.LastName, + FullName : this.FirstName + ' ' + this.LastName, + "@metadata" : { "@collection" : "Users" - \} -\}; + } +}; // Use .toString() to pass the Id as a string even if Kafka provides it as a number put(this.Id.toString(), item) -`} -
-
- -The script can also apply various other JavaScript commands, including -`load` to load a RavenDB document (e.g. to construct a document that -includes data from the retrieved message and complementing data from -existing RavenDB documents), `del` to remove existing RavenDB documents, -and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). -#### Storing documents in RavenDB collections - -The sink task consumes batches of queued messages and stores them in RavenDB -in a transactional manner, processing either the entire batch or none of it. +``` + +
+ +The script can also apply various other JavaScript commands, including +`load` to load a RavenDB document (e.g. to construct a document that +includes data from the retrieved message and complementary data from +existing RavenDB documents), `del` to remove existing RavenDB documents, +and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). + +
+ +--- + + + +## Storing documents in RavenDB collections + +The sink task consumes batches of queued messages and stores them in RavenDB +in a transactional manner, processing either the entire batch or none of it. + -Some script processing errors are allowed; when such an error occurs RavenDB -will skip the affected message, record the event in the logs, and alert the -user in Studio, but **continue processing the batch**. +Some script processing errors are allowed; when such an error occurs RavenDB +will skip the affected message, record the event in the logs, and alert the +user in Studio, but **continue processing the batch**. -Once a batch is consumed, the task confirms it by calling `kafkaConsumer.Commit()`. +Once a batch is consumed, the task confirms it by calling `kafkaConsumer.Commit()`. -Note that the number of documents included in a batch is +Note that the number of documents included in a batch is [configurable](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options). -Producers may enqueue -[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx#idempotence-and-message-duplication) +Producers may enqueue +[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx#idempotence-and-message-duplication) of the same document. -if processing each message only once is important to the consumer, -it is **the consumer's responsibility** to verify the uniqueness of -each consumed message. - -Note that as long as the **Id** property of Kafka messages is preserved -(so duplicate messages share an Id), the script's `put(ID, { ... })` -command will overwrite a previous document with the same Id and only -one copy of it will remain. +If processing each message only once is important to the consumer, +it is **the consumer's responsibility** to verify the uniqueness of +each consumed message. 
+ +Note that as long as the **Id** property of Kafka messages is preserved +(so duplicate messages share an Id), the script's `put(ID, { ... })` +command will overwrite a previous document with the same Id and only +one copy of it will remain. + +
-## Client API + -#### Add a Kafka Connection String + -Prior to defining a Kafka sink task, add a **Kafka connection string** -that the task will use to connect the message broker's bootstrap servers. +## Add a Kafka Connection String -To create the connection string: +Prior to defining a Kafka sink task, add a **Kafka connection string** +that the task will use to connect the message broker's bootstrap servers. -* Create a `QueueConnectionString`instance with the connection string configuration. - Pass it to the `PutConnectionStringOperation` store operation to add the connection string. +To create the connection string: - `QueueConnectionString`: - - -{`// Add Kafka connection string +* Create a `QueueConnectionString` instance with the connection string configuration. + Pass it to the `PutConnectionStringOperation` store operation to add the connection string. + +`QueueConnectionString`: + +```csharp +// Add Kafka connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "KafkaConStr", BrokerType = QueueBrokerType.Kafka, KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); -`} - - - - `QueueBrokerType`: - - -{`public enum QueueBrokerType -\{ + { BootstrapServers = "localhost:9092" } + })); +``` + +
+ +`QueueBrokerType`: + +```csharp +public enum QueueBrokerType +{ None, Kafka, RabbitMq -\} -`} -
-
- - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string | - | **KafkaConnectionSettings** | `KafkaConnectionSettings[]` | A list of comma-separated host:port URLs to Kafka brokers | -#### Add a Kafka Sink Task - -To create the Sink task: - -* Create `QueueSinkScript` instances to define scripts with which the - task can process retrieved messages, apply JavaScript commands, construct - documents and store them in RavenDB. - - -{`// Define a Sink script +} +``` + +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string | +| **KafkaConnectionSettings** | `KafkaConnectionSettings[]` | A list of comma-separated host:port URLs to Kafka brokers | + +
+ +--- + + + +## Add a Kafka Sink Task + +To create the Sink task: + +* Create `QueueSinkScript` instances to define scripts with which the + task can process retrieved messages, apply JavaScript commands, construct + documents and store them in RavenDB. + +```csharp +// Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; -`} - - +}; +``` + +
-* Prepare a `QueueSinkConfiguration`object with the sink task configuration. +* Prepare a `QueueSinkConfiguration` object with the sink task configuration. - `QueueSinkConfiguration` properties: +`QueueSinkConfiguration` properties: - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | The sink task name | - | **ConnectionStringName** | `string` | The registered connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` to define a Kafka sink task | - | **Scripts** | `List` | A list of scripts | +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The registered connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` to define a Kafka sink task | +| **Scripts** | `List` | A list of scripts | -* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. +* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. 
- `QueueSinkScript` properties: +`QueueSinkScript` properties: - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Script name | - | **Queues** | `List` | A list of Kafka topics to consume messages from | - | **Script** | `string ` | The script contents | +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Script name | +| **Queues** | `List` | A list of Kafka topics to consume messages from | +| **Script** | `string` | The script contents | -**Code Sample**: - - -{`// Add Kafka connection string +#### Example: + +```csharp +// Add Kafka connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "KafkaConStr", BrokerType = QueueBrokerType.Kafka, - KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); + KafkaConnectionSettings = new KafkaConnectionSettings() + { BootstrapServers = "localhost:9092" } + })); // Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; +}; -// Define a Kafka configuration +// Define a Kafka sink task configuration var config = new QueueSinkConfiguration() -\{ +{ // Sink name Name = "KafkaSinkTaskName", // The connection string to connect the broker with @@ -288,27 +333,25 @@ var config = new QueueSinkConfiguration() // What queue broker is this task using BrokerType = QueueBrokerType.Kafka, // The list of scripts to run - Scripts = \{ queueSinkScript \} -\}; + Scripts = { queueSinkScript } +}; -AddQueueSinkOperationResult addQueueSinkOperationResult = +AddQueueSinkOperationResult 
addQueueSinkOperationResult = store.Maintenance.Send(new AddQueueSinkOperation(config)); -`} - - +``` +
+
-## Configuration Options + -Use these configuration options to gain more control over queue sink tasks. +Use these configuration options to gain more control over queue sink tasks. * [QueueSink.MaxBatchSize](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxbatchsize) The maximum number of pulled messages consumed in a single batch. * [QueueSink.MaxFallbackTimeInSec](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxfallbacktimeinsec) - The maximum number of seconds the Queue Sink process will be in a fallback - mode (i.e. suspending the process) after a connection failure. - - - + The maximum number of seconds the Queue Sink process will be in a fallback + mode (i.e. suspending the process) after a connection failure. + diff --git a/docs/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx b/docs/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx index 332b2be4bd..ac6e88140f 100644 --- a/docs/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx +++ b/docs/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx @@ -6,296 +6,339 @@ sidebar_position: 2 --- import Admonition from '@theme/Admonition'; -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; import CodeBlock from '@theme/CodeBlock'; -import LanguageSwitcher from "@site/src/components/LanguageSwitcher"; -import LanguageContent from "@site/src/components/LanguageContent"; +import Panel from "@site/src/components/Panel"; +import ContentFrame from "@site/src/components/ContentFrame"; # Queue Sink: RabbitMQ + -* **RabbitMQ** brokers are designed to disperse data to multiple queues, - making for a flexible data channeling system that can easily handle complex - message streaming scenarios. +* **RabbitMQ** brokers are designed to disperse data to multiple queues, + making for a flexible data channeling system that can handle complex + message streaming scenarios. 
+ +* RavenDB can harness the advantages presented by RabbitMQ brokers both as + a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx)) + and as a **consumer** (using a sink task to consume enqueued messages). + +* To use RavenDB as a consumer, define an ongoing Sink task that will read + batches of JSON formatted messages from RabbitMQ queues, construct documents + using user-defined scripts, and store the documents in RavenDB collections. + +* In this page: + * [The RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#the-rabbitmq-sink-task) + * [Connecting a RabbitMQ broker](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#connecting-a-rabbitmq-broker) + * [Retrieving messages from RabbitMQ queues](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#retrieving-messages-from-rabbitmq-queues) + * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#running-user-defined-scripts) + * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#storing-documents-in-ravendb-collections) + * [Client API](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#client-api) + * [Add a RabbitMQ Connection String](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) + * [Add a RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-sink-task) + * [Configuration Options](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options) -* RavenDB can harness the advantages presented by RabbitMQ brokers both as - a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx)) - and as a **consumer** (using a sink task to consume enqueued messages). 
+ -* To use RavenDB as a consumer, define an ongoing Sink task that will read - batches of JSON formatted messages from RabbitMQ queues, construct documents - using user-defined scripts, and store the documents in RavenDB collections. + -* In this page: - * [The RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#the-rabbitmq-sink-task) - * [Client API](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#client-api) - * [Add a RabbitMQ Connection String](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) - * [Add a RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-sink-task) - * [Configuration Options](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options) - -## The RabbitMQ Sink Task + -Users of RavenDB 6.0 and on can create an ongoing Sink task that connects -a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined -script to manipulate data and construct documents, and potentially stores the -created documents in RavenDB collections. -#### Connecting a RabbitMQ broker +## Connecting a RabbitMQ broker + +Users of RavenDB 6.0 and on can create an ongoing Sink task that connects +a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined +script to manipulate data and construct documents, and potentially stores the +created documents in RavenDB collections. In the message broker architecture, RavenDB sinks take the role of data consumers. -A sink would connect a RabbitMQ broker using a connection string, and retrieve messages -from the broker's queues. +A sink would connect a RabbitMQ broker using a connection string, and retrieve messages +from the broker's queues. 
-Read [below](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) +Read [below](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) about adding a connection string via API. -Read [here](../../../studio/database/tasks/ongoing-tasks/rabbitmq-queue-sink.mdx#define-a-rabbitmq-sink-task) -about adding a connection string using Studio. -#### Retrieving messages from RabbitMQ queues - -When a message is sent to a RabbitMQ broker by a producer, it is pushed to -the tail of a queue. As preceding messages are pulled, the message advances -up the queue until it reaches its head and can be consumed by RavenDB's sink. -#### Running user-defined scripts - -A sink task's script is a JavaScript segment. Its basic role is to retrieve -selected RabbitMQ messages or message properties, and construct documents that -will then be stored in RavenDB. - -The script can simply store the whole message as a document, as in this -segment: - - -{`// Add the document a metadata \`@collection\` property to keep it in -// this collection, or do not set it to store the document in @empty). -this['@metadata']['@collection'] = 'Orders'; -// Store the message as is, using its Id property as its RavenDB Id as well. +Read [here](../../../studio/database/tasks/ongoing-tasks/rabbitmq-queue-sink.mdx#define-a-rabbitmq-sink-task) +about adding a connection string using Studio. + + + +--- + + + +## Retrieving messages from RabbitMQ queues + + +While a queue sink task is running, it prevents the host database from being unloaded due to idle operations. +This ensures that message consumption from RabbitMQ queues continues uninterrupted, even if the database would otherwise go idle. + + +When a message is sent to a RabbitMQ broker by a producer, it is pushed to +the tail of a queue. As preceding messages are pulled, the message advances +up the queue until it reaches its head and can be consumed by RavenDB's sink. 
+ + + +--- + + + +## Running user-defined scripts + +A sink task's script is a JavaScript segment. Its basic role is to retrieve +selected RabbitMQ messages or message properties, and construct documents that +will then be stored in RavenDB. + +The script can store the whole message as a document, as in this segment: + +```javascript +// Add the document a metadata `@collection` property to keep it in +// this collection, or do not set it to store the document in @empty). +this['@metadata']['@collection'] = 'Orders'; +// Store the message as is, using its Id property as its RavenDB Id as well. put(this.Id.toString(), this) -`} - - - -But the script can also retrieve some information from the read message -and construct a new document that doesn't resemble the original message. -Scripts often apply two sections: a section that creates a JSON object -that defines the document's structure and contents, and a second section -that stores the document. - -E.g., for RabbitMQ messages of this format - - - -{`\{ +``` + +
+ +The script can also retrieve selected information from the read message and +construct a new document that does not resemble the original message. +Scripts often apply two sections: a section that creates a JSON object +that defines the document's structure and contents, and a second section +that stores the document. + +E.g., for RabbitMQ messages of this format: + +```json +{ "Id" : 13, "FirstName" : "John", "LastName" : "Doe" -\} -`} -
-
- -We can create this script - - - -{`var item = \{ - Id : this.Id, - FirstName : this.FirstName, - LastName : this.LastName, - FullName : this.FirstName + ' ' + this.LastName, - "@metadata" : \{ +} +``` + +
+ +We can create this script: + +```javascript +var item = { + Id : this.Id, + FirstName : this.FirstName, + LastName : this.LastName, + FullName : this.FirstName + ' ' + this.LastName, + "@metadata" : { "@collection" : "Users" - \} -\}; + } +}; // Use .toString() to pass the Id as a string even if RabbitMQ provides it as a number put(this.Id.toString(), item) -`} -
-
- -The script can also apply various other JavaScript commands, including -`load` to load a RavenDB document (e.g. to construct a document that -includes data from the retrieved message and complementing data from -existing RavenDB documents), `del` to remove existing RavenDB documents, -and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). -#### Storing documents in RavenDB collections - -The sink task consumes batches of queued messages and stores them in RavenDB -in a transactional manner, processing either the entire batch or none of it. +``` + +
+ +The script can also apply various other JavaScript commands, including +`load` to load a RavenDB document (e.g. to construct a document that +includes data from the retrieved message and complementary data from +existing RavenDB documents), `del` to remove existing RavenDB documents, +and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). + +
+ +--- + + + +## Storing documents in RavenDB collections + +The sink task consumes batches of queued messages and stores them in RavenDB +in a transactional manner, processing either the entire batch or none of it. + -Some script processing errors are allowed; when such an error occurs RavenDB -will skip the affected message, record the event in the logs, and alert the -user in Studio, but **continue processing the batch**. +Some script processing errors are allowed; when such an error occurs RavenDB +will skip the affected message, record the event in the logs, and alert the +user in Studio, but **continue processing the batch**. -Once a batch is consumed, the task confirms it by sending `_channel.BasicAck`. +Once a batch is consumed, the task confirms it by sending `_channel.BasicAck`. -Note that the number of documents included in a batch is +Note that the number of documents included in a batch is [configurable](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options). -Producers may enqueue -[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx#message-duplication) +Producers may enqueue +[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx#message-duplication) of the same document. -if processing each message only once is important to the consumer, -it is **the consumer's responsibility** to verify the uniqueness of -each consumed message. - -Note that as long as the **Id** property of RabbitMQ messages is preserved -(so duplicate messages share an Id), the script's `put(ID, { ... })` command -will overwrite a previous document with the same Id and only one copy of -it will remain. +If processing each message only once is important to the consumer, +it is **the consumer's responsibility** to verify the uniqueness of +each consumed message. 
+ +Note that as long as the **Id** property of RabbitMQ messages is preserved +(so duplicate messages share an Id), the script's `put(ID, { ... })` command +will overwrite a previous document with the same Id and only one copy of +it will remain. + + +
+ -## Client API + -#### Add a RabbitMQ Connection String +## Add a RabbitMQ Connection String -Prior to defining a RabbitMQ sink task, add a **RabbitMQ connection string** -that the task will use to connect the message brokers. +Prior to defining a RabbitMQ sink task, add a **RabbitMQ connection string** +that the task will use to connect the message brokers. -To create the connection string: +To create the connection string: -* Create a `QueueConnectionString`instance with the connection string configuration. - Pass it to the `PutConnectionStringOperation` store operation to add the connection string. +* Create a `QueueConnectionString` instance with the connection string configuration. + Pass it to the `PutConnectionStringOperation` store operation to add the connection string. - `QueueConnectionString`: - - -{`// Add RabbitMQ connection string +`QueueConnectionString`: + +```csharp +// Add RabbitMQ connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "RabbitMqConStr", BrokerType = QueueBrokerType.RabbitMq, RabbitMqConnectionSettings = new RabbitMqConnectionSettings() - \{ ConnectionString = "amqp://guest:guest@localhost:5672/" \} - \})); -`} - - - - `QueueBrokerType`: - - -{`public enum QueueBrokerType -\{ + { ConnectionString = "amqp://guest:guest@localhost:5672/" } + })); +``` + +
+ +`QueueBrokerType`: + +```csharp +public enum QueueBrokerType +{ None, Kafka, RabbitMq -\} -`} -
-
- - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` for a RabbitMQ connection string | - | **RabbitMqConnectionSettings** | `RabbitMqConnectionSettings[]` | A list of strings indicating RabbitMQ brokers connection details | -#### Add a RabbitMQ Sink Task - -To create the Sink task: - -* Create `QueueSinkScript` instances to define scripts with which the - task can process retrieved messages, apply JavaScript commands, construct - documents and store them in RavenDB. - - -{`// Define a Sink script +} +``` + +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` for a RabbitMQ connection string | +| **RabbitMqConnectionSettings** | `RabbitMqConnectionSettings[]` | A list of strings indicating RabbitMQ brokers connection details | + +
+ +--- + + + +## Add a RabbitMQ Sink Task + +To create the Sink task: + +* Create `QueueSinkScript` instances to define scripts with which the + task can process retrieved messages, apply JavaScript commands, construct + documents and store them in RavenDB. + +```csharp +// Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of RabbitMQ queues to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; -`} - - - -* Prepare a `QueueSinkConfiguration`object with the sink task configuration. - - `QueueSinkConfiguration` properties: - - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | The sink task name | - | **ConnectionStringName** | `string` | The registered connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` to define a RabbitMQ sink task | - | **Scripts** | `List` | A list of scripts | - -* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. - - `QueueSinkScript` properties: - - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Script name| - | **Queues** | `List` | A list of RabbitMQ queues to consume messages from | - | **Script** | `string ` | The script contents | - -**Code Sample**: - - -{`// Add Kafka connection string +}; +``` + +
+ +* Prepare a `QueueSinkConfiguration` object with the sink task configuration. + +`QueueSinkConfiguration` properties: + +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The registered connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` to define a RabbitMQ sink task | +| **Scripts** | `List` | A list of scripts | + +* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. + +`QueueSinkScript` properties: + +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Script name | +| **Queues** | `List` | A list of RabbitMQ queues to consume messages from | +| **Script** | `string` | The script contents | + +#### Example: + +```csharp +// Add RabbitMQ connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ - Name = "KafkaConStr", - BrokerType = QueueBrokerType.Kafka, - KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); + { + Name = "RabbitMqConStr", + BrokerType = QueueBrokerType.RabbitMq, + RabbitMqConnectionSettings = new RabbitMqConnectionSettings() + { ConnectionString = "amqp://guest:guest@localhost:5672/" } + })); // Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", - // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + // A list of RabbitMQ queues to connect + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; +}; -// Define a Kafka configuration +// Define a RabbitMQ sink task configuration var config = new QueueSinkConfiguration() -\{ +{ // Sink name - Name = "KafkaSinkTaskName", + Name = "RabbitMqSinkTaskName", // The 
connection string to connect the broker with - ConnectionStringName = "KafkaConStr", + ConnectionStringName = "RabbitMqConStr", // What queue broker is this task using - BrokerType = QueueBrokerType.Kafka, + BrokerType = QueueBrokerType.RabbitMq, // The list of scripts to run - Scripts = \{ queueSinkScript \} -\}; + Scripts = { queueSinkScript } +}; -AddQueueSinkOperationResult addQueueSinkOperationResult = +AddQueueSinkOperationResult addQueueSinkOperationResult = store.Maintenance.Send(new AddQueueSinkOperation(config)); -`} -
-
+``` +
+
-## Configuration Options + -Use these configuration options to gain more control over queue sink tasks. +Use these configuration options to gain more control over queue sink tasks. * [QueueSink.MaxBatchSize](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxbatchsize) The maximum number of pulled messages consumed in a single batch. * [QueueSink.MaxFallbackTimeInSec](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxfallbacktimeinsec) - The maximum number of seconds the Queue Sink process will be in a fallback - mode (i.e. suspending the process) after a connection failure. - - - + The maximum number of seconds the Queue Sink process will be in a fallback + mode (i.e. suspending the process) after a connection failure. + diff --git a/versioned_docs/version-6.2/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx b/versioned_docs/version-6.2/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx index 00204c4ccd..11b6df492b 100644 --- a/versioned_docs/version-6.2/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx +++ b/versioned_docs/version-6.2/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx @@ -1,285 +1,331 @@ --- title: "Queue Sink: Apache Kafka" sidebar_label: Kafka Queue Sink +description: "Consume messages from Kafka topics and write them into RavenDB documents using queue sink tasks with JavaScript processing scripts." 
sidebar_position: 1 --- import Admonition from '@theme/Admonition'; -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; import CodeBlock from '@theme/CodeBlock'; -import LanguageSwitcher from "@site/src/components/LanguageSwitcher"; -import LanguageContent from "@site/src/components/LanguageContent"; +import Panel from "@site/src/components/Panel"; +import ContentFrame from "@site/src/components/ContentFrame"; # Queue Sink: Apache Kafka + -* **Apache Kafka** is a distributed, high-performance, transactional messaging - platform, that remains performant as the number of messages it needs to process - increases and the number of events it needs to stream climbs to the big-data zone. - -* RavenDB can harness the advantages presented by message brokers like Kafka - both as a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx)) - and as a **consumer** (using a sink task to consume enqueued messages). - -* To use RavenDB as a consumer, define an ongoing Sink task that will read batches - of enqueued JSON formatted messages from Kafka topics, construct documents using - user-defined scripts, and store the documents in RavenDB collections. 
- -* In this page: - * [The Queue Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#the-queue-sink-task) - * [Client API](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#client-api) - * [Add a Kafka Connection String](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) - * [Add a Kafka Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-sink-task) - * [Configuration Options](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options) +* **Apache Kafka** is a distributed, high-performance, transactional messaging + platform, that remains performant as the number of messages it needs to process + increases and the number of events it needs to stream climbs to the big-data zone. + +* RavenDB can harness the advantages presented by message brokers like Kafka + both as a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx)) + and as a **consumer** (using a sink task to consume enqueued messages). + +* To use RavenDB as a consumer, define an ongoing Sink task that will read batches + of enqueued JSON formatted messages from Kafka topics, construct documents using + user-defined scripts, and store the documents in RavenDB collections. 
+ +* In this page: + * [The Queue Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#the-queue-sink-task) + * [Connecting a Kafka broker](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#connecting-a-kafka-broker) + * [Retrieving enqueued messages from selected Kafka topics](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#retrieving-enqueued-messages-from-selected-kafka-topics) + * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#running-user-defined-scripts) + * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#storing-documents-in-ravendb-collections) + * [Client API](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#client-api) + * [Add a Kafka Connection String](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) + * [Add a Kafka Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-sink-task) + * [Configuration Options](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options) + -## The Queue Sink Task -Users of RavenDB 6.0 and on can create an ongoing Sink task that connects -a Kafka broker, retrieves enqueued messages from selected Kafka topics, -runs a user-defined script to manipulate data and construct documents, and -potentially stores the created documents in RavenDB collections. -#### Connecting a Kafka broker + + + + +## Connecting a Kafka broker + +Users of RavenDB 6.0 and on can create an ongoing Sink task that connects +a Kafka broker, retrieves enqueued messages from selected Kafka topics, +runs a user-defined script to manipulate data and construct documents, and +potentially stores the created documents in RavenDB collections. In the message broker architecture, RavenDB sinks take the role of data consumers. 
-A sink would connect a Kafka broker using a connection string, and retrieve messages
-from the broker's Topics.
+A sink would connect a Kafka broker using a connection string, and retrieve messages 
+from the broker's Topics. 
 
-Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string)
+Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) 
 about adding a connection string via API.
-Read [here](../../../studio/database/tasks/ongoing-tasks/kafka-queue-sink.mdx#define-a-kafka-sink-task)
-about adding a connection string using Studio.
+Read [here](../../../studio/database/tasks/ongoing-tasks/kafka-queue-sink.mdx#define-a-kafka-sink-task) 
+about adding a connection string using Studio. 
 
-Like all ongoing tasks, a sink task is operated by a
+Like all ongoing tasks, a sink task is operated by a 
 [responsible node](../../../server/clustering/distribution/highly-available-tasks.mdx#responsible-node).
-When the responsibility for the task is moved from one node to another,
-e.g. from node `A` to node `B` as a result of node `A` down time:
+When the responsibility for the task is moved from one node to another, 
+e.g. from node `A` to node `B` as a result of node `A` downtime: 
 
-* The consumer task will maintain the same consumer group id it had on the original node.
-* Kafka brokers may **cease serving the sink task for some time** as the Kafka consumer
-  group rebalances (adapting to the leaving of one node and the joining of another, among
-  other changes).
+* The consumer task will maintain the same consumer group id it had on the original node. 
+* Kafka brokers may **cease serving the sink task for some time** as the Kafka consumer 
+  group rebalances (adapting to the leaving of one node and the joining of another, among 
+  other changes). 
-#### Retrieving enqueued messages from selected Kafka topics - -When a message is sent to a Kafka broker by a producer, it is pushed to -the tail of a topic. As preceding messages are pulled, the message advances -up the queue until it reaches its head and can be consumed by RavenDB's sink. -#### Running user-defined scripts - -A sink task's script is a JavaScript segment. Its basic role is to retrieve -selected Kafka messages or message properties, and construct documents that -will then be stored in RavenDB. - -The script can simply store the whole message as a document, as in this -segment: - - -{`// Add the document a metadata \`@collection\` property to keep it in -// this collection, or do not set it to store the document in @empty). -this['@metadata']['@collection'] = 'Orders'; -// Store the message as is, using its Id property as its RavenDB Id as well. + + + +--- + + + +## Retrieving enqueued messages from selected Kafka topics + + +While a queue sink task is running, it prevents the host database from being unloaded due to idle operations. +This ensures that message consumption from Kafka topics continues uninterrupted, even if the database would otherwise go idle. + + +When a message is sent to a Kafka broker by a producer, it is pushed to +the tail of a topic. As preceding messages are pulled, the message advances +up the queue until it reaches its head and can be consumed by RavenDB's sink. + + + +--- + + + +## Running user-defined scripts + +A sink task's script is a JavaScript segment. Its basic role is to retrieve +selected Kafka messages or message properties, and construct documents that +will then be stored in RavenDB. + +The script can store the whole message as a document, as in this segment: + +```javascript +// Add the document a metadata `@collection` property to keep it in +// this collection, or do not set it to store the document in @empty). 
+this['@metadata']['@collection'] = 'Orders'; +// Store the message as is, using its Id property as its RavenDB Id as well. put(this.Id.toString(), this) -`} - - - -But the script can also retrieve some information from the read message -and construct a new document that doesn't resemble the original message. -Scripts often apply two sections: a section that creates a JSON object -that defines the document's structure and contents, and a second section -that stores the document. - -E.g., for Kafka messages of this format - - - -{`\{ +``` + +
+ +The script can also retrieve selected information from the read message and +construct a new document that does not resemble the original message. +Scripts often apply two sections: a section that creates a JSON object +that defines the document's structure and contents, and a second section +that stores the document. + +E.g., for Kafka messages of this format: + +```json +{ "Id" : 13, "FirstName" : "John", "LastName" : "Doe" -\} -`} -
-
- -We can create this script - - - -{`var item = \{ - Id : this.Id, - FirstName : this.FirstName, - LastName : this.LastName, - FullName : this.FirstName + ' ' + this.LastName, - "@metadata" : \{ +} +``` + +
+ +We can create this script: + +```javascript +var item = { + Id : this.Id, + FirstName : this.FirstName, + LastName : this.LastName, + FullName : this.FirstName + ' ' + this.LastName, + "@metadata" : { "@collection" : "Users" - \} -\}; + } +}; // Use .toString() to pass the Id as a string even if Kafka provides it as a number put(this.Id.toString(), item) -`} -
-
- -The script can also apply various other JavaScript commands, including -`load` to load a RavenDB document (e.g. to construct a document that -includes data from the retrieved message and complementing data from -existing RavenDB documents), `del` to remove existing RavenDB documents, -and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). -#### Storing documents in RavenDB collections - -The sink task consumes batches of queued messages and stores them in RavenDB -in a transactional manner, processing either the entire batch or none of it. +``` + +
+ +The script can also apply various other JavaScript commands, including +`load` to load a RavenDB document (e.g. to construct a document that +includes data from the retrieved message and complementing data from +existing RavenDB documents), `del` to remove existing RavenDB documents, +and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). + +
+ +--- + + + +## Storing documents in RavenDB collections + +The sink task consumes batches of queued messages and stores them in RavenDB +in a transactional manner, processing either the entire batch or none of it. + -Some script processing errors are allowed; when such an error occurs RavenDB -will skip the affected message, record the event in the logs, and alert the -user in Studio, but **continue processing the batch**. +Some script processing errors are allowed; when such an error occurs RavenDB +will skip the affected message, record the event in the logs, and alert the +user in Studio, but **continue processing the batch**. -Once a batch is consumed, the task confirms it by calling `kafkaConsumer.Commit()`. +Once a batch is consumed, the task confirms it by calling `kafkaConsumer.Commit()`. -Note that the number of documents included in a batch is +Note that the number of documents included in a batch is [configurable](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options). -Producers may enqueue -[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx#idempotence-and-message-duplication) +Producers may enqueue +[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx#idempotence-and-message-duplication) of the same document. -if processing each message only once is important to the consumer, -it is **the consumer's responsibility** to verify the uniqueness of -each consumed message. - -Note that as long as the **Id** property of Kafka messages is preserved -(so duplicate messages share an Id), the script's `put(ID, { ... })` -command will overwrite a previous document with the same Id and only -one copy of it will remain. +If processing each message only once is important to the consumer, +it is **the consumer's responsibility** to verify the uniqueness of +each consumed message. 
+ +Note that as long as the **Id** property of Kafka messages is preserved +(so duplicate messages share an Id), the script's `put(ID, { ... })` +command will overwrite a previous document with the same Id and only +one copy of it will remain. + +
-## Client API + -#### Add a Kafka Connection String + -Prior to defining a Kafka sink task, add a **Kafka connection string** -that the task will use to connect the message broker's bootstrap servers. +## Add a Kafka Connection String -To create the connection string: +Prior to defining a Kafka sink task, add a **Kafka connection string** +that the task will use to connect the message broker's bootstrap servers. -* Create a `QueueConnectionString`instance with the connection string configuration. - Pass it to the `PutConnectionStringOperation` store operation to add the connection string. +To create the connection string: - `QueueConnectionString`: - - -{`// Add Kafka connection string +* Create a `QueueConnectionString` instance with the connection string configuration. + Pass it to the `PutConnectionStringOperation` store operation to add the connection string. + +`QueueConnectionString`: + +```csharp +// Add Kafka connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "KafkaConStr", BrokerType = QueueBrokerType.Kafka, KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); -`} - - - - `QueueBrokerType`: - - -{`public enum QueueBrokerType -\{ + { BootstrapServers = "localhost:9092" } + })); +``` + +
+ +`QueueBrokerType`: + +```csharp +public enum QueueBrokerType +{ None, Kafka, RabbitMq -\} -`} -
-
- - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string | - | **KafkaConnectionSettings** | `KafkaConnectionSettings[]` | A list of comma-separated host:port URLs to Kafka brokers | -#### Add a Kafka Sink Task - -To create the Sink task: - -* Create `QueueSinkScript` instances to define scripts with which the - task can process retrieved messages, apply JavaScript commands, construct - documents and store them in RavenDB. - - -{`// Define a Sink script +} +``` + +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string | +| **KafkaConnectionSettings** | `KafkaConnectionSettings[]` | A list of comma-separated host:port URLs to Kafka brokers | + +
+ +--- + + + +## Add a Kafka Sink Task + +To create the Sink task: + +* Create `QueueSinkScript` instances to define scripts with which the + task can process retrieved messages, apply JavaScript commands, construct + documents and store them in RavenDB. + +```csharp +// Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; -`} - - +}; +``` + +
-* Prepare a `QueueSinkConfiguration`object with the sink task configuration. +* Prepare a `QueueSinkConfiguration` object with the sink task configuration. - `QueueSinkConfiguration` properties: +`QueueSinkConfiguration` properties: - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | The sink task name | - | **ConnectionStringName** | `string` | The registered connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` to define a Kafka sink task | - | **Scripts** | `List` | A list of scripts | +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The registered connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` to define a Kafka sink task | +| **Scripts** | `List` | A list of scripts | -* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. +* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. 
- `QueueSinkScript` properties: +`QueueSinkScript` properties: - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Script name | - | **Queues** | `List` | A list of Kafka topics to consume messages from | - | **Script** | `string ` | The script contents | +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Script name | +| **Queues** | `List` | A list of Kafka topics to consume messages from | +| **Script** | `string` | The script contents | -**Code Sample**: - - -{`// Add Kafka connection string +#### Example: + +```csharp +// Add Kafka connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "KafkaConStr", BrokerType = QueueBrokerType.Kafka, - KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); + KafkaConnectionSettings = new KafkaConnectionSettings() + { BootstrapServers = "localhost:9092" } + })); // Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; +}; -// Define a Kafka configuration +// Define a Kafka sink task configuration var config = new QueueSinkConfiguration() -\{ +{ // Sink name Name = "KafkaSinkTaskName", // The connection string to connect the broker with @@ -287,27 +333,25 @@ var config = new QueueSinkConfiguration() // What queue broker is this task using BrokerType = QueueBrokerType.Kafka, // The list of scripts to run - Scripts = \{ queueSinkScript \} -\}; + Scripts = { queueSinkScript } +}; -AddQueueSinkOperationResult addQueueSinkOperationResult = +AddQueueSinkOperationResult 
addQueueSinkOperationResult = store.Maintenance.Send(new AddQueueSinkOperation(config)); -`} - - +``` +
+
-## Configuration Options + -Use these configuration options to gain more control over queue sink tasks. +Use these configuration options to gain more control over queue sink tasks. * [QueueSink.MaxBatchSize](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxbatchsize) The maximum number of pulled messages consumed in a single batch. * [QueueSink.MaxFallbackTimeInSec](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxfallbacktimeinsec) - The maximum number of seconds the Queue Sink process will be in a fallback - mode (i.e. suspending the process) after a connection failure. - - - + The maximum number of seconds the Queue Sink process will be in a fallback + mode (i.e. suspending the process) after a connection failure. + diff --git a/versioned_docs/version-6.2/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx b/versioned_docs/version-6.2/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx index 919752419b..ac6e88140f 100644 --- a/versioned_docs/version-6.2/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx +++ b/versioned_docs/version-6.2/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx @@ -1,300 +1,344 @@ --- title: "Queue Sink: RabbitMQ" sidebar_label: RabbitMQ Queue Sink +description: "Consume messages from RabbitMQ queues and write them into RavenDB documents using queue sink tasks with JavaScript processing scripts." 
sidebar_position: 2 --- import Admonition from '@theme/Admonition'; -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; import CodeBlock from '@theme/CodeBlock'; -import LanguageSwitcher from "@site/src/components/LanguageSwitcher"; -import LanguageContent from "@site/src/components/LanguageContent"; +import Panel from "@site/src/components/Panel"; +import ContentFrame from "@site/src/components/ContentFrame"; # Queue Sink: RabbitMQ + -* **RabbitMQ** brokers are designed to disperse data to multiple queues, - making for a flexible data channeling system that can easily handle complex - message streaming scenarios. +* **RabbitMQ** brokers are designed to disperse data to multiple queues, + making for a flexible data channeling system that can handle complex + message streaming scenarios. + +* RavenDB can harness the advantages presented by RabbitMQ brokers both as + a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx)) + and as a **consumer** (using a sink task to consume enqueued messages). + +* To use RavenDB as a consumer, define an ongoing Sink task that will read + batches of JSON formatted messages from RabbitMQ queues, construct documents + using user-defined scripts, and store the documents in RavenDB collections. 
+ +* In this page: + * [The RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#the-rabbitmq-sink-task) + * [Connecting a RabbitMQ broker](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#connecting-a-rabbitmq-broker) + * [Retrieving messages from RabbitMQ queues](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#retrieving-messages-from-rabbitmq-queues) + * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#running-user-defined-scripts) + * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#storing-documents-in-ravendb-collections) + * [Client API](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#client-api) + * [Add a RabbitMQ Connection String](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) + * [Add a RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-sink-task) + * [Configuration Options](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options) -* RavenDB can harness the advantages presented by RabbitMQ brokers both as - a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx)) - and as a **consumer** (using a sink task to consume enqueued messages). + -* To use RavenDB as a consumer, define an ongoing Sink task that will read - batches of JSON formatted messages from RabbitMQ queues, construct documents - using user-defined scripts, and store the documents in RavenDB collections. 
+ -* In this page: - * [The RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#the-rabbitmq-sink-task) - * [Client API](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#client-api) - * [Add a RabbitMQ Connection String](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) - * [Add a RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-sink-task) - * [Configuration Options](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options) - -## The RabbitMQ Sink Task + -Users of RavenDB 6.0 and on can create an ongoing Sink task that connects -a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined -script to manipulate data and construct documents, and potentially stores the -created documents in RavenDB collections. -#### Connecting a RabbitMQ broker +## Connecting a RabbitMQ broker + +Users of RavenDB 6.0 and on can create an ongoing Sink task that connects +a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined +script to manipulate data and construct documents, and potentially stores the +created documents in RavenDB collections. In the message broker architecture, RavenDB sinks take the role of data consumers. -A sink would connect a RabbitMQ broker using a connection string, and retrieve messages -from the broker's queues. +A sink would connect a RabbitMQ broker using a connection string, and retrieve messages +from the broker's queues. -Read [below](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) +Read [below](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) about adding a connection string via API. 
-Read [here](../../../studio/database/tasks/ongoing-tasks/rabbitmq-queue-sink.mdx#define-a-rabbitmq-sink-task) -about adding a connection string using Studio. -#### Retrieving messages from RabbitMQ queues - -When a message is sent to a RabbitMQ broker by a producer, it is pushed to -the tail of a queue. As preceding messages are pulled, the message advances -up the queue until it reaches its head and can be consumed by RavenDB's sink. -#### Running user-defined scripts - -A sink task's script is a JavaScript segment. Its basic role is to retrieve -selected RabbitMQ messages or message properties, and construct documents that -will then be stored in RavenDB. - -The script can simply store the whole message as a document, as in this -segment: - - -{`// Add the document a metadata \`@collection\` property to keep it in -// this collection, or do not set it to store the document in @empty). -this['@metadata']['@collection'] = 'Orders'; -// Store the message as is, using its Id property as its RavenDB Id as well. +Read [here](../../../studio/database/tasks/ongoing-tasks/rabbitmq-queue-sink.mdx#define-a-rabbitmq-sink-task) +about adding a connection string using Studio. + + + +--- + + + +## Retrieving messages from RabbitMQ queues + + +While a queue sink task is running, it prevents the host database from being unloaded due to idle operations. +This ensures that message consumption from RabbitMQ queues continues uninterrupted, even if the database would otherwise go idle. + + +When a message is sent to a RabbitMQ broker by a producer, it is pushed to +the tail of a queue. As preceding messages are pulled, the message advances +up the queue until it reaches its head and can be consumed by RavenDB's sink. + + + +--- + + + +## Running user-defined scripts + +A sink task's script is a JavaScript segment. Its basic role is to retrieve +selected RabbitMQ messages or message properties, and construct documents that +will then be stored in RavenDB. 
+ +The script can store the whole message as a document, as in this segment: + +```javascript +// Add the document a metadata `@collection` property to keep it in +// this collection, or do not set it to store the document in @empty). +this['@metadata']['@collection'] = 'Orders'; +// Store the message as is, using its Id property as its RavenDB Id as well. put(this.Id.toString(), this) -`} - - - -But the script can also retrieve some information from the read message -and construct a new document that doesn't resemble the original message. -Scripts often apply two sections: a section that creates a JSON object -that defines the document's structure and contents, and a second section -that stores the document. - -E.g., for RabbitMQ messages of this format - - - -{`\{ +``` + +
+ +The script can also retrieve selected information from the read message and +construct a new document that does not resemble the original message. +Scripts often apply two sections: a section that creates a JSON object +that defines the document's structure and contents, and a second section +that stores the document. + +E.g., for RabbitMQ messages of this format: + +```json +{ "Id" : 13, "FirstName" : "John", "LastName" : "Doe" -\} -`} -
-
- -We can create this script - - - -{`var item = \{ - Id : this.Id, - FirstName : this.FirstName, - LastName : this.LastName, - FullName : this.FirstName + ' ' + this.LastName, - "@metadata" : \{ +} +``` + +
+ +We can create this script: + +```javascript +var item = { + Id : this.Id, + FirstName : this.FirstName, + LastName : this.LastName, + FullName : this.FirstName + ' ' + this.LastName, + "@metadata" : { "@collection" : "Users" - \} -\}; + } +}; // Use .toString() to pass the Id as a string even if RabbitMQ provides it as a number put(this.Id.toString(), item) -`} -
-
- -The script can also apply various other JavaScript commands, including -`load` to load a RavenDB document (e.g. to construct a document that -includes data from the retrieved message and complementing data from -existing RavenDB documents), `del` to remove existing RavenDB documents, -and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). -#### Storing documents in RavenDB collections - -The sink task consumes batches of queued messages and stores them in RavenDB -in a transactional manner, processing either the entire batch or none of it. +``` + +
+
+The script can also apply various other JavaScript commands, including
+`load` to load a RavenDB document (e.g. to construct a document that
+includes data from the retrieved message and complementary data from
+existing RavenDB documents), `del` to remove existing RavenDB documents,
+and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions).
+
+ +--- + + + +## Storing documents in RavenDB collections + +The sink task consumes batches of queued messages and stores them in RavenDB +in a transactional manner, processing either the entire batch or none of it. + -Some script processing errors are allowed; when such an error occurs RavenDB -will skip the affected message, record the event in the logs, and alert the -user in Studio, but **continue processing the batch**. +Some script processing errors are allowed; when such an error occurs RavenDB +will skip the affected message, record the event in the logs, and alert the +user in Studio, but **continue processing the batch**. -Once a batch is consumed, the task confirms it by sending `_channel.BasicAck`. +Once a batch is consumed, the task confirms it by sending `_channel.BasicAck`. -Note that the number of documents included in a batch is +Note that the number of documents included in a batch is [configurable](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options). -Producers may enqueue -[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx#message-duplication) +Producers may enqueue +[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx#message-duplication) of the same document. -if processing each message only once is important to the consumer, -it is **the consumer's responsibility** to verify the uniqueness of -each consumed message. - -Note that as long as the **Id** property of RabbitMQ messages is preserved -(so duplicate messages share an Id), the script's `put(ID, { ... })` command -will overwrite a previous document with the same Id and only one copy of -it will remain. +If processing each message only once is important to the consumer, +it is **the consumer's responsibility** to verify the uniqueness of +each consumed message. 
+ +Note that as long as the **Id** property of RabbitMQ messages is preserved +(so duplicate messages share an Id), the script's `put(ID, { ... })` command +will overwrite a previous document with the same Id and only one copy of +it will remain. + + +
+ -## Client API + -#### Add a RabbitMQ Connection String +## Add a RabbitMQ Connection String -Prior to defining a RabbitMQ sink task, add a **RabbitMQ connection string** -that the task will use to connect the message brokers. +Prior to defining a RabbitMQ sink task, add a **RabbitMQ connection string** +that the task will use to connect the message brokers. -To create the connection string: +To create the connection string: -* Create a `QueueConnectionString`instance with the connection string configuration. - Pass it to the `PutConnectionStringOperation` store operation to add the connection string. +* Create a `QueueConnectionString` instance with the connection string configuration. + Pass it to the `PutConnectionStringOperation` store operation to add the connection string. - `QueueConnectionString`: - - -{`// Add RabbitMQ connection string +`QueueConnectionString`: + +```csharp +// Add RabbitMQ connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "RabbitMqConStr", BrokerType = QueueBrokerType.RabbitMq, RabbitMqConnectionSettings = new RabbitMqConnectionSettings() - \{ ConnectionString = "amqp://guest:guest@localhost:5672/" \} - \})); -`} - - - - `QueueBrokerType`: - - -{`public enum QueueBrokerType -\{ + { ConnectionString = "amqp://guest:guest@localhost:5672/" } + })); +``` + +
+ +`QueueBrokerType`: + +```csharp +public enum QueueBrokerType +{ None, Kafka, RabbitMq -\} -`} -
-
- - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` for a RabbitMQ connection string | - | **RabbitMqConnectionSettings** | `RabbitMqConnectionSettings[]` | A list of strings indicating RabbitMQ brokers connection details | -#### Add a RabbitMQ Sink Task - -To create the Sink task: - -* Create `QueueSinkScript` instances to define scripts with which the - task can process retrieved messages, apply JavaScript commands, construct - documents and store them in RavenDB. - - -{`// Define a Sink script +} +``` + +
+
+| Property | Type | Description |
+|---|---|---|
+| **Name** | `string` | Connection string name |
+| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` for a RabbitMQ connection string |
+| **RabbitMqConnectionSettings** | `RabbitMqConnectionSettings[]` | A list of strings indicating the RabbitMQ brokers' connection details |
+
+</ContentFrame>
+
+ +--- + + + +## Add a RabbitMQ Sink Task + +To create the Sink task: + +* Create `QueueSinkScript` instances to define scripts with which the + task can process retrieved messages, apply JavaScript commands, construct + documents and store them in RavenDB. + +```csharp +// Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of RabbitMQ queues to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; -`} - - - -* Prepare a `QueueSinkConfiguration`object with the sink task configuration. - - `QueueSinkConfiguration` properties: - - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | The sink task name | - | **ConnectionStringName** | `string` | The registered connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` to define a RabbitMQ sink task | - | **Scripts** | `List` | A list of scripts | - -* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. - - `QueueSinkScript` properties: - - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Script name| - | **Queues** | `List` | A list of RabbitMQ queues to consume messages from | - | **Script** | `string ` | The script contents | - -**Code Sample**: - - -{`// Add Kafka connection string +}; +``` + +
+ +* Prepare a `QueueSinkConfiguration` object with the sink task configuration. + +`QueueSinkConfiguration` properties: + +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The registered connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` to define a RabbitMQ sink task | +| **Scripts** | `List` | A list of scripts | + +* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. + +`QueueSinkScript` properties: + +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Script name | +| **Queues** | `List` | A list of RabbitMQ queues to consume messages from | +| **Script** | `string` | The script contents | + +#### Example: + +```csharp +// Add RabbitMQ connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ - Name = "KafkaConStr", - BrokerType = QueueBrokerType.Kafka, - KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); + { + Name = "RabbitMqConStr", + BrokerType = QueueBrokerType.RabbitMq, + RabbitMqConnectionSettings = new RabbitMqConnectionSettings() + { ConnectionString = "amqp://guest:guest@localhost:5672/" } + })); // Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", - // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + // A list of RabbitMQ queues to connect + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; +}; -// Define a Kafka configuration +// Define a RabbitMQ sink task configuration var config = new QueueSinkConfiguration() -\{ +{ // Sink name - Name = "KafkaSinkTaskName", + Name = "RabbitMqSinkTaskName", // The 
connection string to connect the broker with - ConnectionStringName = "KafkaConStr", + ConnectionStringName = "RabbitMqConStr", // What queue broker is this task using - BrokerType = QueueBrokerType.Kafka, + BrokerType = QueueBrokerType.RabbitMq, // The list of scripts to run - Scripts = \{ queueSinkScript \} -\}; + Scripts = { queueSinkScript } +}; -AddQueueSinkOperationResult addQueueSinkOperationResult = +AddQueueSinkOperationResult addQueueSinkOperationResult = store.Maintenance.Send(new AddQueueSinkOperation(config)); -`} -
-
+``` +
+
-## Configuration Options + -Use these configuration options to gain more control over queue sink tasks. +Use these configuration options to gain more control over queue sink tasks. * [QueueSink.MaxBatchSize](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxbatchsize) The maximum number of pulled messages consumed in a single batch. * [QueueSink.MaxFallbackTimeInSec](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxfallbacktimeinsec) - The maximum number of seconds the Queue Sink process will be in a fallback - mode (i.e. suspending the process) after a connection failure. - - - + The maximum number of seconds the Queue Sink process will be in a fallback + mode (i.e. suspending the process) after a connection failure. + diff --git a/versioned_docs/version-7.0/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx b/versioned_docs/version-7.0/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx index 00204c4ccd..11b6df492b 100644 --- a/versioned_docs/version-7.0/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx +++ b/versioned_docs/version-7.0/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx @@ -1,285 +1,331 @@ --- title: "Queue Sink: Apache Kafka" sidebar_label: Kafka Queue Sink +description: "Consume messages from Kafka topics and write them into RavenDB documents using queue sink tasks with JavaScript processing scripts." 
sidebar_position: 1 --- import Admonition from '@theme/Admonition'; -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; import CodeBlock from '@theme/CodeBlock'; -import LanguageSwitcher from "@site/src/components/LanguageSwitcher"; -import LanguageContent from "@site/src/components/LanguageContent"; +import Panel from "@site/src/components/Panel"; +import ContentFrame from "@site/src/components/ContentFrame"; # Queue Sink: Apache Kafka + -* **Apache Kafka** is a distributed, high-performance, transactional messaging - platform, that remains performant as the number of messages it needs to process - increases and the number of events it needs to stream climbs to the big-data zone. - -* RavenDB can harness the advantages presented by message brokers like Kafka - both as a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx)) - and as a **consumer** (using a sink task to consume enqueued messages). - -* To use RavenDB as a consumer, define an ongoing Sink task that will read batches - of enqueued JSON formatted messages from Kafka topics, construct documents using - user-defined scripts, and store the documents in RavenDB collections. 
- -* In this page: - * [The Queue Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#the-queue-sink-task) - * [Client API](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#client-api) - * [Add a Kafka Connection String](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) - * [Add a Kafka Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-sink-task) - * [Configuration Options](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options) +* **Apache Kafka** is a distributed, high-performance, transactional messaging + platform, that remains performant as the number of messages it needs to process + increases and the number of events it needs to stream climbs to the big-data zone. + +* RavenDB can harness the advantages presented by message brokers like Kafka + both as a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx)) + and as a **consumer** (using a sink task to consume enqueued messages). + +* To use RavenDB as a consumer, define an ongoing Sink task that will read batches + of enqueued JSON formatted messages from Kafka topics, construct documents using + user-defined scripts, and store the documents in RavenDB collections. 
+ +* In this page: + * [The Queue Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#the-queue-sink-task) + * [Connecting a Kafka broker](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#connecting-a-kafka-broker) + * [Retrieving enqueued messages from selected Kafka topics](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#retrieving-enqueued-messages-from-selected-kafka-topics) + * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#running-user-defined-scripts) + * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#storing-documents-in-ravendb-collections) + * [Client API](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#client-api) + * [Add a Kafka Connection String](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) + * [Add a Kafka Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-sink-task) + * [Configuration Options](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options) + -## The Queue Sink Task -Users of RavenDB 6.0 and on can create an ongoing Sink task that connects -a Kafka broker, retrieves enqueued messages from selected Kafka topics, -runs a user-defined script to manipulate data and construct documents, and -potentially stores the created documents in RavenDB collections. -#### Connecting a Kafka broker + + + + +## Connecting a Kafka broker + +Users of RavenDB 6.0 and on can create an ongoing Sink task that connects +a Kafka broker, retrieves enqueued messages from selected Kafka topics, +runs a user-defined script to manipulate data and construct documents, and +potentially stores the created documents in RavenDB collections. In the message broker architecture, RavenDB sinks take the role of data consumers. 
-A sink would connect a Kafka broker using a connection string, and retrieve messages -from the broker's Topics. +A sink would connect a Kafka broker using a connection string, and retrieve messages +from the broker's Topics. -Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) +Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) about adding a connection string via API. -Read [here](../../../studio/database/tasks/ongoing-tasks/kafka-queue-sink.mdx#define-a-kafka-sink-task) -about adding a connection string using Studio. +Read [here](../../../studio/database/tasks/ongoing-tasks/kafka-queue-sink.mdx#define-a-kafka-sink-task) +about adding a connection string using Studio. -Like all ongoing tasks, a sink task is operated by a +Like all ongoing tasks, a sink task is operated by a [responsible node](../../../server/clustering/distribution/highly-available-tasks.mdx#responsible-node). -When the responsibility for the task is moved from one node to another, -e.g. from node `A` to node `B` as a result of node `A` down time: +When the responsibility for the task is moved from one node to another, +e.g. from node `A` to node `B` as a result of node `A` down time: -* The consumer task will maintain the same consumer group id it had on the original node. -* Kafka brokers may **cease serving the sink task for some time** as the Kafka consumer - group rebalances (adapting to the leaving of one node and the joining of another, among - other changes). +* The consumer task will maintain the same consumer group id it had on the original node. +* Kafka brokers may **cease serving the sink task for some time** as the Kafka consumer + group rebalances (adapting to the leaving of one node and the joining of another, among + other changes). 
-#### Retrieving enqueued messages from selected Kafka topics - -When a message is sent to a Kafka broker by a producer, it is pushed to -the tail of a topic. As preceding messages are pulled, the message advances -up the queue until it reaches its head and can be consumed by RavenDB's sink. -#### Running user-defined scripts - -A sink task's script is a JavaScript segment. Its basic role is to retrieve -selected Kafka messages or message properties, and construct documents that -will then be stored in RavenDB. - -The script can simply store the whole message as a document, as in this -segment: - - -{`// Add the document a metadata \`@collection\` property to keep it in -// this collection, or do not set it to store the document in @empty). -this['@metadata']['@collection'] = 'Orders'; -// Store the message as is, using its Id property as its RavenDB Id as well. + + + +--- + + + +## Retrieving enqueued messages from selected Kafka topics + + +While a queue sink task is running, it prevents the host database from being unloaded due to idle operations. +This ensures that message consumption from Kafka topics continues uninterrupted, even if the database would otherwise go idle. + + +When a message is sent to a Kafka broker by a producer, it is pushed to +the tail of a topic. As preceding messages are pulled, the message advances +up the queue until it reaches its head and can be consumed by RavenDB's sink. + + + +--- + + + +## Running user-defined scripts + +A sink task's script is a JavaScript segment. Its basic role is to retrieve +selected Kafka messages or message properties, and construct documents that +will then be stored in RavenDB. + +The script can store the whole message as a document, as in this segment: + +```javascript +// Add the document a metadata `@collection` property to keep it in +// this collection, or do not set it to store the document in @empty). 
+this['@metadata']['@collection'] = 'Orders'; +// Store the message as is, using its Id property as its RavenDB Id as well. put(this.Id.toString(), this) -`} - - - -But the script can also retrieve some information from the read message -and construct a new document that doesn't resemble the original message. -Scripts often apply two sections: a section that creates a JSON object -that defines the document's structure and contents, and a second section -that stores the document. - -E.g., for Kafka messages of this format - - - -{`\{ +``` + +
+ +The script can also retrieve selected information from the read message and +construct a new document that does not resemble the original message. +Scripts often apply two sections: a section that creates a JSON object +that defines the document's structure and contents, and a second section +that stores the document. + +E.g., for Kafka messages of this format: + +```json +{ "Id" : 13, "FirstName" : "John", "LastName" : "Doe" -\} -`} -
-
- -We can create this script - - - -{`var item = \{ - Id : this.Id, - FirstName : this.FirstName, - LastName : this.LastName, - FullName : this.FirstName + ' ' + this.LastName, - "@metadata" : \{ +} +``` + +
+ +We can create this script: + +```javascript +var item = { + Id : this.Id, + FirstName : this.FirstName, + LastName : this.LastName, + FullName : this.FirstName + ' ' + this.LastName, + "@metadata" : { "@collection" : "Users" - \} -\}; + } +}; // Use .toString() to pass the Id as a string even if Kafka provides it as a number put(this.Id.toString(), item) -`} -
-
- -The script can also apply various other JavaScript commands, including -`load` to load a RavenDB document (e.g. to construct a document that -includes data from the retrieved message and complementing data from -existing RavenDB documents), `del` to remove existing RavenDB documents, -and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). -#### Storing documents in RavenDB collections - -The sink task consumes batches of queued messages and stores them in RavenDB -in a transactional manner, processing either the entire batch or none of it. +``` + +
+
+The script can also apply various other JavaScript commands, including
+`load` to load a RavenDB document (e.g. to construct a document that
+includes data from the retrieved message and complementary data from
+existing RavenDB documents), `del` to remove existing RavenDB documents,
+and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions).
+
+ +--- + + + +## Storing documents in RavenDB collections + +The sink task consumes batches of queued messages and stores them in RavenDB +in a transactional manner, processing either the entire batch or none of it. + -Some script processing errors are allowed; when such an error occurs RavenDB -will skip the affected message, record the event in the logs, and alert the -user in Studio, but **continue processing the batch**. +Some script processing errors are allowed; when such an error occurs RavenDB +will skip the affected message, record the event in the logs, and alert the +user in Studio, but **continue processing the batch**. -Once a batch is consumed, the task confirms it by calling `kafkaConsumer.Commit()`. +Once a batch is consumed, the task confirms it by calling `kafkaConsumer.Commit()`. -Note that the number of documents included in a batch is +Note that the number of documents included in a batch is [configurable](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options). -Producers may enqueue -[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx#idempotence-and-message-duplication) +Producers may enqueue +[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx#idempotence-and-message-duplication) of the same document. -if processing each message only once is important to the consumer, -it is **the consumer's responsibility** to verify the uniqueness of -each consumed message. - -Note that as long as the **Id** property of Kafka messages is preserved -(so duplicate messages share an Id), the script's `put(ID, { ... })` -command will overwrite a previous document with the same Id and only -one copy of it will remain. +If processing each message only once is important to the consumer, +it is **the consumer's responsibility** to verify the uniqueness of +each consumed message. 
+ +Note that as long as the **Id** property of Kafka messages is preserved +(so duplicate messages share an Id), the script's `put(ID, { ... })` +command will overwrite a previous document with the same Id and only +one copy of it will remain. + +
-## Client API + -#### Add a Kafka Connection String + -Prior to defining a Kafka sink task, add a **Kafka connection string** -that the task will use to connect the message broker's bootstrap servers. +## Add a Kafka Connection String -To create the connection string: +Prior to defining a Kafka sink task, add a **Kafka connection string** +that the task will use to connect the message broker's bootstrap servers. -* Create a `QueueConnectionString`instance with the connection string configuration. - Pass it to the `PutConnectionStringOperation` store operation to add the connection string. +To create the connection string: - `QueueConnectionString`: - - -{`// Add Kafka connection string +* Create a `QueueConnectionString` instance with the connection string configuration. + Pass it to the `PutConnectionStringOperation` store operation to add the connection string. + +`QueueConnectionString`: + +```csharp +// Add Kafka connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "KafkaConStr", BrokerType = QueueBrokerType.Kafka, KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); -`} - - - - `QueueBrokerType`: - - -{`public enum QueueBrokerType -\{ + { BootstrapServers = "localhost:9092" } + })); +``` + +
+ +`QueueBrokerType`: + +```csharp +public enum QueueBrokerType +{ None, Kafka, RabbitMq -\} -`} -
-
- - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string | - | **KafkaConnectionSettings** | `KafkaConnectionSettings[]` | A list of comma-separated host:port URLs to Kafka brokers | -#### Add a Kafka Sink Task - -To create the Sink task: - -* Create `QueueSinkScript` instances to define scripts with which the - task can process retrieved messages, apply JavaScript commands, construct - documents and store them in RavenDB. - - -{`// Define a Sink script +} +``` + +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string | +| **KafkaConnectionSettings** | `KafkaConnectionSettings[]` | A list of comma-separated host:port URLs to Kafka brokers | + +
+ +--- + + + +## Add a Kafka Sink Task + +To create the Sink task: + +* Create `QueueSinkScript` instances to define scripts with which the + task can process retrieved messages, apply JavaScript commands, construct + documents and store them in RavenDB. + +```csharp +// Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; -`} - - +}; +``` + +
-* Prepare a `QueueSinkConfiguration`object with the sink task configuration. +* Prepare a `QueueSinkConfiguration` object with the sink task configuration. - `QueueSinkConfiguration` properties: +`QueueSinkConfiguration` properties: - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | The sink task name | - | **ConnectionStringName** | `string` | The registered connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` to define a Kafka sink task | - | **Scripts** | `List` | A list of scripts | +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The registered connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` to define a Kafka sink task | +| **Scripts** | `List` | A list of scripts | -* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. +* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. 
- `QueueSinkScript` properties: +`QueueSinkScript` properties: - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Script name | - | **Queues** | `List` | A list of Kafka topics to consume messages from | - | **Script** | `string ` | The script contents | +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Script name | +| **Queues** | `List` | A list of Kafka topics to consume messages from | +| **Script** | `string` | The script contents | -**Code Sample**: - - -{`// Add Kafka connection string +#### Example: + +```csharp +// Add Kafka connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "KafkaConStr", BrokerType = QueueBrokerType.Kafka, - KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); + KafkaConnectionSettings = new KafkaConnectionSettings() + { BootstrapServers = "localhost:9092" } + })); // Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; +}; -// Define a Kafka configuration +// Define a Kafka sink task configuration var config = new QueueSinkConfiguration() -\{ +{ // Sink name Name = "KafkaSinkTaskName", // The connection string to connect the broker with @@ -287,27 +333,25 @@ var config = new QueueSinkConfiguration() // What queue broker is this task using BrokerType = QueueBrokerType.Kafka, // The list of scripts to run - Scripts = \{ queueSinkScript \} -\}; + Scripts = { queueSinkScript } +}; -AddQueueSinkOperationResult addQueueSinkOperationResult = +AddQueueSinkOperationResult 
addQueueSinkOperationResult = store.Maintenance.Send(new AddQueueSinkOperation(config)); -`} - - +``` +
+
-## Configuration Options + -Use these configuration options to gain more control over queue sink tasks. +Use these configuration options to gain more control over queue sink tasks. * [QueueSink.MaxBatchSize](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxbatchsize) The maximum number of pulled messages consumed in a single batch. * [QueueSink.MaxFallbackTimeInSec](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxfallbacktimeinsec) - The maximum number of seconds the Queue Sink process will be in a fallback - mode (i.e. suspending the process) after a connection failure. - - - + The maximum number of seconds the Queue Sink process will be in a fallback + mode (i.e. suspending the process) after a connection failure. + diff --git a/versioned_docs/version-7.0/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx b/versioned_docs/version-7.0/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx index 919752419b..ac6e88140f 100644 --- a/versioned_docs/version-7.0/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx +++ b/versioned_docs/version-7.0/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx @@ -1,300 +1,344 @@ --- title: "Queue Sink: RabbitMQ" sidebar_label: RabbitMQ Queue Sink +description: "Consume messages from RabbitMQ queues and write them into RavenDB documents using queue sink tasks with JavaScript processing scripts." 
sidebar_position: 2 --- import Admonition from '@theme/Admonition'; -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; import CodeBlock from '@theme/CodeBlock'; -import LanguageSwitcher from "@site/src/components/LanguageSwitcher"; -import LanguageContent from "@site/src/components/LanguageContent"; +import Panel from "@site/src/components/Panel"; +import ContentFrame from "@site/src/components/ContentFrame"; # Queue Sink: RabbitMQ + -* **RabbitMQ** brokers are designed to disperse data to multiple queues, - making for a flexible data channeling system that can easily handle complex - message streaming scenarios. +* **RabbitMQ** brokers are designed to disperse data to multiple queues, + making for a flexible data channeling system that can handle complex + message streaming scenarios. + +* RavenDB can harness the advantages presented by RabbitMQ brokers both as + a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx)) + and as a **consumer** (using a sink task to consume enqueued messages). + +* To use RavenDB as a consumer, define an ongoing Sink task that will read + batches of JSON formatted messages from RabbitMQ queues, construct documents + using user-defined scripts, and store the documents in RavenDB collections. 
+ +* In this page: + * [The RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#the-rabbitmq-sink-task) + * [Connecting a RabbitMQ broker](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#connecting-a-rabbitmq-broker) + * [Retrieving messages from RabbitMQ queues](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#retrieving-messages-from-rabbitmq-queues) + * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#running-user-defined-scripts) + * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#storing-documents-in-ravendb-collections) + * [Client API](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#client-api) + * [Add a RabbitMQ Connection String](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) + * [Add a RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-sink-task) + * [Configuration Options](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options) -* RavenDB can harness the advantages presented by RabbitMQ brokers both as - a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx)) - and as a **consumer** (using a sink task to consume enqueued messages). + -* To use RavenDB as a consumer, define an ongoing Sink task that will read - batches of JSON formatted messages from RabbitMQ queues, construct documents - using user-defined scripts, and store the documents in RavenDB collections. 
+ -* In this page: - * [The RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#the-rabbitmq-sink-task) - * [Client API](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#client-api) - * [Add a RabbitMQ Connection String](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) - * [Add a RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-sink-task) - * [Configuration Options](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options) - -## The RabbitMQ Sink Task + -Users of RavenDB 6.0 and on can create an ongoing Sink task that connects -a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined -script to manipulate data and construct documents, and potentially stores the -created documents in RavenDB collections. -#### Connecting a RabbitMQ broker +## Connecting a RabbitMQ broker + +Users of RavenDB 6.0 and on can create an ongoing Sink task that connects +a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined +script to manipulate data and construct documents, and potentially stores the +created documents in RavenDB collections. In the message broker architecture, RavenDB sinks take the role of data consumers. -A sink would connect a RabbitMQ broker using a connection string, and retrieve messages -from the broker's queues. +A sink would connect a RabbitMQ broker using a connection string, and retrieve messages +from the broker's queues. -Read [below](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) +Read [below](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) about adding a connection string via API. 
-Read [here](../../../studio/database/tasks/ongoing-tasks/rabbitmq-queue-sink.mdx#define-a-rabbitmq-sink-task) -about adding a connection string using Studio. -#### Retrieving messages from RabbitMQ queues - -When a message is sent to a RabbitMQ broker by a producer, it is pushed to -the tail of a queue. As preceding messages are pulled, the message advances -up the queue until it reaches its head and can be consumed by RavenDB's sink. -#### Running user-defined scripts - -A sink task's script is a JavaScript segment. Its basic role is to retrieve -selected RabbitMQ messages or message properties, and construct documents that -will then be stored in RavenDB. - -The script can simply store the whole message as a document, as in this -segment: - - -{`// Add the document a metadata \`@collection\` property to keep it in -// this collection, or do not set it to store the document in @empty). -this['@metadata']['@collection'] = 'Orders'; -// Store the message as is, using its Id property as its RavenDB Id as well. +Read [here](../../../studio/database/tasks/ongoing-tasks/rabbitmq-queue-sink.mdx#define-a-rabbitmq-sink-task) +about adding a connection string using Studio. + + + +--- + + + +## Retrieving messages from RabbitMQ queues + + +While a queue sink task is running, it prevents the host database from being unloaded due to idle operations. +This ensures that message consumption from RabbitMQ queues continues uninterrupted, even if the database would otherwise go idle. + + +When a message is sent to a RabbitMQ broker by a producer, it is pushed to +the tail of a queue. As preceding messages are pulled, the message advances +up the queue until it reaches its head and can be consumed by RavenDB's sink. + + + +--- + + + +## Running user-defined scripts + +A sink task's script is a JavaScript segment. Its basic role is to retrieve +selected RabbitMQ messages or message properties, and construct documents that +will then be stored in RavenDB. 
+ +The script can store the whole message as a document, as in this segment: + +```javascript +// Add the document a metadata `@collection` property to keep it in +// this collection, or do not set it to store the document in @empty). +this['@metadata']['@collection'] = 'Orders'; +// Store the message as is, using its Id property as its RavenDB Id as well. put(this.Id.toString(), this) -`} - - - -But the script can also retrieve some information from the read message -and construct a new document that doesn't resemble the original message. -Scripts often apply two sections: a section that creates a JSON object -that defines the document's structure and contents, and a second section -that stores the document. - -E.g., for RabbitMQ messages of this format - - - -{`\{ +``` + +
+ +The script can also retrieve selected information from the read message and +construct a new document that does not resemble the original message. +Scripts often apply two sections: a section that creates a JSON object +that defines the document's structure and contents, and a second section +that stores the document. + +E.g., for RabbitMQ messages of this format: + +```json +{ "Id" : 13, "FirstName" : "John", "LastName" : "Doe" -\} -`} -
-
- -We can create this script - - - -{`var item = \{ - Id : this.Id, - FirstName : this.FirstName, - LastName : this.LastName, - FullName : this.FirstName + ' ' + this.LastName, - "@metadata" : \{ +} +``` + +
+ +We can create this script: + +```javascript +var item = { + Id : this.Id, + FirstName : this.FirstName, + LastName : this.LastName, + FullName : this.FirstName + ' ' + this.LastName, + "@metadata" : { "@collection" : "Users" - \} -\}; + } +}; // Use .toString() to pass the Id as a string even if RabbitMQ provides it as a number put(this.Id.toString(), item) -`} -
-
- -The script can also apply various other JavaScript commands, including -`load` to load a RavenDB document (e.g. to construct a document that -includes data from the retrieved message and complementing data from -existing RavenDB documents), `del` to remove existing RavenDB documents, -and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). -#### Storing documents in RavenDB collections - -The sink task consumes batches of queued messages and stores them in RavenDB -in a transactional manner, processing either the entire batch or none of it. +``` + +
+ +The script can also apply various other JavaScript commands, including +`load` to load a RavenDB document (e.g. to construct a document that +includes data from the retrieved message and complementing data from +existing RavenDB documents), `del` to remove existing RavenDB documents, +and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). + +
+ +--- + + + +## Storing documents in RavenDB collections + +The sink task consumes batches of queued messages and stores them in RavenDB +in a transactional manner, processing either the entire batch or none of it. + -Some script processing errors are allowed; when such an error occurs RavenDB -will skip the affected message, record the event in the logs, and alert the -user in Studio, but **continue processing the batch**. +Some script processing errors are allowed; when such an error occurs RavenDB +will skip the affected message, record the event in the logs, and alert the +user in Studio, but **continue processing the batch**. -Once a batch is consumed, the task confirms it by sending `_channel.BasicAck`. +Once a batch is consumed, the task confirms it by sending `_channel.BasicAck`. -Note that the number of documents included in a batch is +Note that the number of documents included in a batch is [configurable](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options). -Producers may enqueue -[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx#message-duplication) +Producers may enqueue +[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx#message-duplication) of the same document. -if processing each message only once is important to the consumer, -it is **the consumer's responsibility** to verify the uniqueness of -each consumed message. - -Note that as long as the **Id** property of RabbitMQ messages is preserved -(so duplicate messages share an Id), the script's `put(ID, { ... })` command -will overwrite a previous document with the same Id and only one copy of -it will remain. +If processing each message only once is important to the consumer, +it is **the consumer's responsibility** to verify the uniqueness of +each consumed message. 
+ +Note that as long as the **Id** property of RabbitMQ messages is preserved +(so duplicate messages share an Id), the script's `put(ID, { ... })` command +will overwrite a previous document with the same Id and only one copy of +it will remain. + + +
+ -## Client API + -#### Add a RabbitMQ Connection String +## Add a RabbitMQ Connection String -Prior to defining a RabbitMQ sink task, add a **RabbitMQ connection string** -that the task will use to connect the message brokers. +Prior to defining a RabbitMQ sink task, add a **RabbitMQ connection string** +that the task will use to connect the message brokers. -To create the connection string: +To create the connection string: -* Create a `QueueConnectionString`instance with the connection string configuration. - Pass it to the `PutConnectionStringOperation` store operation to add the connection string. +* Create a `QueueConnectionString` instance with the connection string configuration. + Pass it to the `PutConnectionStringOperation` store operation to add the connection string. - `QueueConnectionString`: - - -{`// Add RabbitMQ connection string +`QueueConnectionString`: + +```csharp +// Add RabbitMQ connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "RabbitMqConStr", BrokerType = QueueBrokerType.RabbitMq, RabbitMqConnectionSettings = new RabbitMqConnectionSettings() - \{ ConnectionString = "amqp://guest:guest@localhost:5672/" \} - \})); -`} - - - - `QueueBrokerType`: - - -{`public enum QueueBrokerType -\{ + { ConnectionString = "amqp://guest:guest@localhost:5672/" } + })); +``` + +
+ +`QueueBrokerType`: + +```csharp +public enum QueueBrokerType +{ None, Kafka, RabbitMq -\} -`} -
-
- - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` for a RabbitMQ connection string | - | **RabbitMqConnectionSettings** | `RabbitMqConnectionSettings[]` | A list of strings indicating RabbitMQ brokers connection details | -#### Add a RabbitMQ Sink Task - -To create the Sink task: - -* Create `QueueSinkScript` instances to define scripts with which the - task can process retrieved messages, apply JavaScript commands, construct - documents and store them in RavenDB. - - -{`// Define a Sink script +} +``` + +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` for a RabbitMQ connection string | +| **RabbitMqConnectionSettings** | `RabbitMqConnectionSettings[]` | A list of strings indicating RabbitMQ brokers connection details | + +
+ +--- + + + +## Add a RabbitMQ Sink Task + +To create the Sink task: + +* Create `QueueSinkScript` instances to define scripts with which the + task can process retrieved messages, apply JavaScript commands, construct + documents and store them in RavenDB. + +```csharp +// Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of RabbitMQ queues to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; -`} - - - -* Prepare a `QueueSinkConfiguration`object with the sink task configuration. - - `QueueSinkConfiguration` properties: - - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | The sink task name | - | **ConnectionStringName** | `string` | The registered connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` to define a RabbitMQ sink task | - | **Scripts** | `List` | A list of scripts | - -* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. - - `QueueSinkScript` properties: - - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Script name| - | **Queues** | `List` | A list of RabbitMQ queues to consume messages from | - | **Script** | `string ` | The script contents | - -**Code Sample**: - - -{`// Add Kafka connection string +}; +``` + +
+ +* Prepare a `QueueSinkConfiguration` object with the sink task configuration. + +`QueueSinkConfiguration` properties: + +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The registered connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` to define a RabbitMQ sink task | +| **Scripts** | `List` | A list of scripts | + +* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. + +`QueueSinkScript` properties: + +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Script name | +| **Queues** | `List` | A list of RabbitMQ queues to consume messages from | +| **Script** | `string` | The script contents | + +#### Example: + +```csharp +// Add RabbitMQ connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ - Name = "KafkaConStr", - BrokerType = QueueBrokerType.Kafka, - KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); + { + Name = "RabbitMqConStr", + BrokerType = QueueBrokerType.RabbitMq, + RabbitMqConnectionSettings = new RabbitMqConnectionSettings() + { ConnectionString = "amqp://guest:guest@localhost:5672/" } + })); // Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", - // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + // A list of RabbitMQ queues to connect + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; +}; -// Define a Kafka configuration +// Define a RabbitMQ sink task configuration var config = new QueueSinkConfiguration() -\{ +{ // Sink name - Name = "KafkaSinkTaskName", + Name = "RabbitMqSinkTaskName", // The 
connection string to connect the broker with - ConnectionStringName = "KafkaConStr", + ConnectionStringName = "RabbitMqConStr", // What queue broker is this task using - BrokerType = QueueBrokerType.Kafka, + BrokerType = QueueBrokerType.RabbitMq, // The list of scripts to run - Scripts = \{ queueSinkScript \} -\}; + Scripts = { queueSinkScript } +}; -AddQueueSinkOperationResult addQueueSinkOperationResult = +AddQueueSinkOperationResult addQueueSinkOperationResult = store.Maintenance.Send(new AddQueueSinkOperation(config)); -`} -
-
+``` +
+
-## Configuration Options + -Use these configuration options to gain more control over queue sink tasks. +Use these configuration options to gain more control over queue sink tasks. * [QueueSink.MaxBatchSize](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxbatchsize) The maximum number of pulled messages consumed in a single batch. * [QueueSink.MaxFallbackTimeInSec](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxfallbacktimeinsec) - The maximum number of seconds the Queue Sink process will be in a fallback - mode (i.e. suspending the process) after a connection failure. - - - + The maximum number of seconds the Queue Sink process will be in a fallback + mode (i.e. suspending the process) after a connection failure. + diff --git a/versioned_docs/version-7.1/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx b/versioned_docs/version-7.1/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx index 00204c4ccd..11b6df492b 100644 --- a/versioned_docs/version-7.1/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx +++ b/versioned_docs/version-7.1/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx @@ -1,285 +1,331 @@ --- title: "Queue Sink: Apache Kafka" sidebar_label: Kafka Queue Sink +description: "Consume messages from Kafka topics and write them into RavenDB documents using queue sink tasks with JavaScript processing scripts." 
sidebar_position: 1 --- import Admonition from '@theme/Admonition'; -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; import CodeBlock from '@theme/CodeBlock'; -import LanguageSwitcher from "@site/src/components/LanguageSwitcher"; -import LanguageContent from "@site/src/components/LanguageContent"; +import Panel from "@site/src/components/Panel"; +import ContentFrame from "@site/src/components/ContentFrame"; # Queue Sink: Apache Kafka + -* **Apache Kafka** is a distributed, high-performance, transactional messaging - platform, that remains performant as the number of messages it needs to process - increases and the number of events it needs to stream climbs to the big-data zone. - -* RavenDB can harness the advantages presented by message brokers like Kafka - both as a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx)) - and as a **consumer** (using a sink task to consume enqueued messages). - -* To use RavenDB as a consumer, define an ongoing Sink task that will read batches - of enqueued JSON formatted messages from Kafka topics, construct documents using - user-defined scripts, and store the documents in RavenDB collections. 
- -* In this page: - * [The Queue Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#the-queue-sink-task) - * [Client API](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#client-api) - * [Add a Kafka Connection String](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) - * [Add a Kafka Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-sink-task) - * [Configuration Options](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options) +* **Apache Kafka** is a distributed, high-performance, transactional messaging + platform, that remains performant as the number of messages it needs to process + increases and the number of events it needs to stream climbs to the big-data zone. + +* RavenDB can harness the advantages presented by message brokers like Kafka + both as a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx)) + and as a **consumer** (using a sink task to consume enqueued messages). + +* To use RavenDB as a consumer, define an ongoing Sink task that will read batches + of enqueued JSON formatted messages from Kafka topics, construct documents using + user-defined scripts, and store the documents in RavenDB collections. 
+ +* In this page: + * [The Queue Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#the-queue-sink-task) + * [Connecting a Kafka broker](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#connecting-a-kafka-broker) + * [Retrieving enqueued messages from selected Kafka topics](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#retrieving-enqueued-messages-from-selected-kafka-topics) + * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#running-user-defined-scripts) + * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#storing-documents-in-ravendb-collections) + * [Client API](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#client-api) + * [Add a Kafka Connection String](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) + * [Add a Kafka Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-sink-task) + * [Configuration Options](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options) + -## The Queue Sink Task -Users of RavenDB 6.0 and on can create an ongoing Sink task that connects -a Kafka broker, retrieves enqueued messages from selected Kafka topics, -runs a user-defined script to manipulate data and construct documents, and -potentially stores the created documents in RavenDB collections. -#### Connecting a Kafka broker + + + + +## Connecting a Kafka broker + +Users of RavenDB 6.0 and on can create an ongoing Sink task that connects +a Kafka broker, retrieves enqueued messages from selected Kafka topics, +runs a user-defined script to manipulate data and construct documents, and +potentially stores the created documents in RavenDB collections. In the message broker architecture, RavenDB sinks take the role of data consumers. 
-A sink would connect a Kafka broker using a connection string, and retrieve messages -from the broker's Topics. +A sink would connect a Kafka broker using a connection string, and retrieve messages +from the broker's Topics. -Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) +Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) about adding a connection string via API. -Read [here](../../../studio/database/tasks/ongoing-tasks/kafka-queue-sink.mdx#define-a-kafka-sink-task) -about adding a connection string using Studio. +Read [here](../../../studio/database/tasks/ongoing-tasks/kafka-queue-sink.mdx#define-a-kafka-sink-task) +about adding a connection string using Studio. -Like all ongoing tasks, a sink task is operated by a +Like all ongoing tasks, a sink task is operated by a [responsible node](../../../server/clustering/distribution/highly-available-tasks.mdx#responsible-node). -When the responsibility for the task is moved from one node to another, -e.g. from node `A` to node `B` as a result of node `A` down time: +When the responsibility for the task is moved from one node to another, +e.g. from node `A` to node `B` as a result of node `A` down time: -* The consumer task will maintain the same consumer group id it had on the original node. -* Kafka brokers may **cease serving the sink task for some time** as the Kafka consumer - group rebalances (adapting to the leaving of one node and the joining of another, among - other changes). +* The consumer task will maintain the same consumer group id it had on the original node. +* Kafka brokers may **cease serving the sink task for some time** as the Kafka consumer + group rebalances (adapting to the leaving of one node and the joining of another, among + other changes). 
-#### Retrieving enqueued messages from selected Kafka topics - -When a message is sent to a Kafka broker by a producer, it is pushed to -the tail of a topic. As preceding messages are pulled, the message advances -up the queue until it reaches its head and can be consumed by RavenDB's sink. -#### Running user-defined scripts - -A sink task's script is a JavaScript segment. Its basic role is to retrieve -selected Kafka messages or message properties, and construct documents that -will then be stored in RavenDB. - -The script can simply store the whole message as a document, as in this -segment: - - -{`// Add the document a metadata \`@collection\` property to keep it in -// this collection, or do not set it to store the document in @empty). -this['@metadata']['@collection'] = 'Orders'; -// Store the message as is, using its Id property as its RavenDB Id as well. + + + +--- + + + +## Retrieving enqueued messages from selected Kafka topics + + +While a queue sink task is running, it prevents the host database from being unloaded due to idle operations. +This ensures that message consumption from Kafka topics continues uninterrupted, even if the database would otherwise go idle. + + +When a message is sent to a Kafka broker by a producer, it is pushed to +the tail of a topic. As preceding messages are pulled, the message advances +up the queue until it reaches its head and can be consumed by RavenDB's sink. + + + +--- + + + +## Running user-defined scripts + +A sink task's script is a JavaScript segment. Its basic role is to retrieve +selected Kafka messages or message properties, and construct documents that +will then be stored in RavenDB. + +The script can store the whole message as a document, as in this segment: + +```javascript +// Add the document a metadata `@collection` property to keep it in +// this collection, or do not set it to store the document in @empty). 
+this['@metadata']['@collection'] = 'Orders'; +// Store the message as is, using its Id property as its RavenDB Id as well. put(this.Id.toString(), this) -`} - - - -But the script can also retrieve some information from the read message -and construct a new document that doesn't resemble the original message. -Scripts often apply two sections: a section that creates a JSON object -that defines the document's structure and contents, and a second section -that stores the document. - -E.g., for Kafka messages of this format - - - -{`\{ +``` + +
+ +The script can also retrieve selected information from the read message and +construct a new document that does not resemble the original message. +Scripts often apply two sections: a section that creates a JSON object +that defines the document's structure and contents, and a second section +that stores the document. + +E.g., for Kafka messages of this format: + +```json +{ "Id" : 13, "FirstName" : "John", "LastName" : "Doe" -\} -`} -
-
- -We can create this script - - - -{`var item = \{ - Id : this.Id, - FirstName : this.FirstName, - LastName : this.LastName, - FullName : this.FirstName + ' ' + this.LastName, - "@metadata" : \{ +} +``` + +
+ +We can create this script: + +```javascript +var item = { + Id : this.Id, + FirstName : this.FirstName, + LastName : this.LastName, + FullName : this.FirstName + ' ' + this.LastName, + "@metadata" : { "@collection" : "Users" - \} -\}; + } +}; // Use .toString() to pass the Id as a string even if Kafka provides it as a number put(this.Id.toString(), item) -`} -
-
- -The script can also apply various other JavaScript commands, including -`load` to load a RavenDB document (e.g. to construct a document that -includes data from the retrieved message and complementing data from -existing RavenDB documents), `del` to remove existing RavenDB documents, -and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). -#### Storing documents in RavenDB collections - -The sink task consumes batches of queued messages and stores them in RavenDB -in a transactional manner, processing either the entire batch or none of it. +``` + +
+ +The script can also apply various other JavaScript commands, including +`load` to load a RavenDB document (e.g. to construct a document that +includes data from the retrieved message and complementing data from +existing RavenDB documents), `del` to remove existing RavenDB documents, +and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). + +
+ +--- + + + +## Storing documents in RavenDB collections + +The sink task consumes batches of queued messages and stores them in RavenDB +in a transactional manner, processing either the entire batch or none of it. + -Some script processing errors are allowed; when such an error occurs RavenDB -will skip the affected message, record the event in the logs, and alert the -user in Studio, but **continue processing the batch**. +Some script processing errors are allowed; when such an error occurs RavenDB +will skip the affected message, record the event in the logs, and alert the +user in Studio, but **continue processing the batch**. -Once a batch is consumed, the task confirms it by calling `kafkaConsumer.Commit()`. +Once a batch is consumed, the task confirms it by calling `kafkaConsumer.Commit()`. -Note that the number of documents included in a batch is +Note that the number of documents included in a batch is [configurable](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options). -Producers may enqueue -[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx#idempotence-and-message-duplication) +Producers may enqueue +[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/kafka.mdx#idempotence-and-message-duplication) of the same document. -if processing each message only once is important to the consumer, -it is **the consumer's responsibility** to verify the uniqueness of -each consumed message. - -Note that as long as the **Id** property of Kafka messages is preserved -(so duplicate messages share an Id), the script's `put(ID, { ... })` -command will overwrite a previous document with the same Id and only -one copy of it will remain. +If processing each message only once is important to the consumer, +it is **the consumer's responsibility** to verify the uniqueness of +each consumed message. 
+ +Note that as long as the **Id** property of Kafka messages is preserved +(so duplicate messages share an Id), the script's `put(ID, { ... })` +command will overwrite a previous document with the same Id and only +one copy of it will remain. + +
-## Client API + -#### Add a Kafka Connection String + -Prior to defining a Kafka sink task, add a **Kafka connection string** -that the task will use to connect the message broker's bootstrap servers. +## Add a Kafka Connection String -To create the connection string: +Prior to defining a Kafka sink task, add a **Kafka connection string** +that the task will use to connect the message broker's bootstrap servers. -* Create a `QueueConnectionString`instance with the connection string configuration. - Pass it to the `PutConnectionStringOperation` store operation to add the connection string. +To create the connection string: - `QueueConnectionString`: - - -{`// Add Kafka connection string +* Create a `QueueConnectionString` instance with the connection string configuration. + Pass it to the `PutConnectionStringOperation` store operation to add the connection string. + +`QueueConnectionString`: + +```csharp +// Add Kafka connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "KafkaConStr", BrokerType = QueueBrokerType.Kafka, KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); -`} - - - - `QueueBrokerType`: - - -{`public enum QueueBrokerType -\{ + { BootstrapServers = "localhost:9092" } + })); +``` + +
+ +`QueueBrokerType`: + +```csharp +public enum QueueBrokerType +{ None, Kafka, RabbitMq -\} -`} -
-
- - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string | - | **KafkaConnectionSettings** | `KafkaConnectionSettings[]` | A list of comma-separated host:port URLs to Kafka brokers | -#### Add a Kafka Sink Task - -To create the Sink task: - -* Create `QueueSinkScript` instances to define scripts with which the - task can process retrieved messages, apply JavaScript commands, construct - documents and store them in RavenDB. - - -{`// Define a Sink script +} +``` + +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string | +| **KafkaConnectionSettings** | `KafkaConnectionSettings[]` | A list of comma-separated host:port URLs to Kafka brokers | + +
+ +--- + + + +## Add a Kafka Sink Task + +To create the Sink task: + +* Create `QueueSinkScript` instances to define scripts with which the + task can process retrieved messages, apply JavaScript commands, construct + documents and store them in RavenDB. + +```csharp +// Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; -`} - - +}; +``` + +
-* Prepare a `QueueSinkConfiguration`object with the sink task configuration. +* Prepare a `QueueSinkConfiguration` object with the sink task configuration. - `QueueSinkConfiguration` properties: +`QueueSinkConfiguration` properties: - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | The sink task name | - | **ConnectionStringName** | `string` | The registered connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` to define a Kafka sink task | - | **Scripts** | `List` | A list of scripts | +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The registered connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` to define a Kafka sink task | +| **Scripts** | `List` | A list of scripts | -* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. +* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. 
- `QueueSinkScript` properties: +`QueueSinkScript` properties: - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Script name | - | **Queues** | `List` | A list of Kafka topics to consume messages from | - | **Script** | `string ` | The script contents | +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Script name | +| **Queues** | `List` | A list of Kafka topics to consume messages from | +| **Script** | `string` | The script contents | -**Code Sample**: - - -{`// Add Kafka connection string +#### Example: + +```csharp +// Add Kafka connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "KafkaConStr", BrokerType = QueueBrokerType.Kafka, - KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); + KafkaConnectionSettings = new KafkaConnectionSettings() + { BootstrapServers = "localhost:9092" } + })); // Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; +}; -// Define a Kafka configuration +// Define a Kafka sink task configuration var config = new QueueSinkConfiguration() -\{ +{ // Sink name Name = "KafkaSinkTaskName", // The connection string to connect the broker with @@ -287,27 +333,25 @@ var config = new QueueSinkConfiguration() // What queue broker is this task using BrokerType = QueueBrokerType.Kafka, // The list of scripts to run - Scripts = \{ queueSinkScript \} -\}; + Scripts = { queueSinkScript } +}; -AddQueueSinkOperationResult addQueueSinkOperationResult = +AddQueueSinkOperationResult 
addQueueSinkOperationResult = store.Maintenance.Send(new AddQueueSinkOperation(config)); -`} - - +``` +
+
-## Configuration Options + -Use these configuration options to gain more control over queue sink tasks. +Use these configuration options to gain more control over queue sink tasks. * [QueueSink.MaxBatchSize](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxbatchsize) The maximum number of pulled messages consumed in a single batch. * [QueueSink.MaxFallbackTimeInSec](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxfallbacktimeinsec) - The maximum number of seconds the Queue Sink process will be in a fallback - mode (i.e. suspending the process) after a connection failure. - - - + The maximum number of seconds the Queue Sink process will be in a fallback + mode (i.e. suspending the process) after a connection failure. + diff --git a/versioned_docs/version-7.1/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx b/versioned_docs/version-7.1/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx index 919752419b..ac6e88140f 100644 --- a/versioned_docs/version-7.1/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx +++ b/versioned_docs/version-7.1/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx @@ -1,300 +1,344 @@ --- title: "Queue Sink: RabbitMQ" sidebar_label: RabbitMQ Queue Sink +description: "Consume messages from RabbitMQ queues and write them into RavenDB documents using queue sink tasks with JavaScript processing scripts." 
sidebar_position: 2 --- import Admonition from '@theme/Admonition'; -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; import CodeBlock from '@theme/CodeBlock'; -import LanguageSwitcher from "@site/src/components/LanguageSwitcher"; -import LanguageContent from "@site/src/components/LanguageContent"; +import Panel from "@site/src/components/Panel"; +import ContentFrame from "@site/src/components/ContentFrame"; # Queue Sink: RabbitMQ + -* **RabbitMQ** brokers are designed to disperse data to multiple queues, - making for a flexible data channeling system that can easily handle complex - message streaming scenarios. +* **RabbitMQ** brokers are designed to disperse data to multiple queues, + making for a flexible data channeling system that can handle complex + message streaming scenarios. + +* RavenDB can harness the advantages presented by RabbitMQ brokers both as + a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx)) + and as a **consumer** (using a sink task to consume enqueued messages). + +* To use RavenDB as a consumer, define an ongoing Sink task that will read + batches of JSON formatted messages from RabbitMQ queues, construct documents + using user-defined scripts, and store the documents in RavenDB collections. 
+ +* In this page: + * [The RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#the-rabbitmq-sink-task) + * [Connecting a RabbitMQ broker](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#connecting-a-rabbitmq-broker) + * [Retrieving messages from RabbitMQ queues](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#retrieving-messages-from-rabbitmq-queues) + * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#running-user-defined-scripts) + * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#storing-documents-in-ravendb-collections) + * [Client API](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#client-api) + * [Add a RabbitMQ Connection String](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) + * [Add a RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-sink-task) + * [Configuration Options](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options) -* RavenDB can harness the advantages presented by RabbitMQ brokers both as - a producer (by [running ETL tasks](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx)) - and as a **consumer** (using a sink task to consume enqueued messages). + -* To use RavenDB as a consumer, define an ongoing Sink task that will read - batches of JSON formatted messages from RabbitMQ queues, construct documents - using user-defined scripts, and store the documents in RavenDB collections. 
+ -* In this page: - * [The RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#the-rabbitmq-sink-task) - * [Client API](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#client-api) - * [Add a RabbitMQ Connection String](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) - * [Add a RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-sink-task) - * [Configuration Options](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options) - -## The RabbitMQ Sink Task + -Users of RavenDB 6.0 and on can create an ongoing Sink task that connects -a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined -script to manipulate data and construct documents, and potentially stores the -created documents in RavenDB collections. -#### Connecting a RabbitMQ broker +## Connecting a RabbitMQ broker + +Users of RavenDB 6.0 and on can create an ongoing Sink task that connects +a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined +script to manipulate data and construct documents, and potentially stores the +created documents in RavenDB collections. In the message broker architecture, RavenDB sinks take the role of data consumers. -A sink would connect a RabbitMQ broker using a connection string, and retrieve messages -from the broker's queues. +A sink would connect a RabbitMQ broker using a connection string, and retrieve messages +from the broker's queues. -Read [below](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) +Read [below](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) about adding a connection string via API. 
-Read [here](../../../studio/database/tasks/ongoing-tasks/rabbitmq-queue-sink.mdx#define-a-rabbitmq-sink-task) -about adding a connection string using Studio. -#### Retrieving messages from RabbitMQ queues - -When a message is sent to a RabbitMQ broker by a producer, it is pushed to -the tail of a queue. As preceding messages are pulled, the message advances -up the queue until it reaches its head and can be consumed by RavenDB's sink. -#### Running user-defined scripts - -A sink task's script is a JavaScript segment. Its basic role is to retrieve -selected RabbitMQ messages or message properties, and construct documents that -will then be stored in RavenDB. - -The script can simply store the whole message as a document, as in this -segment: - - -{`// Add the document a metadata \`@collection\` property to keep it in -// this collection, or do not set it to store the document in @empty). -this['@metadata']['@collection'] = 'Orders'; -// Store the message as is, using its Id property as its RavenDB Id as well. +Read [here](../../../studio/database/tasks/ongoing-tasks/rabbitmq-queue-sink.mdx#define-a-rabbitmq-sink-task) +about adding a connection string using Studio. + + + +--- + + + +## Retrieving messages from RabbitMQ queues + + +While a queue sink task is running, it prevents the host database from being unloaded due to idle operations. +This ensures that message consumption from RabbitMQ queues continues uninterrupted, even if the database would otherwise go idle. + + +When a message is sent to a RabbitMQ broker by a producer, it is pushed to +the tail of a queue. As preceding messages are pulled, the message advances +up the queue until it reaches its head and can be consumed by RavenDB's sink. + + + +--- + + + +## Running user-defined scripts + +A sink task's script is a JavaScript segment. Its basic role is to retrieve +selected RabbitMQ messages or message properties, and construct documents that +will then be stored in RavenDB. 
+ +The script can store the whole message as a document, as in this segment: + +```javascript +// Add the document a metadata `@collection` property to keep it in +// this collection, or do not set it to store the document in @empty). +this['@metadata']['@collection'] = 'Orders'; +// Store the message as is, using its Id property as its RavenDB Id as well. put(this.Id.toString(), this) -`} - - - -But the script can also retrieve some information from the read message -and construct a new document that doesn't resemble the original message. -Scripts often apply two sections: a section that creates a JSON object -that defines the document's structure and contents, and a second section -that stores the document. - -E.g., for RabbitMQ messages of this format - - - -{`\{ +``` + +
+ +The script can also retrieve selected information from the read message and +construct a new document that does not resemble the original message. +Scripts often apply two sections: a section that creates a JSON object +that defines the document's structure and contents, and a second section +that stores the document. + +E.g., for RabbitMQ messages of this format: + +```json +{ "Id" : 13, "FirstName" : "John", "LastName" : "Doe" -\} -`} -
-
- -We can create this script - - - -{`var item = \{ - Id : this.Id, - FirstName : this.FirstName, - LastName : this.LastName, - FullName : this.FirstName + ' ' + this.LastName, - "@metadata" : \{ +} +``` + +
+ +We can create this script: + +```javascript +var item = { + Id : this.Id, + FirstName : this.FirstName, + LastName : this.LastName, + FullName : this.FirstName + ' ' + this.LastName, + "@metadata" : { "@collection" : "Users" - \} -\}; + } +}; // Use .toString() to pass the Id as a string even if RabbitMQ provides it as a number put(this.Id.toString(), item) -`} -
-
- -The script can also apply various other JavaScript commands, including -`load` to load a RavenDB document (e.g. to construct a document that -includes data from the retrieved message and complementing data from -existing RavenDB documents), `del` to remove existing RavenDB documents, -and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). -#### Storing documents in RavenDB collections - -The sink task consumes batches of queued messages and stores them in RavenDB -in a transactional manner, processing either the entire batch or none of it. +``` + +
+ +The script can also apply various other JavaScript commands, including +`load` to load a RavenDB document (e.g. to construct a document that +includes data from the retrieved message and complementing data from +existing RavenDB documents), `del` to remove existing RavenDB documents, +and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). + +
+ +--- + + + +## Storing documents in RavenDB collections + +The sink task consumes batches of queued messages and stores them in RavenDB +in a transactional manner, processing either the entire batch or none of it. + -Some script processing errors are allowed; when such an error occurs RavenDB -will skip the affected message, record the event in the logs, and alert the -user in Studio, but **continue processing the batch**. +Some script processing errors are allowed; when such an error occurs RavenDB +will skip the affected message, record the event in the logs, and alert the +user in Studio, but **continue processing the batch**. -Once a batch is consumed, the task confirms it by sending `_channel.BasicAck`. +Once a batch is consumed, the task confirms it by sending `_channel.BasicAck`. -Note that the number of documents included in a batch is +Note that the number of documents included in a batch is [configurable](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options). -Producers may enqueue -[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx#message-duplication) +Producers may enqueue +[multiple instances](../../../server/ongoing-tasks/etl/queue-etl/rabbit-mq.mdx#message-duplication) of the same document. -if processing each message only once is important to the consumer, -it is **the consumer's responsibility** to verify the uniqueness of -each consumed message. - -Note that as long as the **Id** property of RabbitMQ messages is preserved -(so duplicate messages share an Id), the script's `put(ID, { ... })` command -will overwrite a previous document with the same Id and only one copy of -it will remain. +If processing each message only once is important to the consumer, +it is **the consumer's responsibility** to verify the uniqueness of +each consumed message. 
+ +Note that as long as the **Id** property of RabbitMQ messages is preserved +(so duplicate messages share an Id), the script's `put(ID, { ... })` command +will overwrite a previous document with the same Id and only one copy of +it will remain. + + +
+ -## Client API + -#### Add a RabbitMQ Connection String +## Add a RabbitMQ Connection String -Prior to defining a RabbitMQ sink task, add a **RabbitMQ connection string** -that the task will use to connect the message brokers. +Prior to defining a RabbitMQ sink task, add a **RabbitMQ connection string** +that the task will use to connect the message brokers. -To create the connection string: +To create the connection string: -* Create a `QueueConnectionString`instance with the connection string configuration. - Pass it to the `PutConnectionStringOperation` store operation to add the connection string. +* Create a `QueueConnectionString` instance with the connection string configuration. + Pass it to the `PutConnectionStringOperation` store operation to add the connection string. - `QueueConnectionString`: - - -{`// Add RabbitMQ connection string +`QueueConnectionString`: + +```csharp +// Add RabbitMQ connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ + { Name = "RabbitMqConStr", BrokerType = QueueBrokerType.RabbitMq, RabbitMqConnectionSettings = new RabbitMqConnectionSettings() - \{ ConnectionString = "amqp://guest:guest@localhost:5672/" \} - \})); -`} - - - - `QueueBrokerType`: - - -{`public enum QueueBrokerType -\{ + { ConnectionString = "amqp://guest:guest@localhost:5672/" } + })); +``` + +
+ +`QueueBrokerType`: + +```csharp +public enum QueueBrokerType +{ None, Kafka, RabbitMq -\} -`} -
-
- - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` for a RabbitMQ connection string | - | **RabbitMqConnectionSettings** | `RabbitMqConnectionSettings[]` | A list of strings indicating RabbitMQ brokers connection details | -#### Add a RabbitMQ Sink Task - -To create the Sink task: - -* Create `QueueSinkScript` instances to define scripts with which the - task can process retrieved messages, apply JavaScript commands, construct - documents and store them in RavenDB. - - -{`// Define a Sink script +} +``` + +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` for a RabbitMQ connection string | +| **RabbitMqConnectionSettings** | `RabbitMqConnectionSettings[]` | A list of strings indicating RabbitMQ brokers connection details | + +
+ +--- + + + +## Add a RabbitMQ Sink Task + +To create the Sink task: + +* Create `QueueSinkScript` instances to define scripts with which the + task can process retrieved messages, apply JavaScript commands, construct + documents and store them in RavenDB. + +```csharp +// Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", // A list of RabbitMQ queues to connect - Queues = new List() \{ "orders" \}, + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; -`} - - - -* Prepare a `QueueSinkConfiguration`object with the sink task configuration. - - `QueueSinkConfiguration` properties: - - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | The sink task name | - | **ConnectionStringName** | `string` | The registered connection string name | - | **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` to define a RabbitMQ sink task | - | **Scripts** | `List` | A list of scripts | - -* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. - - `QueueSinkScript` properties: - - | Property | Type | Description | - |:-------------|:-------------|:-------------| - | **Name** | `string` | Script name| - | **Queues** | `List` | A list of RabbitMQ queues to consume messages from | - | **Script** | `string ` | The script contents | - -**Code Sample**: - - -{`// Add Kafka connection string +}; +``` + +
+ +* Prepare a `QueueSinkConfiguration` object with the sink task configuration. + +`QueueSinkConfiguration` properties: + +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The registered connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` to define a RabbitMQ sink task | +| **Scripts** | `List` | A list of scripts | + +* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. + +`QueueSinkScript` properties: + +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | Script name | +| **Queues** | `List` | A list of RabbitMQ queues to consume messages from | +| **Script** | `string` | The script contents | + +#### Example: + +```csharp +// Add RabbitMQ connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString - \{ - Name = "KafkaConStr", - BrokerType = QueueBrokerType.Kafka, - KafkaConnectionSettings = new KafkaConnectionSettings() - \{ BootstrapServers = "localhost:9092" \} - \})); + { + Name = "RabbitMqConStr", + BrokerType = QueueBrokerType.RabbitMq, + RabbitMqConnectionSettings = new RabbitMqConnectionSettings() + { ConnectionString = "amqp://guest:guest@localhost:5672/" } + })); // Define a Sink script QueueSinkScript queueSinkScript = new QueueSinkScript -\{ +{ // Script name Name = "orders", - // A list of Kafka topics to connect - Queues = new List() \{ "orders" \}, + // A list of RabbitMQ queues to connect + Queues = new List() { "orders" }, // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; + Script = @"this['@metadata']['@collection'] = 'Orders'; put(this.Id.toString(), this)" -\}; +}; -// Define a Kafka configuration +// Define a RabbitMQ sink task configuration var config = new QueueSinkConfiguration() -\{ +{ // Sink name - Name = "KafkaSinkTaskName", + Name = "RabbitMqSinkTaskName", // The 
connection string to connect the broker with - ConnectionStringName = "KafkaConStr", + ConnectionStringName = "RabbitMqConStr", // What queue broker is this task using - BrokerType = QueueBrokerType.Kafka, + BrokerType = QueueBrokerType.RabbitMq, // The list of scripts to run - Scripts = \{ queueSinkScript \} -\}; + Scripts = { queueSinkScript } +}; -AddQueueSinkOperationResult addQueueSinkOperationResult = +AddQueueSinkOperationResult addQueueSinkOperationResult = store.Maintenance.Send(new AddQueueSinkOperation(config)); -`} -
-
+``` +
+
-## Configuration Options + -Use these configuration options to gain more control over queue sink tasks. +Use these configuration options to gain more control over queue sink tasks. * [QueueSink.MaxBatchSize](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxbatchsize) The maximum number of pulled messages consumed in a single batch. * [QueueSink.MaxFallbackTimeInSec](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxfallbacktimeinsec) - The maximum number of seconds the Queue Sink process will be in a fallback - mode (i.e. suspending the process) after a connection failure. - - - + The maximum number of seconds the Queue Sink process will be in a fallback + mode (i.e. suspending the process) after a connection failure. +