hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
20use crate::nondet::NonDet;
21use crate::staging_util::get_this_crate;
22
23// same as the one in `hydro_std`, but internal use only
24fn track_membership<'a, C, L: Location<'a> + NoTick>(
25    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
26) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
27    membership.fold(
28        q!(|| false),
29        q!(|present, event| {
30            match event {
31                MembershipEvent::Joined => *present = true,
32                MembershipEvent::Left => *present = false,
33            }
34        }),
35    )
36}
37
38fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
39    let root = get_this_crate();
40
41    if is_demux {
42        parse_quote! {
43            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
44                |(id, data)| {
45                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
46                }
47            )
48        }
49    } else {
50        parse_quote! {
51            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
52                |data| {
53                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
54                }
55            )
56        }
57    }
58}
59
60pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
61    serialize_bincode_with_type(is_demux, &quote_type::<T>())
62}
63
64fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
65    let root = get_this_crate();
66
67    if let Some(c_type) = tagged {
68        parse_quote! {
69            |res| {
70                let (id, b) = res.unwrap();
71                (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
72            }
73        }
74    } else {
75        parse_quote! {
76            |res| {
77                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
78            }
79        }
80    }
81}
82
83pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
84    deserialize_bincode_with_type(tagged, &quote_type::<T>())
85}
86
87impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
88    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
89    /// using [`bincode`] to serialize/deserialize messages.
90    ///
91    /// The returned stream captures the elements received at the destination, where values will
92    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
93    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
94    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
95    /// dropped no further messages will be sent.
96    ///
97    /// # Example
98    /// ```rust
99    /// # use hydro_lang::prelude::*;
100    /// # use futures::StreamExt;
101    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
102    /// let p1 = flow.process::<()>();
103    /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
104    /// let p2 = flow.process::<()>();
105    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
106    /// // 1, 2, 3
107    /// # on_p2.send_bincode(&p_out)
108    /// # }, |mut stream| async move {
109    /// # for w in 1..=3 {
110    /// #     assert_eq!(stream.next().await, Some(w));
111    /// # }
112    /// # }));
113    /// ```
114    pub fn send_bincode<L2>(
115        self,
116        other: &Process<'a, L2>,
117    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
118    where
119        T: Serialize + DeserializeOwned,
120    {
121        let serialize_pipeline = Some(serialize_bincode::<T>(false));
122
123        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
124
125        Stream::new(
126            other.clone(),
127            HydroNode::Network {
128                serialize_fn: serialize_pipeline.map(|e| e.into()),
129                instantiate_fn: DebugInstantiate::Building,
130                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
131                input: Box::new(self.ir_node.into_inner()),
132                metadata: other.new_node_metadata(
133                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
134                ),
135            },
136        )
137    }
138
139    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
140    /// using [`bincode`] to serialize/deserialize messages.
141    ///
142    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
143    /// membership information. This is a common pattern in distributed systems for broadcasting data to
144    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
145    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
146    /// each element to all cluster members.
147    ///
148    /// # Non-Determinism
149    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
150    /// to the current cluster members _at that point in time_. Depending on when we are notified of
151    /// membership changes, we will broadcast each element to different members.
152    ///
153    /// # Example
154    /// ```rust
155    /// # use hydro_lang::prelude::*;
156    /// # use futures::StreamExt;
157    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
158    /// let p1 = flow.process::<()>();
159    /// let workers: Cluster<()> = flow.cluster::<()>();
160    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
161    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
162    /// # on_worker.send_bincode(&p2).entries()
163    /// // if there are 4 members in the cluster, each receives one element
164    /// // - MemberId::<()>(0): [123]
165    /// // - MemberId::<()>(1): [123]
166    /// // - MemberId::<()>(2): [123]
167    /// // - MemberId::<()>(3): [123]
168    /// # }, |mut stream| async move {
169    /// # let mut results = Vec::new();
170    /// # for w in 0..4 {
171    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
172    /// # }
173    /// # results.sort();
174    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
175    /// # }));
176    /// ```
177    pub fn broadcast_bincode<L2: 'a>(
178        self,
179        other: &Cluster<'a, L2>,
180        nondet_membership: NonDet,
181    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
182    where
183        T: Clone + Serialize + DeserializeOwned,
184    {
185        let ids = track_membership(self.location.source_cluster_members(other));
186        let join_tick = self.location.tick();
187        let current_members = ids
188            .snapshot(&join_tick, nondet_membership)
189            .filter(q!(|b| *b));
190
191        self.batch(&join_tick, nondet_membership)
192            .repeat_with_keys(current_members)
193            .all_ticks()
194            .demux_bincode(other)
195    }
196
197    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
198    /// serialization. The external process can receive these elements by establishing a TCP
199    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
200    ///
201    /// # Example
202    /// ```rust
203    /// # use hydro_lang::prelude::*;
204    /// # use futures::StreamExt;
205    /// # tokio_test::block_on(async move {
206    /// let flow = FlowBuilder::new();
207    /// let process = flow.process::<()>();
208    /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
209    /// let external = flow.external::<()>();
210    /// let external_handle = numbers.send_bincode_external(&external);
211    ///
212    /// let mut deployment = hydro_deploy::Deployment::new();
213    /// let nodes = flow
214    ///     .with_process(&process, deployment.Localhost())
215    ///     .with_external(&external, deployment.Localhost())
216    ///     .deploy(&mut deployment);
217    ///
218    /// deployment.deploy().await.unwrap();
219    /// // establish the TCP connection
220    /// let mut external_recv_stream = nodes.connect(external_handle).await;
221    /// deployment.start().await.unwrap();
222    ///
223    /// for w in 1..=3 {
224    ///     assert_eq!(external_recv_stream.next().await, Some(w));
225    /// }
226    /// # });
227    /// ```
228    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
229    where
230        T: Serialize + DeserializeOwned,
231    {
232        let serialize_pipeline = Some(serialize_bincode::<T>(false));
233
234        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
235
236        let external_key = flow_state_borrow.next_external_out;
237        flow_state_borrow.next_external_out += 1;
238
239        flow_state_borrow.push_root(HydroRoot::SendExternal {
240            to_external_id: other.id,
241            to_key: external_key,
242            to_many: false,
243            serialize_fn: serialize_pipeline.map(|e| e.into()),
244            instantiate_fn: DebugInstantiate::Building,
245            input: Box::new(self.ir_node.into_inner()),
246            op_metadata: HydroIrOpMetadata::new(),
247        });
248
249        ExternalBincodeStream {
250            process_id: other.id,
251            port_id: external_key,
252            _phantom: PhantomData,
253        }
254    }
255}
256
257impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
258    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
259{
260    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
261    /// using [`bincode`] to serialize/deserialize messages.
262    ///
263    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
264    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
265    /// this API allows precise targeting of specific cluster members rather than broadcasting to
266    /// all members.
267    ///
268    /// # Example
269    /// ```rust
270    /// # use hydro_lang::prelude::*;
271    /// # use futures::StreamExt;
272    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
273    /// let p1 = flow.process::<()>();
274    /// let workers: Cluster<()> = flow.cluster::<()>();
275    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
276    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
277    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
278    ///     .demux_bincode(&workers);
279    /// # on_worker.send_bincode(&p2).entries()
280    /// // if there are 4 members in the cluster, each receives one element
281    /// // - MemberId::<()>(0): [0]
282    /// // - MemberId::<()>(1): [1]
283    /// // - MemberId::<()>(2): [2]
284    /// // - MemberId::<()>(3): [3]
285    /// # }, |mut stream| async move {
286    /// # let mut results = Vec::new();
287    /// # for w in 0..4 {
288    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
289    /// # }
290    /// # results.sort();
291    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
292    /// # }));
293    /// ```
294    pub fn demux_bincode(
295        self,
296        other: &Cluster<'a, L2>,
297    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
298    where
299        T: Serialize + DeserializeOwned,
300    {
301        self.into_keyed().demux_bincode(other)
302    }
303}
304
305impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
306    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
307    /// [`bincode`] to serialize/deserialize messages.
308    ///
309    /// This provides load balancing by evenly distributing work across cluster members. The
310    /// distribution is deterministic based on element order - the first element goes to member 0,
311    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
312    ///
313    /// # Non-Determinism
314    /// The set of cluster members may asynchronously change over time. Each element is distributed
315    /// based on the current cluster membership _at that point in time_. Depending on when cluster
316    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
317    /// membership is stable, the order of members in the round-robin pattern may change across runs.
318    ///
319    /// # Ordering Requirements
320    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
321    /// order of messages and retries affects the round-robin pattern.
322    ///
323    /// # Example
324    /// ```rust
325    /// # use hydro_lang::prelude::*;
326    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
327    /// # use futures::StreamExt;
328    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
329    /// let p1 = flow.process::<()>();
330    /// let workers: Cluster<()> = flow.cluster::<()>();
331    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
332    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
333    /// on_worker.send_bincode(&p2)
334    /// # .first().values() // we use first to assert that each member gets one element
335    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
336    /// // - MemberId::<()>(?): [1]
337    /// // - MemberId::<()>(?): [2]
338    /// // - MemberId::<()>(?): [3]
339    /// // - MemberId::<()>(?): [4]
340    /// # }, |mut stream| async move {
341    /// # let mut results = Vec::new();
342    /// # for w in 0..4 {
343    /// #     results.push(stream.next().await.unwrap());
344    /// # }
345    /// # results.sort();
346    /// # assert_eq!(results, vec![1, 2, 3, 4]);
347    /// # }));
348    /// ```
349    pub fn round_robin_bincode<L2: 'a>(
350        self,
351        other: &Cluster<'a, L2>,
352        nondet_membership: NonDet,
353    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
354    where
355        T: Serialize + DeserializeOwned,
356    {
357        let ids = track_membership(self.location.source_cluster_members(other));
358        let join_tick = self.location.tick();
359        let current_members = ids
360            .snapshot(&join_tick, nondet_membership)
361            .filter(q!(|b| *b))
362            .keys()
363            .assume_ordering(nondet_membership)
364            .collect_vec();
365
366        self.enumerate()
367            .batch(&join_tick, nondet_membership)
368            .cross_singleton(current_members)
369            .map(q!(|(data, members)| (
370                members[data.0 % members.len()],
371                data.1
372            )))
373            .all_ticks()
374            .demux_bincode(other)
375    }
376}
377
378impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
379    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
380    /// using [`bincode`] to serialize/deserialize messages.
381    ///
382    /// Each cluster member sends its local stream elements, and they are collected at the destination
383    /// as a [`KeyedStream`] where keys identify the source cluster member.
384    ///
385    /// # Example
386    /// ```rust
387    /// # use hydro_lang::prelude::*;
388    /// # use futures::StreamExt;
389    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
390    /// let workers: Cluster<()> = flow.cluster::<()>();
391    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
392    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
393    /// # all_received.entries()
394    /// # }, |mut stream| async move {
395    /// // if there are 4 members in the cluster, we should receive 4 elements
396    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
397    /// # let mut results = Vec::new();
398    /// # for w in 0..4 {
399    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
400    /// # }
401    /// # results.sort();
402    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
403    /// # }));
404    /// ```
405    ///
406    /// If you don't need to know the source for each element, you can use `.values()`
407    /// to get just the data:
408    /// ```rust
409    /// # use hydro_lang::prelude::*;
410    /// # use hydro_lang::live_collections::stream::NoOrder;
411    /// # use futures::StreamExt;
412    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
413    /// # let workers: Cluster<()> = flow.cluster::<()>();
414    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
415    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
416    /// # values
417    /// # }, |mut stream| async move {
418    /// # let mut results = Vec::new();
419    /// # for w in 0..4 {
420    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
421    /// # }
422    /// # results.sort();
423    /// // if there are 4 members in the cluster, we should receive 4 elements
424    /// // 1, 1, 1, 1
425    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
426    /// # }));
427    /// ```
428    pub fn send_bincode<L2>(
429        self,
430        other: &Process<'a, L2>,
431    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
432    where
433        T: Serialize + DeserializeOwned,
434    {
435        let serialize_pipeline = Some(serialize_bincode::<T>(false));
436
437        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
438
439        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
440            other.clone(),
441            HydroNode::Network {
442                serialize_fn: serialize_pipeline.map(|e| e.into()),
443                instantiate_fn: DebugInstantiate::Building,
444                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
445                input: Box::new(self.ir_node.into_inner()),
446                metadata: other.new_node_metadata(Stream::<
447                    (MemberId<L>, T),
448                    Process<'a, L2>,
449                    Unbounded,
450                    O,
451                    R,
452                >::collection_kind()),
453            },
454        );
455
456        raw_stream.into_keyed()
457    }
458
459    /// Broadcasts elements of this stream at each source member to all members of a destination
460    /// cluster, using [`bincode`] to serialize/deserialize messages.
461    ///
462    /// Each source member sends each of its stream elements to **every** member of the cluster
463    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
464    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
465    /// **only data elements** and sends each element to all cluster members.
466    ///
467    /// # Non-Determinism
468    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
469    /// to the current cluster members known _at that point in time_ at the source member. Depending
470    /// on when each source member is notified of membership changes, it will broadcast each element
471    /// to different members.
472    ///
473    /// # Example
474    /// ```rust
475    /// # use hydro_lang::prelude::*;
476    /// # use hydro_lang::location::MemberId;
477    /// # use futures::StreamExt;
478    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
479    /// # type Source = ();
480    /// # type Destination = ();
481    /// let source: Cluster<Source> = flow.cluster::<Source>();
482    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
483    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
484    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
485    /// # on_destination.entries().send_bincode(&p2).entries()
486    /// // if there are 4 members in the desination, each receives one element from each source member
487    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
488    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
489    /// // - ...
490    /// # }, |mut stream| async move {
491    /// # let mut results = Vec::new();
492    /// # for w in 0..16 {
493    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
494    /// # }
495    /// # results.sort();
496    /// # assert_eq!(results, vec![
497    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
498    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
499    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
500    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
501    /// # ]);
502    /// # }));
503    /// ```
504    pub fn broadcast_bincode<L2: 'a>(
505        self,
506        other: &Cluster<'a, L2>,
507        nondet_membership: NonDet,
508    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
509    where
510        T: Clone + Serialize + DeserializeOwned,
511    {
512        let ids = track_membership(self.location.source_cluster_members(other));
513        let join_tick = self.location.tick();
514        let current_members = ids
515            .snapshot(&join_tick, nondet_membership)
516            .filter(q!(|b| *b));
517
518        self.batch(&join_tick, nondet_membership)
519            .repeat_with_keys(current_members)
520            .all_ticks()
521            .demux_bincode(other)
522    }
523}
524
525impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
526    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
527{
528    /// Sends elements of this stream at each source member to specific members of a destination
529    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
530    ///
531    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
532    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
533    /// this API allows precise targeting of specific cluster members rather than broadcasting to
534    /// all members.
535    ///
536    /// Each cluster member sends its local stream elements, and they are collected at each
537    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
538    ///
539    /// # Example
540    /// ```rust
541    /// # use hydro_lang::prelude::*;
542    /// # use futures::StreamExt;
543    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
544    /// # type Source = ();
545    /// # type Destination = ();
546    /// let source: Cluster<Source> = flow.cluster::<Source>();
547    /// let to_send: Stream<_, Cluster<_>, _> = source
548    ///     .source_iter(q!(vec![0, 1, 2, 3]))
549    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
550    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
551    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
552    /// # all_received.entries().send_bincode(&p2).entries()
553    /// # }, |mut stream| async move {
554    /// // if there are 4 members in the destination cluster, each receives one message from each source member
555    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
556    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
557    /// // - ...
558    /// # let mut results = Vec::new();
559    /// # for w in 0..16 {
560    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
561    /// # }
562    /// # results.sort();
563    /// # assert_eq!(results, vec![
564    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
565    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
566    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
567    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
568    /// # ]);
569    /// # }));
570    /// ```
571    pub fn demux_bincode(
572        self,
573        other: &Cluster<'a, L2>,
574    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
575    where
576        T: Serialize + DeserializeOwned,
577    {
578        self.into_keyed().demux_bincode(other)
579    }
580}
581
582#[cfg(test)]
583mod tests {
584    use stageleft::q;
585
586    use crate::location::{Location, MemberId};
587    use crate::nondet::nondet;
588    use crate::prelude::FlowBuilder;
589
590    #[test]
591    fn sim_send_bincode_o2o() {
592        let flow = FlowBuilder::new();
593        let external = flow.external::<()>();
594        let node = flow.process::<()>();
595        let node2 = flow.process::<()>();
596
597        let (port, input) = node.source_external_bincode(&external);
598
599        let out_port = input
600            .send_bincode(&node2)
601            .batch(&node2.tick(), nondet!(/** test */))
602            .count()
603            .all_ticks()
604            .send_bincode_external(&external);
605
606        let instances = flow.sim().exhaustive(async |mut compiled| {
607            let in_send = compiled.connect(&port);
608            let out_recv = compiled.connect(&out_port);
609            compiled.launch();
610
611            in_send.send(()).unwrap();
612            in_send.send(()).unwrap();
613            in_send.send(()).unwrap();
614
615            let received = out_recv.collect::<Vec<_>>().await;
616            assert!(received.into_iter().sum::<usize>() == 3);
617        });
618
619        assert_eq!(instances, 4); // 2^{3 - 1}
620    }
621
622    #[test]
623    fn sim_send_bincode_m2o() {
624        let flow = FlowBuilder::new();
625        let external = flow.external::<()>();
626        let cluster = flow.cluster::<()>();
627        let node = flow.process::<()>();
628
629        let input = cluster.source_iter(q!(vec![1]));
630
631        let out_port = input
632            .send_bincode(&node)
633            .entries()
634            .batch(&node.tick(), nondet!(/** test */))
635            .all_ticks()
636            .send_bincode_external(&external);
637
638        let instances =
639            flow.sim()
640                .with_cluster_size(&cluster, 4)
641                .exhaustive(async |mut compiled| {
642                    let out_recv = compiled.connect(&out_port);
643                    compiled.launch();
644
645                    out_recv
646                        .assert_yields_only_unordered(vec![
647                            (MemberId::from_raw(0), 1),
648                            (MemberId::from_raw(1), 1),
649                            (MemberId::from_raw(2), 1),
650                            (MemberId::from_raw(3), 1),
651                        ])
652                        .await
653                });
654
655        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
656    }
657
658    #[test]
659    fn sim_send_bincode_multiple_m2o() {
660        let flow = FlowBuilder::new();
661        let external = flow.external::<()>();
662        let cluster1 = flow.cluster::<()>();
663        let cluster2 = flow.cluster::<()>();
664        let node = flow.process::<()>();
665
666        let out_port_1 = cluster1
667            .source_iter(q!(vec![1]))
668            .send_bincode(&node)
669            .entries()
670            .send_bincode_external(&external);
671
672        let out_port_2 = cluster2
673            .source_iter(q!(vec![2]))
674            .send_bincode(&node)
675            .entries()
676            .send_bincode_external(&external);
677
678        let instances = flow
679            .sim()
680            .with_cluster_size(&cluster1, 3)
681            .with_cluster_size(&cluster2, 4)
682            .exhaustive(async |mut compiled| {
683                let out_recv_1 = compiled.connect(&out_port_1);
684                let out_recv_2 = compiled.connect(&out_port_2);
685                compiled.launch();
686
687                out_recv_1
688                    .assert_yields_only_unordered(vec![
689                        (MemberId::from_raw(0), 1),
690                        (MemberId::from_raw(1), 1),
691                        (MemberId::from_raw(2), 1),
692                    ])
693                    .await;
694
695                out_recv_2
696                    .assert_yields_only_unordered(vec![
697                        (MemberId::from_raw(0), 2),
698                        (MemberId::from_raw(1), 2),
699                        (MemberId::from_raw(2), 2),
700                        (MemberId::from_raw(3), 2),
701                    ])
702                    .await;
703            });
704
705        assert_eq!(instances, 1);
706    }
707
708    #[test]
709    fn sim_send_bincode_o2m() {
710        let flow = FlowBuilder::new();
711        let external = flow.external::<()>();
712        let cluster = flow.cluster::<()>();
713        let node = flow.process::<()>();
714
715        let input = node.source_iter(q!(vec![
716            (MemberId::from_raw(0), 123),
717            (MemberId::from_raw(1), 456),
718        ]));
719
720        let out_port = input
721            .demux_bincode(&cluster)
722            .map(q!(|x| x + 1))
723            .send_bincode(&node)
724            .entries()
725            .send_bincode_external(&external);
726
727        flow.sim()
728            .with_cluster_size(&cluster, 4)
729            .exhaustive(async |mut compiled| {
730                let out_recv = compiled.connect(&out_port);
731                compiled.launch();
732
733                out_recv
734                    .assert_yields_only_unordered(vec![
735                        (MemberId::from_raw(0), 124),
736                        (MemberId::from_raw(1), 457),
737                    ])
738                    .await
739            });
740    }
741
742    #[test]
743    fn sim_broadcast_bincode_o2m() {
744        let flow = FlowBuilder::new();
745        let external = flow.external::<()>();
746        let cluster = flow.cluster::<()>();
747        let node = flow.process::<()>();
748
749        let input = node.source_iter(q!(vec![123, 456]));
750
751        let out_port = input
752            .broadcast_bincode(&cluster, nondet!(/** test */))
753            .map(q!(|x| x + 1))
754            .send_bincode(&node)
755            .entries()
756            .send_bincode_external(&external);
757
758        let mut c_1_produced = false;
759        let mut c_2_produced = false;
760
761        flow.sim()
762            .with_cluster_size(&cluster, 2)
763            .exhaustive(async |mut compiled| {
764                let out_recv = compiled.connect(&out_port);
765                compiled.launch();
766
767                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
768
769                // check that order is preserved
770                if all_out.contains(&(MemberId::from_raw(0), 124)) {
771                    assert!(all_out.contains(&(MemberId::from_raw(0), 457)));
772                    c_1_produced = true;
773                }
774
775                if all_out.contains(&(MemberId::from_raw(1), 124)) {
776                    assert!(all_out.contains(&(MemberId::from_raw(1), 457)));
777                    c_2_produced = true;
778                }
779            });
780
781        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
782    }
783
784    #[test]
785    fn sim_send_bincode_m2m() {
786        let flow = FlowBuilder::new();
787        let external = flow.external::<()>();
788        let cluster = flow.cluster::<()>();
789        let node = flow.process::<()>();
790
791        let input = node.source_iter(q!(vec![
792            (MemberId::from_raw(0), 123),
793            (MemberId::from_raw(1), 456),
794        ]));
795
796        let out_port = input
797            .demux_bincode(&cluster)
798            .map(q!(|x| x + 1))
799            .flat_map_ordered(q!(|x| vec![
800                (MemberId::from_raw(0), x),
801                (MemberId::from_raw(1), x),
802            ]))
803            .demux_bincode(&cluster)
804            .entries()
805            .send_bincode(&node)
806            .entries()
807            .send_bincode_external(&external);
808
809        flow.sim()
810            .with_cluster_size(&cluster, 4)
811            .exhaustive(async |mut compiled| {
812                let out_recv = compiled.connect(&out_port);
813                compiled.launch();
814
815                out_recv
816                    .assert_yields_only_unordered(vec![
817                        (MemberId::from_raw(0), (MemberId::from_raw(0), 124)),
818                        (MemberId::from_raw(0), (MemberId::from_raw(1), 457)),
819                        (MemberId::from_raw(1), (MemberId::from_raw(0), 124)),
820                        (MemberId::from_raw(1), (MemberId::from_raw(1), 457)),
821                    ])
822                    .await
823            });
824    }
825}