How to Use Kafka Connect with Vultr Managed Apache Kafka®

Updated on March 21, 2025

Kafka Connect is a key component that enables scalable and reliable data integration in Vultr Managed Apache Kafka® cluster. You can use Kafka Connect to move data in and out of topics in a Vultr Managed Apache Kafka®, providing an efficient way to integrate multiple applications such as databases, Object Storage, and Elasticsearch in your cluster.

Follow this guide to enable Kafka Connect in Vultr Managed Apache Kafka® and create new connectors using the Vultr Customer Portal or Vultr API.

  • Vultr Customer Portal
  • Vultr API
  1. Navigate to Products and click Databases.

  2. Click your target Vultr Managed Apache Kafka® cluster to open its management page.

  3. Navigate to the Kafka Connect tab.

    Access Kafka Connect

  4. Verify the list of available connectors compatible with your cluster.

  5. Choose your desired connector type:

    • Sink: Read data from Kafka topics and push the data to external sources such as search indexes, batch systems and databases for processsing.
    • Source: Pull data from external sources and publish the data to specific Kafka topics in the cluster.
  6. Click Add Connector.

  7. Specify the connector label and specify the topics to link in your cluster.

  8. Modify the default Config JSON parameters to include the connector information, such as:

    • Destination details.
    • Output format.
    • Queries.
    • Protocols.
    • Size specifications.
    • Maximum timeout and retry information.
  9. Remove any unused optional properties in the connector configuration.

  10. Click Add Connector to create the connector.

  11. Verify the list of active connectors in the cluster.

    View active connectors

  12. Click Connector Status to monitor the connector status.

    • Verify the Connector State information.
    • Click Refresh Status to refresh the connector information.
    • Click Restart Connector to restart it.
    • Click Pause Connector to pause the configuration.
    • Click Resume Connector to resume a paused connector.
  13. Click Edit Connector to modify the connector information.

  14. Click Delete Connector to delete the connector.

  1. Send a GET request to the List Managed Databases endpoint and note the target database cluster's ID.

    console
    $ curl "https://api.vultr.com/v2/databases" \
        -X GET \
        -H "Authorization: Bearer ${VULTR_API_KEY}"
    
  2. Send a GET request to the List Database Available Connectors endpoint, specifing the database ID to list all available connectors and note the target connector class.

    console
    $ curl "https://api.vultr.com/v2/databases/{database-id}/available-connectors" \
        -X GET \
        -H "Authorization: Bearer ${VULTR_API_KEY}"
    
  3. Send a GET request to the Get Database Connector Configuration Schema, specifying the target database ID and connector class to get the default connector configuration.

    console
    $ curl "https://api.vultr.com/v2/databases/{database-id}/available-connectors/{connector-class}/configuration" \
      -X GET \
      -H "Authorization: Bearer ${VULTR_API_KEY}"
    
  4. Send a GET request to the List Database Topics endpoint, specifying the target database ID to list all topics and note your target topic's ID.

    console
    $ curl "https://api.vultr.com/v2/databases/<database-id>/topics" \            
        -X GET \                                   
        -H "Authorization: Bearer ${VULTR_API_KEY}"
    
  5. Send a POST request to the Create Database Connector endpoint, specifying the target database ID, connector class and topics to create a new connector.

    console
    $ curl "https://api.vultr.com/v2/databases/{database-id}/connectors" \
      -X POST \
      -H "Authorization: Bearer ${VULTR_API_KEY}" \
      -H "Content-Type: application/json" \
      --data '{
        "name" : "<connector-name>",
        "class" : "<connector-class>",
        "topics" : "<topic>",
        "config" : {
          "{configuration property}": <value>,
          "{configuration property}": "<value>",
          "{configuration property}": "<value>"
        }
      }'
    
  6. Send a GET request to the List Database Connectors endpoint, specifying the target database ID to list all active connectors.

    console
    $ curl "https://api.vultr.com/v2/databases/{database-id}/connectors" \
      -X GET \
      -H "Authorization: Bearer ${VULTR_API_KEY}"
    
  7. Send a PUT request to the Update Database Connector endpoint, specifying the target database ID and the connector name to update its configuration.

    console
    $ curl "https://api.vultr.com/v2/databases/{database-id}/connectors/{connector-name}" \
      -X PUT \
      -H "Authorization: Bearer ${VULTR_API_KEY}" \
      -H "Content-Type: application/json" \
      --data '{
        "topics" : "<topic>",
        "config" : {
          "{configuration property}": <value>,
          "{configuration property}": "<value>",
          "{configuration property}": "<value>"
        }
      }'
    
  8. Send a GET request to the Get Database Connector Status endpoint, specifyng the target database ID and the connector name to get its status information.

    console
    $ curl "https://api.vultr.com/v2/databases/{database-id}/connectors/{connector-name}/status" \
      -X GET \
      -H "Authorization: Bearer ${VULTR_API_KEY}"
    
  9. Send a POST request to the Pause Database Connector endpoint, specifying the target database ID and the connector name to pause.

    console
    $ curl "https://api.vultr.com/v2/databases/{database-id}/connectors/{connector-name}/pause" \
      -X POST \
      -H "Authorization: Bearer ${VULTR_API_KEY}"
    
  10. Send a POST request to the Resume Database Connector endpoint, specifying the target database ID and the connector name to resume.

    console
    $ curl "https://api.vultr.com/v2/databases/{database-id}/connectors/{connector-name}/resume" \
      -X POST \
      -H "Authorization: Bearer ${VULTR_API_KEY}"
    
  11. Send a POST request to the Restart Database Connector Task endpoint, specifying the target database ID, and the connector task ID to restart.

    console
    $ curl "https://api.vultr.com/v2/databases/{database-id}/connectors/{connector-name}/tasks/{task-id}/restart" \
      -X POST \
      -H "Authorization: Bearer ${VULTR_API_KEY}"
    
  12. Send a DELETE request to the Delete Database Connector endpoint, specifying the target database ID and the connector name to delete from the cluster.

    console
    $ curl "https://api.vultr.com/v2/databases/{database-id}/connectors/{connector-name}" \
      -X DELETE \
      -H "Authorization: Bearer ${VULTR_API_KEY}"
    

Comments

No comments yet.