I want to read from the tail of a stream, but when I run the below program, this doesn't appear to be happening.
I suspect I've misunderstood a concept, but would appreciate some clarification on how to correctly read only new data?
use std::error::Error;
use pravega_client::{
client_factory::ClientFactoryAsync,
event::{reader_group::ReaderGroupConfigBuilder, EventWriter},
};
use pravega_client_config::ClientConfigBuilder;
use pravega_client_shared::{
Retention, ScaleType, Scaling, Scope, ScopedStream, Stream, StreamConfiguration,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let config = ClientConfigBuilder::default()
.controller_uri("localhost:9090")
.build()?;
let handle = tokio::runtime::Handle::try_current()?;
let factory = ClientFactoryAsync::new(config, handle);
let scope_name = "tutorial";
let stream_name = "parallel-key-numbers";
let scoped_stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
};
setup_server(&factory, scoped_stream.clone(), 5).await?;
let mut event_writer = factory.create_event_writer(scoped_stream.clone());
write_event(&mut event_writer, "a", "my_key").await?;
write_event(&mut event_writer, "b", "my_key").await?;
write_event(&mut event_writer, "c", "my_key").await?;
write_event(&mut event_writer, "d", "my_key").await?;
write_event(&mut event_writer, "e", "my_key").await?;
write_event(&mut event_writer, "f", "my_key").await?;
event_writer.flush().await?;
let event_reader_group = factory
.create_reader_group_with_config(
scoped_stream.scope.clone(),
"parallel_numbers_reader_group".to_string(),
ReaderGroupConfigBuilder::default()
.read_from_tail_of_stream(scoped_stream.clone())
.build(),
)
.await;
let mut event_reader = event_reader_group
.create_reader("reader_1".to_string())
.await;
while let Some(mut slice) = event_reader.acquire_segment().await? {
while let Some(event) = slice.next() {
let text = String::from_utf8(event.value)?;
println!("{}", text);
}
}
Ok(())
}
async fn setup_server(
factory: &ClientFactoryAsync,
scoped_stream: ScopedStream,
min_num_segments: i32,
) -> Result<(), Box<dyn Error>> {
let controller_client = factory.controller_client();
if !controller_client
.check_scope_exists(&scoped_stream.scope)
.await?
{
controller_client.create_scope(&scoped_stream.scope).await?;
}
if !controller_client
.check_stream_exists(&scoped_stream)
.await?
{
let scaling = Scaling {
scale_type: ScaleType::FixedNumSegments,
target_rate: 0,
scale_factor: 0,
min_num_segments,
};
let retention = Retention {
..Default::default()
};
let stream_config = StreamConfiguration::new(scoped_stream, scaling, retention, None);
controller_client.create_stream(&stream_config).await?;
}
Ok(())
}
async fn write_event(
event_writer: &mut EventWriter,
data: &str,
routing_key: &str,
) -> Result<(), Box<dyn Error>> {
event_writer
.write_event_by_routing_key(routing_key.to_string(), data.as_bytes().to_vec())
.await
.await??;
Ok(())
}
I want to read from the tail of a stream, but when I run the below program, this doesn't appear to be happening.
1st run, there is no output, I'm expecting
a b c d e f2nd run, the output is
a b c d e f3rd run the output is
a b c d e f a b c d e f, I'm expectinga b c d e fI suspect I've misunderstood a concept, but would appreciate some clarification on how to correctly read only new data?