applicationB_orders pointing to a physical topic orders, a client that’s able to access the physical one would be able to see both topics.
ConcentrationRule:
spec.pattern:
(concentrated.topicA and concentrated.topicB) with partition counts of 3 and 4 respectively, mapped to a single physical topic (physical.topic) with three partitions.
To ensure that consumers don’t consume messages from other partitions or from other concentrated topics, we store the concentrated partition and the concentrated topic name in the record headers. Gateway will automatically filter the messages that should be returned to the consumer.
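The header-based filtering described above can be sketched roughly as follows. This is an illustration only, not Gateway's implementation: the header names, the `Record` class, and the modulo mapping from concentrated to physical partitions are all assumptions made for the example.

```python
from dataclasses import dataclass, field

# Hypothetical header names; the text does not say which names Gateway uses.
TOPIC_HEADER = "concentrated.topic"
PARTITION_HEADER = "concentrated.partition"
PHYSICAL_PARTITIONS = 3  # physical.topic has three partitions


@dataclass
class Record:
    physical_partition: int
    value: str
    headers: dict = field(default_factory=dict)


def produce(topic: str, partition: int, value: str) -> Record:
    """Write a concentrated record to the physical topic, tagging its origin in headers."""
    # Assumption: concentrated partitions are folded onto physical ones with a modulo.
    return Record(
        physical_partition=partition % PHYSICAL_PARTITIONS,
        value=value,
        headers={TOPIC_HEADER: topic, PARTITION_HEADER: partition},
    )


def fetch(log: list, topic: str, partition: int) -> list:
    """Return only the values that belong to the requested concentrated topic-partition."""
    return [
        r.value
        for r in log
        if r.headers[TOPIC_HEADER] == topic and r.headers[PARTITION_HEADER] == partition
    ]


log = [
    produce("concentrated.topicA", 0, "a0"),
    produce("concentrated.topicB", 0, "b0"),
    produce("concentrated.topicB", 3, "b3"),  # shares physical partition 0 with the others
]
```

All three records share one physical partition in this sketch, yet `fetch(log, "concentrated.topicA", 0)` only sees the record written to that concentrated topic and partition.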
ConcentrationRule has to have a backing topic for each of them, otherwise it won’t let you create the topic.
spec.deleteCompact is commented out, trying to create this topic will fail:
ConcentrationRule. This prevents you from declaring a backing topic with a cleanup.policy of delete on the ConcentrationRule spec.physicalTopic.compact field.
partitions
cleanup.policy
retention.ms
retention.bytes
delete.retention.ms
retention.ms and retention.bytes can be set to values lower than or equal to those of the backing topic. If a user tries to create a topic with a higher value, topic creation will fail with an error:
spec.autoManaged.
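As a rough illustration of the retention rule, and of how it is relaxed when autoManaged is on, the check could look like the sketch below. The function name, its arguments, and the error text are hypothetical; only the retention.ms and retention.bytes keys and the meaning of -1 (no limit, as in Kafka) come from the configuration itself.

```python
def check_retention(requested: dict, backing: dict, auto_managed: bool = False) -> dict:
    """Return the backing topic config after a create request, or raise if rejected."""
    updated = dict(backing)
    for key in ("retention.ms", "retention.bytes"):
        if key not in requested:
            continue
        want, have = requested[key], backing[key]
        # -1 means "no limit", so it counts as larger than any finite value.
        exceeds = have != -1 and (want == -1 or want > have)
        if not exceeds:
            continue
        if not auto_managed:
            # Hypothetical message; the real error text is not reproduced here.
            raise ValueError(f"{key}={want} is higher than the backing topic's {have}")
        updated[key] = want  # autoManaged: extend the backing topic instead of failing
    return updated
```

With `auto_managed=False`, a request for `retention.ms = -1` against a backing topic with a finite retention raises; with `auto_managed=True`, the returned backing configuration carries the extended value.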
retention.ms and retention.bytes are not cleanup but retention guarantees.
When autoManaged is enabled:
retention.ms and retention.bytes are allowed. This automatically extends the configuration of the backing topic.
retention.ms = -1), all the topics with the same cleanup policy associated with the rule will also inherit this extended configuration and have infinite retention.
EndOffset of the physical topic. This can create confusion for customers and applications that will see incorrect metrics.
To counter this, we’ve implemented a dedicated offset management capability for ConcentrationRules. To enable virtual offsets, add the following line to the ConcentrationRule:
spec.offsetCorrectness only applies to concentrated topics with the cleanup.policy=delete
spec.offsetCorrectness is not retroactive on previously created Concentrated Topics
fetch.max.wait.ms, if non-default).
DeleteRecords is not supported
Only IsolationLevel.READ_UNCOMMITTED is supported (using IsolationLevel.READ_COMMITTED is undefined behavior)
unclean.leader.election=true) may not be detected by consumers
With offsetCorrectness enabled, there’s currently a limitation for consumer groups where the data in the topics is slow moving, and/or the consumer groups are not committing their offsets frequently.
If a consumer group with a committed offset waits on the backing physical topic for longer than the retention time without committing a new offset, that consumer group can become blocked.
In this scenario, the group’s last committed offset has been removed from the topic, and the group becomes blocked only if Gateway restarts before the next offset commit. If this happens, the offsets for the affected consumer group must be reset manually before it can continue.
FetchResponse only (i.e., resulting topic is read-only):
SELECT [list of fields] FROM [topic name] WHERE [field filter criteria]
Topic names containing - characters have to be double quoted, as the dash is not a valid character for a SQL name. For example, if you have a topic our-orders, use SELECT * FROM "our-orders" WHERE ...
AND operator is supported
=, >, >=, <, <=, <> and REGEXP (RegExp MySQL Operator)
.. WHERE record.key = 'some thing'
.. WHERE record.key.someValue.someChildValue = 'some thing'
.. WHERE $.someValue.someChildValue = 'some thing'
.. WHERE record.partition = 1
.. WHERE record.timestamp = 98717823712
.. WHERE record.header.someHeaderKey = 'some thing'
.. WHERE record.offset = 1
SELECT clause to alter the shape of the data returned; however, for schema’d data (Avro and Protobuf) you must not use a projection, i.e. the select should be in the form:
SELECT * FROM ...
Filtering with the WHERE clause is still supported.
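A small helper can make the statement rules above harder to get wrong. The sketch below is illustrative only: `quote_topic` and `build_statement` are hypothetical names, and it assumes that dashes are the only characters requiring quotes, since that is the only case the rules mention.

```python
def quote_topic(topic: str) -> str:
    """Double-quote topic names containing a dash, which is not valid in a SQL name."""
    return f'"{topic}"' if "-" in topic else topic


def build_statement(topic: str, conditions: list) -> str:
    """Build a SELECT * statement (no projection) with conditions joined by AND."""
    where = " AND ".join(conditions)
    return f"SELECT * FROM {quote_topic(topic)} WHERE {where}"


statement = build_statement(
    "our-orders",
    ["record.partition = 1", "record.header.someHeaderKey = 'some thing'"],
)
```

Sticking to `SELECT *` keeps the generated statement usable for Avro and Protobuf data as well, where projections are not allowed.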
Key | Type | Description |
---|---|---|
virtualTopic | String | If virtualTopic is set, fetching from this topic returns the data produced by the statement, without the topic needing its own statement. |
statement | String | SQL Statement |
schemaRegistryConfig | Schema Registry | Schema Registry Config |
Key | Type | Default | Description |
---|---|---|---|
type | string | CONFLUENT | The type of schema registry to use: choose CONFLUENT (for Confluent-like schema registries including OSS Kafka) or AWS for AWS Glue schema registries. |
additionalConfigs | map | | Additional properties that map to specific security-related parameters. For enhanced security, you can hide the sensitive values using environment variables as secrets. |
Confluent Like | | | Configuration for Confluent-like schema registries |
host | string | | URL of your schema registry. |
cacheSize | string | 50 | Number of schemas that can be cached locally by this interceptor so that it doesn’t have to query the schema registry every time. |
AWS Glue | | | Configuration for AWS Glue schema registries |
region | string | | The AWS region for the schema registry, e.g. us-east-1 |
registryName | string | | The name of the schema registry in AWS (leave blank for the AWS default of default-registry) |
basicCredentials | string | | Access credentials for AWS (see below for structure) |
AWS Credentials | | | AWS credentials configuration |
accessKey | string | | The access key for the connection to the schema registry. |
secretKey | string | | The secret key for the connection to the schema registry. |
validateCredentials | bool | true | true / false flag to determine whether the credentials provided should be validated when set. |
accountId | string | | The Id for the AWS account to use. |
If no basicCredentials section is provided for the AWS Glue schema registry, the client used to connect will attempt to find the connection information from the environment. The required credentials can be passed to Gateway in this way as part of core configuration.
Find out more in AWS documentation.
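To show how the keys from the tables above might fit together, here is a hedged sketch that assembles a configuration as a Python dictionary. The helper names, the virtual topic name, and the nesting (registry keys placed directly under schemaRegistryConfig, credential keys under basicCredentials) are assumptions made for illustration; check them against your Gateway version before relying on them.

```python
import json
from typing import Optional


def glue_registry(region: str, registry_name: Optional[str] = None,
                  credentials: Optional[dict] = None) -> dict:
    """Schema registry settings for AWS Glue; omit credentials to use the environment."""
    config = {"type": "AWS", "region": region}
    if registry_name:
        config["registryName"] = registry_name  # blank means the AWS default registry
    if credentials is not None:
        # Assumed structure: accessKey, secretKey, validateCredentials, accountId.
        config["basicCredentials"] = credentials
    return config


def sql_topic_config(virtual_topic: str, statement: str, registry: dict) -> dict:
    """Combine the virtual topic, its SQL statement, and the registry settings."""
    return {
        "virtualTopic": virtual_topic,
        "statement": statement,
        "schemaRegistryConfig": registry,
    }


config = sql_topic_config(
    "filtered-orders",  # hypothetical virtual topic name
    'SELECT * FROM "our-orders" WHERE record.partition = 1',
    glue_registry("us-east-1"),
)
print(json.dumps(config, indent=2))
```

Leaving `credentials` unset mirrors the case where the client looks up the connection information from the environment.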
.. record.key == 'some thing'
.. record.key.someValue.someChildValue == 'some thing'
.. record.value.someValue.someChildValue == 'some thing'
.. record.partition == 1
.. record.timestamp == 98717823712
.. record.header.someHeaderKey == 'some thing'
.. record.offset == 1
key | type | description |
---|---|---|
virtualTopic | String | When accessed, this virtual topic retrieves filtered data from the specified client topic, applying a CEL expression for data filtering. |
topic | String | Specifies the client topic from which data is fetched. |
expression | String | A CEL expression that returns BOOLEAN to filter data. This determines which data from the topic is relevant based on the given criteria. |
schemaRegistryConfig | Schema Registry | Schema Registry Config |
celCacheSize | int | Size of the in-memory cache for CEL expressions. A larger cache speeds up evaluation at the cost of more memory. |
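The effect of the expression on a fetch can be pictured with a plain Python stand-in. This sketch does not evaluate CEL: the `expression` function simply mirrors what a boolean CEL expression such as `record.partition == 1 && record.header.someHeaderKey == 'some thing'` would decide, and the record layout is a simplified assumption.

```python
from typing import Callable, Iterable

Record = dict  # simplified stand-in for a Kafka record


def filter_fetch(records: Iterable, predicate: Callable) -> list:
    """Keep only the records for which the boolean expression holds."""
    return [r for r in records if predicate(r)]


def expression(record: Record) -> bool:
    """Python equivalent of the CEL expression quoted in the lead-in."""
    return (
        record["partition"] == 1
        and record["header"].get("someHeaderKey") == "some thing"
    )


source = [
    {"partition": 0, "offset": 1, "header": {"someHeaderKey": "some thing"}, "value": "a"},
    {"partition": 1, "offset": 1, "header": {"someHeaderKey": "some thing"}, "value": "b"},
    {"partition": 1, "offset": 2, "header": {}, "value": "c"},
]
virtual_view = filter_fetch(source, expression)
```

Only the record that satisfies both conditions appears in the virtual topic's view; the source topic itself is left unchanged.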
Key | Type | Default | Description |
---|---|---|---|
type | string | CONFLUENT | The type of schema registry to use: choose CONFLUENT (for Confluent-like schema registries including OSS Kafka) or AWS for AWS Glue schema registries. |
additionalConfigs | map | | Additional properties that map to specific security-related parameters. For enhanced security, you can hide the sensitive values using environment variables as secrets. |
Confluent Like | | | Configuration for Confluent-like schema registries |
host | string | | URL of your schema registry. |
cacheSize | string | 50 | Number of schemas that can be cached locally by this interceptor so that it doesn’t have to query the schema registry every time. |
AWS Glue | | | Configuration for AWS Glue schema registries |
region | string | | The AWS region for the schema registry, e.g. us-east-1 |
registryName | string | | The name of the schema registry in AWS (leave blank for the AWS default of default-registry) |
basicCredentials | string | | Access credentials for AWS (see below for structure) |
AWS Credentials | | | AWS credentials configuration |
accessKey | string | | The access key for the connection to the schema registry. |
secretKey | string | | The secret key for the connection to the schema registry. |
validateCredentials | bool | true | true / false flag to determine whether the credentials provided should be validated when set. |
accountId | string | | The Id for the AWS account to use. |
If no basicCredentials section is provided for the AWS Glue schema registry, the client used to connect will attempt to find the connection information from the environment. The required credentials can be passed to Gateway in this way as part of core configuration.
Find out more about the setup for this in the AWS documentation.
See more about schema registry here
Property name | Type | Description |
---|---|---|
logicalTopicRegex | string | A regular expression to match all topic names that will be concentrated, specified as a URL parameter |
topicName | string | The name of the underlying topic to concentrate to |
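How the two properties work together can be sketched as a simple lookup. This is an illustration under stated assumptions: the function name is hypothetical, the whole topic name is assumed to have to match the expression, the first matching rule is assumed to win, and Python's `re` syntax stands in for whatever regex dialect Gateway uses.

```python
import re
from typing import Optional


def resolve_physical_topic(topic: str, rules: list) -> Optional[str]:
    """Return the underlying topic for a logical topic, or None if no rule matches."""
    for rule in rules:
        # Assumption: the entire topic name must match the expression.
        if re.fullmatch(rule["logicalTopicRegex"], topic):
            return rule["topicName"]
    return None


rules = [{"logicalTopicRegex": r"concentrated\..*", "topicName": "physical.topic"}]
```

Here `resolve_physical_topic("concentrated.topicA", rules)` resolves to the physical topic, while a name outside the pattern is left unconcentrated.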