Step 6: Consuming Data
Choose your client
For the producer application you already chose a certain client method. Use the selector below to continue the Getting Started with the steps for your preferred client type.
Simple Java Consumer Application, using the latest version of the Axual Java Client: build a Java application yourself that uses the Axual Java Client to consume data from the stream.
Simple .NET Consumer Application, using the latest version of the Axual .NET Client: build a .NET application yourself that uses the Axual .NET Client to consume data from the stream.
Simple REST Consumer, using any tool that can perform REST calls: create REST calls to consume data from the stream.
As of release 2020.3 you can also use the Python client repository.
Creating A Java Consumer Application
When you have completed this step, you will have set up a consumer application that consumes data in Avro format from the stream configured in step 2. To get some data onto the stream, follow step 5: Create A Producer Application.
Creating a consumer application starts similarly to creating a producer application: include the Maven dependency on the Axual client and create the configuration for it:
<dependency>
<groupId>io.axual.client</groupId>
<artifactId>axual-client</artifactId>
<version>5.7.0</version>
</dependency>
This is all you need for a very basic consumer application. The next step is to create the configuration for the AxualClient and for the Consumer that will be used.
ClientConfig config = ClientConfig.newBuilder()
// This is the app ID as it is configured in the self service
.setApplicationId("io.axual.example.client.avro.consumer")
.setApplicationVersion("0.0.1")
// The endpoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL etc...)
.setEndpoint("https://192.168.99.100:443")
// The tenant you are part of
.setTenant("demo")
// The environment corresponding to the shortname of the env in the self service
.setEnvironment("example")
// The ssl configuration for your application. The certificate used should have a DN
// matching the DN that was configured in self service
.setSslConfig(
SslConfig.newBuilder()
.setEnableHostnameVerification(false)
.setKeystoreLocation("/path/to/example-consumer.client.jks") // Absolute path to the application keystore
.setKeystorePassword(new PasswordConfig("notsecret"))
.setKeyPassword(new PasswordConfig("notsecret"))
.setTruststoreLocation("/path/to/common-truststore.jks") // Absolute path to the application truststore
.setTruststorePassword(new PasswordConfig("notsecret"))
.build()
)
.build();
Check your care package for the truststore and keystore files, see also Step 3.
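Since an unreadable keystore or truststore path is a common cause of startup failures, a small pre-flight check can save debugging time. The helper below is purely illustrative, using only the JDK; it is not part of the Axual client API:

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class KeystoreCheck {
    // Fail fast with a clear message if a keystore/truststore file
    // from the care package does not exist or cannot be read.
    static void requireReadable(String path) {
        Path p = Path.of(path);
        if (!Files.isReadable(p)) {
            throw new IllegalArgumentException("Cannot read keystore file: " + path);
        }
    }

    public static void main(String[] args) {
        requireReadable("/path/to/example-consumer.client.jks");
        requireReadable("/path/to/common-truststore.jks");
    }
}
```

Calling this before building the ClientConfig turns a cryptic SSL handshake error into an immediate, readable failure.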
The next step is creating a ConsumerConfig, similar to the ProducerConfig in step 5.
SpecificAvroConsumerConfig<Application, ApplicationLogEvent> specificAvroConsumerConfig =
SpecificAvroConsumerConfig.<Application, ApplicationLogEvent>builder()
// We want to make sure we get all the messages at least once. On a Kafka level, this means
// that the offsets are committed once the messages have been processed by the application.
.setDeliveryStrategy(DeliveryStrategy.AT_LEAST_ONCE)
.setStream("applicationlogevents")
.setProxyChain(ProxyChain.newBuilder()
.append(SWITCHING_PROXY_ID)
.append(RESOLVING_PROXY_ID)
.append(LINEAGE_PROXY_ID)
.append(HEADER_PROXY_ID)
.build())
.build();
A consumer also needs a Processor. This is where the business logic for handling consumed messages is defined. In this example, we use a simple LogEventSpecificConsumer that implements the Processor interface and logs the key and value of each consumed message.
public class LogEventSpecificConsumer implements Processor<Application, ApplicationLogEvent>, AutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(LogEventSpecificConsumer.class);
private final Consumer<Application, ApplicationLogEvent> consumer;
private final LinkedList<ConsumerMessage<Application, ApplicationLogEvent>> received = new LinkedList<>();
public LogEventSpecificConsumer(
        final AxualClient axualClient,
        final SpecificAvroConsumerConfig<Application, ApplicationLogEvent> consumerConfig) {
this.consumer = axualClient.buildConsumer(consumerConfig, this);
// This will start a user thread that does the actual consumption and processing of messages
this.consumer.startConsuming();
}
// This method is used to process the message.
@Override
public void processMessage(ConsumerMessage<Application, ApplicationLogEvent> msg) {
LOG.info("Received message on topic {} partition {} offset {} key {} value {}", msg.getSystem(), msg.getPartition(), msg.getOffset(), msg.getKey(), msg.getValue());
received.add(msg);
}
public LinkedList<ConsumerMessage<Application, ApplicationLogEvent>> getReceived() {
return received;
}
@Override
public void close() {
this.consumer.stopConsuming();
}
public boolean isConsuming() {
return this.consumer.isConsuming();
}
}
The consumer is then started like this:
/* Both the AxualClient and LogEventSpecificConsumer are AutoCloseable and can be used in try-with-resources. */
try (final AxualClient axualClient = new AxualClient(config);
final LogEventSpecificConsumer consumer = new LogEventSpecificConsumer(axualClient, specificAvroConsumerConfig)) {
    // We want to prevent the main thread from exiting the try-with-resources block, since
    // that would close the consumer and axualClient, ending the user thread that performs
    // the consumption. So in this example we make the main thread sleep for a second at a
    // time, as long as the consumer is still consuming.
while (consumer.isConsuming()) {
sleep(1000);
}
}
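If you prefer not to poll isConsuming() in a sleep loop, the main thread can instead block on a CountDownLatch that a JVM shutdown hook releases. This is a plain-JDK sketch of that alternative; the names are illustrative and it is not part of the Axual API:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ShutdownAwait {
    // Block the calling thread until the latch is released or the timeout expires,
    // instead of polling a flag in a sleep loop.
    static boolean awaitShutdown(CountDownLatch latch, long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        // A JVM shutdown hook (triggered e.g. by Ctrl-C) releases the latch.
        Runtime.getRuntime().addShutdownHook(new Thread(shutdownLatch::countDown));
        // Inside the try-with-resources block, this call would replace the
        // while (consumer.isConsuming()) { sleep(1000); } loop:
        // shutdownLatch.await();
    }
}
```

The try-with-resources block then closes the consumer and client cleanly once the latch is released.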
When all of the above steps have been done correctly, start your consumer app. With the logging level for the io.axual package set to INFO, your app will produce logging that will look like this:
[main] INFO io.axual.client.consumer.base.BaseConsumer - Created consumer with source of class: io.axual.client.consumer.avro.AvroMessageSource.
[pool-1-thread-1] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - TTL updated to: 600000 (was: 0)
[pool-1-thread-1] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - Fetched discovery properties: DiscoveryResult: {...}
[pool-1-thread-1] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Received new DiscoveryResult for SwitchingConsumer: DiscoveryResult: {...}
[pool-1-thread-1] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Switching SwitchingConsumer from null to local
[pool-1-thread-1] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Creating new backing consumer with Discovery API result: DiscoveryResult: {...}
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.ConsumerSwitcher - Creating a new consumer with properties: {...}
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: {...}
[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}
[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}
[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}
[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'acl.principal.builder' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'cluster' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'instance' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'resolvingconsumerpartitionassignor.topic.resolver' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'topic.pattern' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'schema.registry.url' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'endpoint' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'resolvingconsumerpartitionassignor.backing.assignor' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'tenant' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'group.id.pattern' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'app.id' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'distributor.distance' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'ttl' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'enable.value.headers' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'app.version' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'rest.proxy.url' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'environment' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'system' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'distributor.timeout' was supplied but isn't a known config.
[pool-1-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.3.0
[pool-1-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: fc1aaa116b661c8a
[pool-1-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1585853689619
[pool-1-thread-1] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Created new backing consumer
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.ConsumerSwitcher - Consumer switched, applying assignments and subscriptions
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.ConsumerSwitcher - Consumer switch finished
[pool-1-thread-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Cluster ID: oA8Db59vRi21SnnMO2-G3g
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.SwitchingConsumer - Subscribing to topics: [applicationlogevents]
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Subscribed to topic(s): demo-Local-example-applicationlogevents
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Discovered group coordinator 192.168.99.100:9096 (id: 2147483646 rack: null)
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Revoking previously assigned partitions []
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] (Re-)joining group
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] (Re-)joining group
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Successfully joined group with generation 5
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Setting newly assigned partitions: demo-Local-example-applicationlogevents-0
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Setting offset for partition demo-Local-example-applicationlogevents-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.99.100:9096 (id: 1 rack: rack-1), epoch=0}}
[pool-1-thread-1] INFO io.axual.client.example.axualclient.avro.LogEventSpecificConsumer - Received message on topic demo-system partition 0 offset 10 key {"name": "app_0", "version": "1.9.9", "owner": "none"} value {"timestamp": 1000, "source": {"name": "logeventproducer", "version": "0.0.1", "owner": "Team Log"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Message 0"}
This is all the coding required to start a basic consumer!
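The Processor callback style used by LogEventSpecificConsumer, where the client pushes each consumed message into your business logic, can be illustrated in isolation with plain JDK types. The following sketch is purely illustrative and has no Axual dependency; the names are not part of the Axual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-alone illustration of the processor-callback pattern: the client hands
// each consumed message to a processor, which applies the business logic
// (here: simply recording it, like LogEventSpecificConsumer does).
public class RecordingProcessor<T> implements Consumer<T> {
    private final List<T> received = new ArrayList<>();

    @Override
    public void accept(T message) {
        // Business logic goes here; this sketch just records the message.
        received.add(message);
    }

    public List<T> getReceived() {
        return received;
    }

    public static void main(String[] args) {
        RecordingProcessor<String> processor = new RecordingProcessor<>();
        processor.accept("Message 0");
        System.out.println(processor.getReceived());
    }
}
```

Keeping the business logic behind a small callback interface like this makes it easy to unit-test without a running Kafka cluster.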
Creating A .NET Consumer Application
When you have completed this step, you will have set up a consumer application that consumes data in Avro format from the stream configured in step 2. To get some data onto the stream, follow step 5: Create A Producer Application.
Adding Dependencies
Start by including the dependency on the Axual .NET client library as you would with any dependency, for example by adding PackageReference tags to your .csproj project file:
<ItemGroup>
<PackageReference Include="Axual.Kafka.Proxy" Version="1.3.0" />
<PackageReference Include="Axual.SchemaRegistry.Serdes.Avro" Version="1.3.0" />
</ItemGroup>
Building The Application
This is all you need for a very basic consumer application. The next step is to create the configuration for the AxualConsumerBuilder.
Consumer Configuration
var config = new AxualConsumerConfig
{
// This is the app ID as it is configured in the self service
ApplicationId = "loganalyzer",
// The endpoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL etc...)
EndPoint = new UriBuilder("https", "192.168.99.100", 443).Uri,
// The tenant you are part of
Tenant = "demo",
// The environment corresponding to the shortname of the env in the self service
Environment = "example",
SecurityProtocol = SecurityProtocol.Ssl,
SslKeystorePassword = "notsecret",
SslKeystoreLocation = SSL_KEYSTORE_LOCATION, (1)
EnableSslCertificateVerification = true,
// Client verifies the identity of the broker
SslCaLocation = SSL_CA_PATH, (2)
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
};
(1) For SSL_KEYSTORE_LOCATION use the absolute path to the keystore file, see also Enabling security.
(2) For SSL_CA_PATH use the absolute path to the CA file, see also Enabling security.
Creating The Consumer
With the above configuration, we can use the AxualConsumerBuilder to build an AxualConsumer instance.
var consumer = new AxualConsumerBuilder<Application, ApplicationLogEvent>(config)
.SetKeyDeserializer(new SpecificAvroDeserializer<Application>())
.SetValueDeserializer(new SpecificAvroDeserializer<ApplicationLogEvent>())
.SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
.SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
    .Build();
Consuming Messages
Now we are ready to start receiving records using the AxualConsumer. The loop below assumes a cancellationToken, for example one wired to Ctrl-C:
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
var cancellationToken = cts.Token;
consumer.Subscribe("applicationlogevents");
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine(
$"> Reached end of stream {consumeResult.Topic}, partition " +
$"{consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
Console.WriteLine(
$"> Received message at {consumeResult.TopicPartitionOffset}: " + Environment.NewLine +
$"Key: {consumeResult.Message.Key}" + Environment.NewLine +
$"Value: {consumeResult.Message.Value}");
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine($"> Commit error: {e.Error.Reason}");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"> Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("> Closing consumer.");
consumer.Close();
}
When all of the above steps have been done correctly, start your consumer app. Your app will produce logging that will look like this:
----------------------------------------------------------------------------------------
'axual_client_proxy_specific_avro_consumer' consuming from stream 'applicationlogevents'
----------------------------------------------------------------------------------------
Started consumer, Ctrl-C to stop consuming
> Received message at applicationlogevents [[1]] @41677:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1000, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 0 = some value 0}, log level = INFO, message = Message 0
> Received message at applicationlogevents [[1]] @41678:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1001, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 1 = some value 1}, log level = INFO, message = Message 1
> Received message at applicationlogevents [[1]] @41679:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1002, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 2 = some value 2}, log level = INFO, message = Message 2
> Received message at applicationlogevents [[1]] @41680:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1003, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 3 = some value 3}, log level = INFO, message = Message 3
> Received message at applicationlogevents [[1]] @41681:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1004, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 4 = some value 4}, log level = INFO, message = Message 4
> Received message at applicationlogevents [[1]] @41682:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1005, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 5 = some value 5}, log level = INFO, message = Message 5
> Received message at applicationlogevents [[1]] @41683:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1006, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 6 = some value 6}, log level = INFO, message = Message 6
> Received message at applicationlogevents [[1]] @41684:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1007, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 7 = some value 7}, log level = INFO, message = Message 7
> Received message at applicationlogevents [[1]] @41685:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1008, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 8 = some value 8}, log level = INFO, message = Message 8
> Received message at applicationlogevents [[1]] @41686:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1009, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 9 = some value 9}, log level = INFO, message = Message 9
This is all the coding required to start a basic consumer!
Creating A REST Consumer
When you have completed this step, you will be consuming data in Avro format from the stream configured in step 2.
Consuming data via REST
For the following steps, we're going to use the curl tool to consume data:
curl --request GET \
--url "https://192.168.99.100:18100/stream/example/applicationlogevents" \
--header "axual-application-id: io.axual.example.client.avro.consumer" \
--header "axual-application-version: 0.0.1" \
--header "axual-consumer-uuid: log-consumer1" \
--header "axual-key-type: AVRO" \
--header "axual-value-type: AVRO" \
--header "axual-commit-strategy: AFTER_READ" \
--header "axual-polling-timeout-ms: 10000" \
--header "Content-Type: application/json" \
--key ../client-cert/local-config/security/applications/example-consumer/pem/example_consumer.key \
--cert ../client-cert/local-config/security/applications/example-consumer/cer/example_consumer.cer \
--cacert ../client-cert/local-config/security/applications/common-truststore/cachain/tenant-root-ca.cert.pem
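If you want to issue the same request from Java instead of curl, the request construction might look like the sketch below (java.net.http, JDK 11+). The class name is illustrative. Note that the TLS client-certificate setup curl performs with --key, --cert and --cacert is omitted here; actually sending the request would additionally require an HttpClient configured with a matching SSLContext:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class RestConsumeRequest {
    // Build (but do not send) the same GET request as the curl command above.
    static HttpRequest build(String baseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/stream/example/applicationlogevents"))
                .header("axual-application-id", "io.axual.example.client.avro.consumer")
                .header("axual-application-version", "0.0.1")
                .header("axual-consumer-uuid", "log-consumer1")
                .header("axual-key-type", "AVRO")
                .header("axual-value-type", "AVRO")
                .header("axual-commit-strategy", "AFTER_READ")
                .header("axual-polling-timeout-ms", "10000")
                .header("Content-Type", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("https://192.168.99.100:18100");
        System.out.println(request.uri());
    }
}
```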
Check your care package for the key, cert and cacert parameter values, see also Step 3.
When you execute the above command, it will return a response that looks like this:
{
"cluster":"local",
"messages":[
{
"messageId":"bf4ea734-4cd5-49e8-8b76-67bd63311d5d",
"produceTimestamp":1585904295792,
"partition":0,
"offset":10,
"produceCluster":"local",
"consumeCluster":"local",
"headers":{
"Axual-Producer-Version":[
"MC4wLjE="
],
"Axual-Message-Id":[
"v06nNEzVSeiLdme9YzEdXQ=="
],
"Axual-Tenant":[
"ZGVtbw=="
],
"Axual-Serialization-Time":[
"AAABcT9DCmk="
],
"Axual-Environment":[
"ZXhhbXBsZQ=="
],
"Axual-Deserialization-Time":[
"AAABcT9DWhQ="
],
"Axual-Intermediate-Version":[
"MC4wLjE="
],
"Axual-System":[
"ZGVtby1zeXN0ZW0="
],
"Axual-Intermediate-Id":[
"aW8uYXh1YWwuZXhhbXBsZS5jbGllbnQuYXZyby5jb25zdW1lcg=="
],
"Axual-Cluster":[
"bG9jYWw="
],
"Axual-Instance":[
"bG9jYWw="
],
"Axual-Producer-Id":[
"aW8uYXh1YWwuZXhhbXBsZS5jbGllbnQuYXZyby5wcm9kdWNlcg=="
]
},
"keyMessage":{
"type":"AVRO",
"schema":null,
"schemaId":null,
"message":"{\"name\": \"app_0\", \"version\": \"1.9.9\", \"owner\": \"none\"}"
},
"valueMessage":{
"type":"AVRO",
"schema":null,
"schemaId":null,
"message":"{\"timestamp\": 1000, \"source\": {\"name\": \"logeventproducer\", \"version\": \"0.0.1\", \"owner\": \"Team Log\"}, \"context\": {\"Some key\": \"Some Value\"}, \"level\": \"INFO\", \"message\": \"Message 0\"}"
}
}
]
}
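Note that the header values in the response are Base64-encoded; for example, the Axual-Tenant value above decodes to the tenant name. A minimal JDK sketch for decoding them (the class name is illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class HeaderDecoder {
    // Response header values such as Axual-Tenant are Base64-encoded strings.
    static String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode("ZGVtbw=="));   // the Axual-Tenant value above
        System.out.println(decode("bG9jYWw="));   // the Axual-Cluster value above
    }
}
```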
For more details, please refer to the Rest-Proxy Consume Service documentation.
Wrapping up
You have concluded the getting started section by preparing your stream & applications, requesting access to the stream and actually producing and consuming some data. If you are going to deploy your application in another environment, it is advised to enable monitoring.
Proceed to Step 7: Enabling Monitoring
You can also use the menu on the left to find information about other platform features that might not have been covered in this Getting Started.