1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
32#[derive(Clone, Hash)]
36pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39 fn from(expr: syn::Expr) -> Self {
40 Self(Box::new(expr))
41 }
42}
43
44impl Deref for DebugExpr {
45 type Target = syn::Expr;
46
47 fn deref(&self) -> &Self::Target {
48 &self.0
49 }
50}
51
52impl ToTokens for DebugExpr {
53 fn to_tokens(&self, tokens: &mut TokenStream) {
54 self.0.to_tokens(tokens);
55 }
56}
57
58impl Debug for DebugExpr {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 write!(f, "{}", self.0.to_token_stream())
61 }
62}
63
64impl Display for DebugExpr {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 let original = self.0.as_ref().clone();
67 let simplified = simplify_q_macro(original);
68
69 write!(f, "q!({})", quote::quote!(#simplified))
72 }
73}
74
75fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77 let mut simplifier = QMacroSimplifier::new();
80 simplifier.visit_expr_mut(&mut expr);
81
82 if let Some(simplified) = simplifier.simplified_result {
84 simplified
85 } else {
86 expr
87 }
88}
89
90#[derive(Default)]
92pub struct QMacroSimplifier {
93 pub simplified_result: Option<syn::Expr>,
94}
95
96impl QMacroSimplifier {
97 pub fn new() -> Self {
98 Self::default()
99 }
100}
101
102impl VisitMut for QMacroSimplifier {
103 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104 if self.simplified_result.is_some() {
106 return;
107 }
108
109 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110 && self.is_stageleft_runtime_support_call(&path_expr.path)
112 && let Some(closure) = self.extract_closure_from_args(&call.args)
114 {
115 self.simplified_result = Some(closure);
116 return;
117 }
118
119 syn::visit_mut::visit_expr_mut(self, expr);
122 }
123}
124
125impl QMacroSimplifier {
126 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127 if let Some(last_segment) = path.segments.last() {
129 let fn_name = last_segment.ident.to_string();
130 fn_name.contains("_type_hint")
132 && path.segments.len() > 2
133 && path.segments[0].ident == "stageleft"
134 && path.segments[1].ident == "runtime_support"
135 } else {
136 false
137 }
138 }
139
140 fn extract_closure_from_args(
141 &self,
142 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143 ) -> Option<syn::Expr> {
144 for arg in args {
146 if let syn::Expr::Closure(_) = arg {
147 return Some(arg.clone());
148 }
149 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151 return Some(closure_expr);
152 }
153 }
154 None
155 }
156
157 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158 let mut visitor = ClosureFinder {
159 found_closure: None,
160 prefer_inner_blocks: true,
161 };
162 visitor.visit_expr(expr);
163 visitor.found_closure
164 }
165}
166
167struct ClosureFinder {
169 found_closure: Option<syn::Expr>,
170 prefer_inner_blocks: bool,
171}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175 if self.found_closure.is_some() {
177 return;
178 }
179
180 match expr {
181 syn::Expr::Closure(_) => {
182 self.found_closure = Some(expr.clone());
183 }
184 syn::Expr::Block(block) if self.prefer_inner_blocks => {
185 for stmt in &block.block.stmts {
187 if let syn::Stmt::Expr(stmt_expr, _) = stmt
188 && let syn::Expr::Block(_) = stmt_expr
189 {
190 let mut inner_visitor = ClosureFinder {
192 found_closure: None,
193 prefer_inner_blocks: false, };
195 inner_visitor.visit_expr(stmt_expr);
196 if inner_visitor.found_closure.is_some() {
197 self.found_closure = Some(stmt_expr.clone());
199 return;
200 }
201 }
202 }
203
204 visit::visit_expr(self, expr);
206
207 if self.found_closure.is_some() {
210 }
212 }
213 _ => {
214 visit::visit_expr(self, expr);
216 }
217 }
218 }
219}
220
221#[derive(Clone, PartialEq, Eq, Hash)]
225pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228 fn from(t: syn::Type) -> Self {
229 Self(Box::new(t))
230 }
231}
232
233impl Deref for DebugType {
234 type Target = syn::Type;
235
236 fn deref(&self) -> &Self::Target {
237 &self.0
238 }
239}
240
241impl ToTokens for DebugType {
242 fn to_tokens(&self, tokens: &mut TokenStream) {
243 self.0.to_tokens(tokens);
244 }
245}
246
247impl Debug for DebugType {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 write!(f, "{}", self.0.to_token_stream())
250 }
251}
252
253pub enum DebugInstantiate {
254 Building,
255 Finalized(Box<DebugInstantiateFinalized>),
256}
257
258#[cfg_attr(
259 not(feature = "build"),
260 expect(
261 dead_code,
262 reason = "sink, source unused without `feature = \"build\"`."
263 )
264)]
265pub struct DebugInstantiateFinalized {
266 sink: syn::Expr,
267 source: syn::Expr,
268 connect_fn: Option<Box<dyn FnOnce()>>,
269}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272 fn from(f: DebugInstantiateFinalized) -> Self {
273 Self::Finalized(Box::new(f))
274 }
275}
276
277impl Debug for DebugInstantiate {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 write!(f, "<network instantiate>")
280 }
281}
282
283impl Hash for DebugInstantiate {
284 fn hash<H: Hasher>(&self, _state: &mut H) {
285 }
287}
288
289impl Clone for DebugInstantiate {
290 fn clone(&self) -> Self {
291 match self {
292 DebugInstantiate::Building => DebugInstantiate::Building,
293 DebugInstantiate::Finalized(_) => {
294 panic!("DebugInstantiate::Finalized should not be cloned")
295 }
296 }
297 }
298}
299
300#[derive(Debug, Hash, Clone)]
302pub enum HydroSource {
303 Stream(DebugExpr),
304 ExternalNetwork(),
305 Iter(DebugExpr),
306 Spin(),
307}
308
309#[cfg(feature = "build")]
310pub trait DfirBuilder {
316 fn singleton_intermediates(&self) -> bool;
318
319 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
321
322 fn batch(
323 &mut self,
324 in_ident: syn::Ident,
325 in_location: &LocationId,
326 in_kind: &CollectionKind,
327 out_ident: &syn::Ident,
328 out_location: &LocationId,
329 op_meta: &HydroIrOpMetadata,
330 );
331 fn yield_from_tick(
332 &mut self,
333 in_ident: syn::Ident,
334 in_location: &LocationId,
335 in_kind: &CollectionKind,
336 out_ident: &syn::Ident,
337 out_location: &LocationId,
338 );
339
340 fn begin_atomic(
341 &mut self,
342 in_ident: syn::Ident,
343 in_location: &LocationId,
344 in_kind: &CollectionKind,
345 out_ident: &syn::Ident,
346 out_location: &LocationId,
347 op_meta: &HydroIrOpMetadata,
348 );
349 fn end_atomic(
350 &mut self,
351 in_ident: syn::Ident,
352 in_location: &LocationId,
353 in_kind: &CollectionKind,
354 out_ident: &syn::Ident,
355 );
356
357 fn observe_nondet(
358 &mut self,
359 trusted: bool,
360 location: &LocationId,
361 in_ident: syn::Ident,
362 in_kind: &CollectionKind,
363 out_ident: &syn::Ident,
364 out_kind: &CollectionKind,
365 );
366
367 #[expect(clippy::too_many_arguments, reason = "TODO")]
368 fn create_network(
369 &mut self,
370 from: &LocationId,
371 to: &LocationId,
372 input_ident: syn::Ident,
373 out_ident: &syn::Ident,
374 serialize: &Option<DebugExpr>,
375 sink: syn::Expr,
376 source: syn::Expr,
377 deserialize: &Option<DebugExpr>,
378 tag_id: usize,
379 );
380
381 fn create_external_source(
382 &mut self,
383 on: &LocationId,
384 source_expr: syn::Expr,
385 out_ident: &syn::Ident,
386 deserialize: &Option<DebugExpr>,
387 tag_id: usize,
388 );
389
390 fn create_external_output(
391 &mut self,
392 on: &LocationId,
393 sink_expr: syn::Expr,
394 input_ident: &syn::Ident,
395 serialize: &Option<DebugExpr>,
396 tag_id: usize,
397 );
398}
399
400#[cfg(feature = "build")]
401impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
402 fn singleton_intermediates(&self) -> bool {
403 false
404 }
405
406 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
407 self.entry(location.root().raw_id()).or_default()
408 }
409
410 fn batch(
411 &mut self,
412 in_ident: syn::Ident,
413 in_location: &LocationId,
414 _in_kind: &CollectionKind,
415 out_ident: &syn::Ident,
416 _out_location: &LocationId,
417 _op_meta: &HydroIrOpMetadata,
418 ) {
419 let builder = self.get_dfir_mut(in_location.root());
420 builder.add_dfir(
421 parse_quote! {
422 #out_ident = #in_ident;
423 },
424 None,
425 None,
426 );
427 }
428
429 fn yield_from_tick(
430 &mut self,
431 in_ident: syn::Ident,
432 in_location: &LocationId,
433 _in_kind: &CollectionKind,
434 out_ident: &syn::Ident,
435 _out_location: &LocationId,
436 ) {
437 let builder = self.get_dfir_mut(in_location.root());
438 builder.add_dfir(
439 parse_quote! {
440 #out_ident = #in_ident;
441 },
442 None,
443 None,
444 );
445 }
446
447 fn begin_atomic(
448 &mut self,
449 in_ident: syn::Ident,
450 in_location: &LocationId,
451 _in_kind: &CollectionKind,
452 out_ident: &syn::Ident,
453 _out_location: &LocationId,
454 _op_meta: &HydroIrOpMetadata,
455 ) {
456 let builder = self.get_dfir_mut(in_location.root());
457 builder.add_dfir(
458 parse_quote! {
459 #out_ident = #in_ident;
460 },
461 None,
462 None,
463 );
464 }
465
466 fn end_atomic(
467 &mut self,
468 in_ident: syn::Ident,
469 in_location: &LocationId,
470 _in_kind: &CollectionKind,
471 out_ident: &syn::Ident,
472 ) {
473 let builder = self.get_dfir_mut(in_location.root());
474 builder.add_dfir(
475 parse_quote! {
476 #out_ident = #in_ident;
477 },
478 None,
479 None,
480 );
481 }
482
483 fn observe_nondet(
484 &mut self,
485 _trusted: bool,
486 location: &LocationId,
487 in_ident: syn::Ident,
488 _in_kind: &CollectionKind,
489 out_ident: &syn::Ident,
490 _out_kind: &CollectionKind,
491 ) {
492 let builder = self.get_dfir_mut(location);
493 builder.add_dfir(
494 parse_quote! {
495 #out_ident = #in_ident;
496 },
497 None,
498 None,
499 );
500 }
501
502 fn create_network(
503 &mut self,
504 from: &LocationId,
505 to: &LocationId,
506 input_ident: syn::Ident,
507 out_ident: &syn::Ident,
508 serialize: &Option<DebugExpr>,
509 sink: syn::Expr,
510 source: syn::Expr,
511 deserialize: &Option<DebugExpr>,
512 tag_id: usize,
513 ) {
514 let sender_builder = self.get_dfir_mut(from);
515 if let Some(serialize_pipeline) = serialize {
516 sender_builder.add_dfir(
517 parse_quote! {
518 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
519 },
520 None,
521 Some(&format!("send{}", tag_id)),
523 );
524 } else {
525 sender_builder.add_dfir(
526 parse_quote! {
527 #input_ident -> dest_sink(#sink);
528 },
529 None,
530 Some(&format!("send{}", tag_id)),
531 );
532 }
533
534 let receiver_builder = self.get_dfir_mut(to);
535 if let Some(deserialize_pipeline) = deserialize {
536 receiver_builder.add_dfir(
537 parse_quote! {
538 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
539 },
540 None,
541 Some(&format!("recv{}", tag_id)),
542 );
543 } else {
544 receiver_builder.add_dfir(
545 parse_quote! {
546 #out_ident = source_stream(#source);
547 },
548 None,
549 Some(&format!("recv{}", tag_id)),
550 );
551 }
552 }
553
554 fn create_external_source(
555 &mut self,
556 on: &LocationId,
557 source_expr: syn::Expr,
558 out_ident: &syn::Ident,
559 deserialize: &Option<DebugExpr>,
560 tag_id: usize,
561 ) {
562 let receiver_builder = self.get_dfir_mut(on);
563 if let Some(deserialize_pipeline) = deserialize {
564 receiver_builder.add_dfir(
565 parse_quote! {
566 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
567 },
568 None,
569 Some(&format!("recv{}", tag_id)),
570 );
571 } else {
572 receiver_builder.add_dfir(
573 parse_quote! {
574 #out_ident = source_stream(#source_expr);
575 },
576 None,
577 Some(&format!("recv{}", tag_id)),
578 );
579 }
580 }
581
582 fn create_external_output(
583 &mut self,
584 on: &LocationId,
585 sink_expr: syn::Expr,
586 input_ident: &syn::Ident,
587 serialize: &Option<DebugExpr>,
588 tag_id: usize,
589 ) {
590 let sender_builder = self.get_dfir_mut(on);
591 if let Some(serialize_fn) = serialize {
592 sender_builder.add_dfir(
593 parse_quote! {
594 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
595 },
596 None,
597 Some(&format!("send{}", tag_id)),
599 );
600 } else {
601 sender_builder.add_dfir(
602 parse_quote! {
603 #input_ident -> dest_sink(#sink_expr);
604 },
605 None,
606 Some(&format!("send{}", tag_id)),
607 );
608 }
609 }
610}
611
612#[cfg(feature = "build")]
613pub enum BuildersOrCallback<'a, L, N>
614where
615 L: FnMut(&mut HydroRoot, &mut usize),
616 N: FnMut(&mut HydroNode, &mut usize),
617{
618 Builders(&'a mut dyn DfirBuilder),
619 Callback(L, N),
620}
621
622#[derive(Debug, Hash)]
626pub enum HydroRoot {
627 ForEach {
628 f: DebugExpr,
629 input: Box<HydroNode>,
630 op_metadata: HydroIrOpMetadata,
631 },
632 SendExternal {
633 to_external_id: usize,
634 to_key: usize,
635 to_many: bool,
636 serialize_fn: Option<DebugExpr>,
637 instantiate_fn: DebugInstantiate,
638 input: Box<HydroNode>,
639 op_metadata: HydroIrOpMetadata,
640 },
641 DestSink {
642 sink: DebugExpr,
643 input: Box<HydroNode>,
644 op_metadata: HydroIrOpMetadata,
645 },
646 CycleSink {
647 ident: syn::Ident,
648 input: Box<HydroNode>,
649 op_metadata: HydroIrOpMetadata,
650 },
651}
652
653impl HydroRoot {
654 #[cfg(feature = "build")]
655 pub fn compile_network<'a, D>(
656 &mut self,
657 compile_env: &D::CompileEnv,
658 extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
659 seen_tees: &mut SeenTees,
660 processes: &HashMap<usize, D::Process>,
661 clusters: &HashMap<usize, D::Cluster>,
662 externals: &HashMap<usize, D::External>,
663 ) where
664 D: Deploy<'a>,
665 {
666 self.transform_bottom_up(
667 &mut |l| {
668 if let HydroRoot::SendExternal {
669 input,
670 to_external_id,
671 to_key,
672 to_many,
673 instantiate_fn,
674 ..
675 } = l
676 {
677 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
678 DebugInstantiate::Building => {
679 let to_node = externals
680 .get(to_external_id)
681 .unwrap_or_else(|| {
682 panic!("A external used in the graph was not instantiated: {}", to_external_id)
683 })
684 .clone();
685
686 match input.metadata().location_kind.root() {
687 LocationId::Process(process_id) => {
688 if *to_many {
689 (
690 (
691 D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
692 parse_quote!(DUMMY),
693 ),
694 Box::new(|| {}) as Box<dyn FnOnce()>,
695 )
696 } else {
697 let from_node = processes
698 .get(process_id)
699 .unwrap_or_else(|| {
700 panic!("A process used in the graph was not instantiated: {}", process_id)
701 })
702 .clone();
703
704 let sink_port = D::allocate_process_port(&from_node);
705 let source_port = D::allocate_external_port(&to_node);
706
707 to_node.register(*to_key, source_port.clone());
708
709 (
710 (
711 D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
712 parse_quote!(DUMMY),
713 ),
714 D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
715 )
716 }
717 }
718 LocationId::Cluster(_) => todo!(),
719 _ => panic!()
720 }
721 },
722
723 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
724 };
725
726 *instantiate_fn = DebugInstantiateFinalized {
727 sink: sink_expr,
728 source: source_expr,
729 connect_fn: Some(connect_fn),
730 }
731 .into();
732 }
733 },
734 &mut |n| {
735 if let HydroNode::Network {
736 input,
737 instantiate_fn,
738 metadata,
739 ..
740 } = n
741 {
742 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
743 DebugInstantiate::Building => instantiate_network::<D>(
744 input.metadata().location_kind.root(),
745 metadata.location_kind.root(),
746 processes,
747 clusters,
748 compile_env,
749 ),
750
751 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
752 };
753
754 *instantiate_fn = DebugInstantiateFinalized {
755 sink: sink_expr,
756 source: source_expr,
757 connect_fn: Some(connect_fn),
758 }
759 .into();
760 } else if let HydroNode::ExternalInput {
761 from_external_id,
762 from_key,
763 from_many,
764 codec_type,
765 port_hint,
766 instantiate_fn,
767 metadata,
768 ..
769 } = n
770 {
771 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
772 DebugInstantiate::Building => {
773 let from_node = externals
774 .get(from_external_id)
775 .unwrap_or_else(|| {
776 panic!(
777 "A external used in the graph was not instantiated: {}",
778 from_external_id
779 )
780 })
781 .clone();
782
783 match metadata.location_kind.root() {
784 LocationId::Process(process_id) => {
785 let to_node = processes
786 .get(process_id)
787 .unwrap_or_else(|| {
788 panic!("A process used in the graph was not instantiated: {}", process_id)
789 })
790 .clone();
791
792 let sink_port = D::allocate_external_port(&from_node);
793 let source_port = D::allocate_process_port(&to_node);
794
795 from_node.register(*from_key, sink_port.clone());
796
797 (
798 (
799 parse_quote!(DUMMY),
800 if *from_many {
801 D::e2o_many_source(
802 compile_env,
803 extra_stmts.entry(*process_id).or_default(),
804 &to_node, &source_port,
805 codec_type.0.as_ref(),
806 format!("{}_{}", *from_external_id, *from_key)
807 )
808 } else {
809 D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port)
810 },
811 ),
812 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
813 )
814 }
815 LocationId::Cluster(_) => todo!(),
816 _ => panic!()
817 }
818 },
819
820 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
821 };
822
823 *instantiate_fn = DebugInstantiateFinalized {
824 sink: sink_expr,
825 source: source_expr,
826 connect_fn: Some(connect_fn),
827 }
828 .into();
829 }
830 },
831 seen_tees,
832 false,
833 );
834 }
835
836 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
837 self.transform_bottom_up(
838 &mut |l| {
839 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
840 match instantiate_fn {
841 DebugInstantiate::Building => panic!("network not built"),
842
843 DebugInstantiate::Finalized(finalized) => {
844 (finalized.connect_fn.take().unwrap())();
845 }
846 }
847 }
848 },
849 &mut |n| {
850 if let HydroNode::Network { instantiate_fn, .. }
851 | HydroNode::ExternalInput { instantiate_fn, .. } = n
852 {
853 match instantiate_fn {
854 DebugInstantiate::Building => panic!("network not built"),
855
856 DebugInstantiate::Finalized(finalized) => {
857 (finalized.connect_fn.take().unwrap())();
858 }
859 }
860 }
861 },
862 seen_tees,
863 false,
864 );
865 }
866
867 pub fn transform_bottom_up(
868 &mut self,
869 transform_root: &mut impl FnMut(&mut HydroRoot),
870 transform_node: &mut impl FnMut(&mut HydroNode),
871 seen_tees: &mut SeenTees,
872 check_well_formed: bool,
873 ) {
874 self.transform_children(
875 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
876 seen_tees,
877 );
878
879 transform_root(self);
880 }
881
882 pub fn transform_children(
883 &mut self,
884 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
885 seen_tees: &mut SeenTees,
886 ) {
887 match self {
888 HydroRoot::ForEach { input, .. }
889 | HydroRoot::SendExternal { input, .. }
890 | HydroRoot::DestSink { input, .. }
891 | HydroRoot::CycleSink { input, .. } => {
892 transform(input, seen_tees);
893 }
894 }
895 }
896
897 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
898 match self {
899 HydroRoot::ForEach {
900 f,
901 input,
902 op_metadata,
903 } => HydroRoot::ForEach {
904 f: f.clone(),
905 input: Box::new(input.deep_clone(seen_tees)),
906 op_metadata: op_metadata.clone(),
907 },
908 HydroRoot::SendExternal {
909 to_external_id,
910 to_key,
911 to_many,
912 serialize_fn,
913 instantiate_fn,
914 input,
915 op_metadata,
916 } => HydroRoot::SendExternal {
917 to_external_id: *to_external_id,
918 to_key: *to_key,
919 to_many: *to_many,
920 serialize_fn: serialize_fn.clone(),
921 instantiate_fn: instantiate_fn.clone(),
922 input: Box::new(input.deep_clone(seen_tees)),
923 op_metadata: op_metadata.clone(),
924 },
925 HydroRoot::DestSink {
926 sink,
927 input,
928 op_metadata,
929 } => HydroRoot::DestSink {
930 sink: sink.clone(),
931 input: Box::new(input.deep_clone(seen_tees)),
932 op_metadata: op_metadata.clone(),
933 },
934 HydroRoot::CycleSink {
935 ident,
936 input,
937 op_metadata,
938 } => HydroRoot::CycleSink {
939 ident: ident.clone(),
940 input: Box::new(input.deep_clone(seen_tees)),
941 op_metadata: op_metadata.clone(),
942 },
943 }
944 }
945
946 #[cfg(feature = "build")]
947 pub fn emit(
948 &mut self,
949 graph_builders: &mut dyn DfirBuilder,
950 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
951 next_stmt_id: &mut usize,
952 ) {
953 self.emit_core(
954 &mut BuildersOrCallback::Builders::<
955 fn(&mut HydroRoot, &mut usize),
956 fn(&mut HydroNode, &mut usize),
957 >(graph_builders),
958 built_tees,
959 next_stmt_id,
960 );
961 }
962
963 #[cfg(feature = "build")]
964 pub fn emit_core(
965 &mut self,
966 builders_or_callback: &mut BuildersOrCallback<
967 impl FnMut(&mut HydroRoot, &mut usize),
968 impl FnMut(&mut HydroNode, &mut usize),
969 >,
970 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
971 next_stmt_id: &mut usize,
972 ) {
973 match self {
974 HydroRoot::ForEach { f, input, .. } => {
975 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
976
977 match builders_or_callback {
978 BuildersOrCallback::Builders(graph_builders) => {
979 graph_builders
980 .get_dfir_mut(&input.metadata().location_kind)
981 .add_dfir(
982 parse_quote! {
983 #input_ident -> for_each(#f);
984 },
985 None,
986 Some(&next_stmt_id.to_string()),
987 );
988 }
989 BuildersOrCallback::Callback(leaf_callback, _) => {
990 leaf_callback(self, next_stmt_id);
991 }
992 }
993
994 *next_stmt_id += 1;
995 }
996
997 HydroRoot::SendExternal {
998 serialize_fn,
999 instantiate_fn,
1000 input,
1001 ..
1002 } => {
1003 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1004
1005 match builders_or_callback {
1006 BuildersOrCallback::Builders(graph_builders) => {
1007 let (sink_expr, _) = match instantiate_fn {
1008 DebugInstantiate::Building => (
1009 syn::parse_quote!(DUMMY_SINK),
1010 syn::parse_quote!(DUMMY_SOURCE),
1011 ),
1012
1013 DebugInstantiate::Finalized(finalized) => {
1014 (finalized.sink.clone(), finalized.source.clone())
1015 }
1016 };
1017
1018 graph_builders.create_external_output(
1019 &input.metadata().location_kind,
1020 sink_expr,
1021 &input_ident,
1022 serialize_fn,
1023 *next_stmt_id,
1024 );
1025 }
1026 BuildersOrCallback::Callback(leaf_callback, _) => {
1027 leaf_callback(self, next_stmt_id);
1028 }
1029 }
1030
1031 *next_stmt_id += 1;
1032 }
1033
1034 HydroRoot::DestSink { sink, input, .. } => {
1035 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1036
1037 match builders_or_callback {
1038 BuildersOrCallback::Builders(graph_builders) => {
1039 graph_builders
1040 .get_dfir_mut(&input.metadata().location_kind)
1041 .add_dfir(
1042 parse_quote! {
1043 #input_ident -> dest_sink(#sink);
1044 },
1045 None,
1046 Some(&next_stmt_id.to_string()),
1047 );
1048 }
1049 BuildersOrCallback::Callback(leaf_callback, _) => {
1050 leaf_callback(self, next_stmt_id);
1051 }
1052 }
1053
1054 *next_stmt_id += 1;
1055 }
1056
1057 HydroRoot::CycleSink { ident, input, .. } => {
1058 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1059
1060 match builders_or_callback {
1061 BuildersOrCallback::Builders(graph_builders) => {
1062 graph_builders
1063 .get_dfir_mut(&input.metadata().location_kind)
1064 .add_dfir(
1065 parse_quote! {
1066 #ident = #input_ident;
1067 },
1068 None,
1069 None,
1070 );
1071 }
1072 BuildersOrCallback::Callback(_, _) => {}
1074 }
1075 }
1076 }
1077 }
1078
1079 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1080 match self {
1081 HydroRoot::ForEach { op_metadata, .. }
1082 | HydroRoot::SendExternal { op_metadata, .. }
1083 | HydroRoot::DestSink { op_metadata, .. }
1084 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1085 }
1086 }
1087
1088 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1089 match self {
1090 HydroRoot::ForEach { op_metadata, .. }
1091 | HydroRoot::SendExternal { op_metadata, .. }
1092 | HydroRoot::DestSink { op_metadata, .. }
1093 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1094 }
1095 }
1096
1097 pub fn input(&self) -> &HydroNode {
1098 match self {
1099 HydroRoot::ForEach { input, .. }
1100 | HydroRoot::SendExternal { input, .. }
1101 | HydroRoot::DestSink { input, .. }
1102 | HydroRoot::CycleSink { input, .. } => input,
1103 }
1104 }
1105
1106 pub fn input_metadata(&self) -> &HydroIrMetadata {
1107 self.input().metadata()
1108 }
1109
1110 pub fn print_root(&self) -> String {
1111 match self {
1112 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1113 HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1114 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1115 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1116 }
1117 }
1118
1119 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1120 match self {
1121 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1122 transform(f);
1123 }
1124 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1125 }
1126 }
1127}
1128
1129#[cfg(feature = "build")]
1130pub fn emit(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
1131 let mut builders = BTreeMap::new();
1132 let mut built_tees = HashMap::new();
1133 let mut next_stmt_id = 0;
1134 for leaf in ir {
1135 leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
1136 }
1137 builders
1138}
1139
1140#[cfg(feature = "build")]
1141pub fn traverse_dfir(
1142 ir: &mut [HydroRoot],
1143 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1144 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1145) {
1146 let mut seen_tees = HashMap::new();
1147 let mut next_stmt_id = 0;
1148 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1149 ir.iter_mut().for_each(|leaf| {
1150 leaf.emit_core(&mut callback, &mut seen_tees, &mut next_stmt_id);
1151 });
1152}
1153
1154pub fn transform_bottom_up(
1155 ir: &mut [HydroRoot],
1156 transform_root: &mut impl FnMut(&mut HydroRoot),
1157 transform_node: &mut impl FnMut(&mut HydroNode),
1158 check_well_formed: bool,
1159) {
1160 let mut seen_tees = HashMap::new();
1161 ir.iter_mut().for_each(|leaf| {
1162 leaf.transform_bottom_up(
1163 transform_root,
1164 transform_node,
1165 &mut seen_tees,
1166 check_well_formed,
1167 );
1168 });
1169}
1170
1171pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1172 let mut seen_tees = HashMap::new();
1173 ir.iter()
1174 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1175 .collect()
1176}
1177
1178type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1179thread_local! {
1180 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1181}
1182
1183pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1184 PRINTED_TEES.with(|printed_tees| {
1185 let mut printed_tees_mut = printed_tees.borrow_mut();
1186 *printed_tees_mut = Some((0, HashMap::new()));
1187 drop(printed_tees_mut);
1188
1189 let ret = f();
1190
1191 let mut printed_tees_mut = printed_tees.borrow_mut();
1192 *printed_tees_mut = None;
1193
1194 ret
1195 })
1196}
1197
1198pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1199
1200impl TeeNode {
1201 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1202 Rc::as_ptr(&self.0)
1203 }
1204}
1205
1206impl Debug for TeeNode {
1207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1208 PRINTED_TEES.with(|printed_tees| {
1209 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1210 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1211
1212 if let Some(printed_tees_mut) = printed_tees_mut {
1213 if let Some(existing) = printed_tees_mut
1214 .1
1215 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1216 {
1217 write!(f, "<tee {}>", existing)
1218 } else {
1219 let next_id = printed_tees_mut.0;
1220 printed_tees_mut.0 += 1;
1221 printed_tees_mut
1222 .1
1223 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1224 drop(printed_tees_mut_borrow);
1225 write!(f, "<tee {}>: ", next_id)?;
1226 Debug::fmt(&self.0.borrow(), f)
1227 }
1228 } else {
1229 drop(printed_tees_mut_borrow);
1230 write!(f, "<tee>: ")?;
1231 Debug::fmt(&self.0.borrow(), f)
1232 }
1233 })
1234 }
1235}
1236
1237impl Hash for TeeNode {
1238 fn hash<H: Hasher>(&self, state: &mut H) {
1239 self.0.borrow_mut().hash(state);
1240 }
1241}
1242
1243#[derive(Clone, PartialEq, Eq, Debug)]
1244pub enum BoundKind {
1245 Unbounded,
1246 Bounded,
1247}
1248
1249#[derive(Clone, PartialEq, Eq, Debug)]
1250pub enum StreamOrder {
1251 NoOrder,
1252 TotalOrder,
1253}
1254
1255#[derive(Clone, PartialEq, Eq, Debug)]
1256pub enum StreamRetry {
1257 AtLeastOnce,
1258 ExactlyOnce,
1259}
1260
1261#[derive(Clone, PartialEq, Eq, Debug)]
1262pub enum KeyedSingletonBoundKind {
1263 Unbounded,
1264 BoundedValue,
1265 Bounded,
1266}
1267
1268#[derive(Clone, PartialEq, Eq, Debug)]
1269pub enum CollectionKind {
1270 Stream {
1271 bound: BoundKind,
1272 order: StreamOrder,
1273 retry: StreamRetry,
1274 element_type: DebugType,
1275 },
1276 Singleton {
1277 bound: BoundKind,
1278 element_type: DebugType,
1279 },
1280 Optional {
1281 bound: BoundKind,
1282 element_type: DebugType,
1283 },
1284 KeyedStream {
1285 bound: BoundKind,
1286 value_order: StreamOrder,
1287 value_retry: StreamRetry,
1288 key_type: DebugType,
1289 value_type: DebugType,
1290 },
1291 KeyedSingleton {
1292 bound: KeyedSingletonBoundKind,
1293 key_type: DebugType,
1294 value_type: DebugType,
1295 },
1296}
1297
1298#[derive(Clone)]
1299pub struct HydroIrMetadata {
1300 pub location_kind: LocationId,
1301 pub collection_kind: CollectionKind,
1302 pub cardinality: Option<usize>,
1303 pub tag: Option<String>,
1304 pub op: HydroIrOpMetadata,
1305}
1306
1307impl Hash for HydroIrMetadata {
1309 fn hash<H: Hasher>(&self, _: &mut H) {}
1310}
1311
1312impl PartialEq for HydroIrMetadata {
1313 fn eq(&self, _: &Self) -> bool {
1314 true
1315 }
1316}
1317
1318impl Eq for HydroIrMetadata {}
1319
1320impl Debug for HydroIrMetadata {
1321 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1322 f.debug_struct("HydroIrMetadata")
1323 .field("location_kind", &self.location_kind)
1324 .field("collection_kind", &self.collection_kind)
1325 .finish()
1326 }
1327}
1328
1329#[derive(Clone)]
1332pub struct HydroIrOpMetadata {
1333 pub backtrace: Backtrace,
1334 pub cpu_usage: Option<f64>,
1335 pub network_recv_cpu_usage: Option<f64>,
1336 pub id: Option<usize>,
1337}
1338
1339impl HydroIrOpMetadata {
1340 #[expect(
1341 clippy::new_without_default,
1342 reason = "explicit calls to new ensure correct backtrace bounds"
1343 )]
1344 pub fn new() -> HydroIrOpMetadata {
1345 Self::new_with_skip(1)
1346 }
1347
1348 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1349 HydroIrOpMetadata {
1350 backtrace: Backtrace::get_backtrace(2 + skip_count),
1351 cpu_usage: None,
1352 network_recv_cpu_usage: None,
1353 id: None,
1354 }
1355 }
1356}
1357
1358impl Debug for HydroIrOpMetadata {
1359 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1360 f.debug_struct("HydroIrOpMetadata").finish()
1361 }
1362}
1363
1364impl Hash for HydroIrOpMetadata {
1365 fn hash<H: Hasher>(&self, _: &mut H) {}
1366}
1367
1368#[derive(Debug, Hash)]
1371pub enum HydroNode {
1372 Placeholder,
1373
1374 Cast {
1382 inner: Box<HydroNode>,
1383 metadata: HydroIrMetadata,
1384 },
1385
1386 ObserveNonDet {
1392 inner: Box<HydroNode>,
1393 trusted: bool, metadata: HydroIrMetadata,
1395 },
1396
1397 Source {
1398 source: HydroSource,
1399 metadata: HydroIrMetadata,
1400 },
1401
1402 SingletonSource {
1403 value: DebugExpr,
1404 metadata: HydroIrMetadata,
1405 },
1406
1407 CycleSource {
1408 ident: syn::Ident,
1409 metadata: HydroIrMetadata,
1410 },
1411
1412 Tee {
1413 inner: TeeNode,
1414 metadata: HydroIrMetadata,
1415 },
1416
1417 Persist {
1418 inner: Box<HydroNode>,
1419 metadata: HydroIrMetadata,
1420 },
1421
1422 BeginAtomic {
1423 inner: Box<HydroNode>,
1424 metadata: HydroIrMetadata,
1425 },
1426
1427 EndAtomic {
1428 inner: Box<HydroNode>,
1429 metadata: HydroIrMetadata,
1430 },
1431
1432 Batch {
1433 inner: Box<HydroNode>,
1434 metadata: HydroIrMetadata,
1435 },
1436
1437 YieldConcat {
1438 inner: Box<HydroNode>,
1439 metadata: HydroIrMetadata,
1440 },
1441
1442 Chain {
1443 first: Box<HydroNode>,
1444 second: Box<HydroNode>,
1445 metadata: HydroIrMetadata,
1446 },
1447
1448 ChainFirst {
1449 first: Box<HydroNode>,
1450 second: Box<HydroNode>,
1451 metadata: HydroIrMetadata,
1452 },
1453
1454 CrossProduct {
1455 left: Box<HydroNode>,
1456 right: Box<HydroNode>,
1457 metadata: HydroIrMetadata,
1458 },
1459
1460 CrossSingleton {
1461 left: Box<HydroNode>,
1462 right: Box<HydroNode>,
1463 metadata: HydroIrMetadata,
1464 },
1465
1466 Join {
1467 left: Box<HydroNode>,
1468 right: Box<HydroNode>,
1469 metadata: HydroIrMetadata,
1470 },
1471
1472 Difference {
1473 pos: Box<HydroNode>,
1474 neg: Box<HydroNode>,
1475 metadata: HydroIrMetadata,
1476 },
1477
1478 AntiJoin {
1479 pos: Box<HydroNode>,
1480 neg: Box<HydroNode>,
1481 metadata: HydroIrMetadata,
1482 },
1483
1484 ResolveFutures {
1485 input: Box<HydroNode>,
1486 metadata: HydroIrMetadata,
1487 },
1488 ResolveFuturesOrdered {
1489 input: Box<HydroNode>,
1490 metadata: HydroIrMetadata,
1491 },
1492
1493 Map {
1494 f: DebugExpr,
1495 input: Box<HydroNode>,
1496 metadata: HydroIrMetadata,
1497 },
1498 FlatMap {
1499 f: DebugExpr,
1500 input: Box<HydroNode>,
1501 metadata: HydroIrMetadata,
1502 },
1503 Filter {
1504 f: DebugExpr,
1505 input: Box<HydroNode>,
1506 metadata: HydroIrMetadata,
1507 },
1508 FilterMap {
1509 f: DebugExpr,
1510 input: Box<HydroNode>,
1511 metadata: HydroIrMetadata,
1512 },
1513
1514 DeferTick {
1515 input: Box<HydroNode>,
1516 metadata: HydroIrMetadata,
1517 },
1518 Enumerate {
1519 input: Box<HydroNode>,
1520 metadata: HydroIrMetadata,
1521 },
1522 Inspect {
1523 f: DebugExpr,
1524 input: Box<HydroNode>,
1525 metadata: HydroIrMetadata,
1526 },
1527
1528 Unique {
1529 input: Box<HydroNode>,
1530 metadata: HydroIrMetadata,
1531 },
1532
1533 Sort {
1534 input: Box<HydroNode>,
1535 metadata: HydroIrMetadata,
1536 },
1537 Fold {
1538 init: DebugExpr,
1539 acc: DebugExpr,
1540 input: Box<HydroNode>,
1541 metadata: HydroIrMetadata,
1542 },
1543
1544 Scan {
1545 init: DebugExpr,
1546 acc: DebugExpr,
1547 input: Box<HydroNode>,
1548 metadata: HydroIrMetadata,
1549 },
1550 FoldKeyed {
1551 init: DebugExpr,
1552 acc: DebugExpr,
1553 input: Box<HydroNode>,
1554 metadata: HydroIrMetadata,
1555 },
1556
1557 Reduce {
1558 f: DebugExpr,
1559 input: Box<HydroNode>,
1560 metadata: HydroIrMetadata,
1561 },
1562 ReduceKeyed {
1563 f: DebugExpr,
1564 input: Box<HydroNode>,
1565 metadata: HydroIrMetadata,
1566 },
1567 ReduceKeyedWatermark {
1568 f: DebugExpr,
1569 input: Box<HydroNode>,
1570 watermark: Box<HydroNode>,
1571 metadata: HydroIrMetadata,
1572 },
1573
1574 Network {
1575 serialize_fn: Option<DebugExpr>,
1576 instantiate_fn: DebugInstantiate,
1577 deserialize_fn: Option<DebugExpr>,
1578 input: Box<HydroNode>,
1579 metadata: HydroIrMetadata,
1580 },
1581
1582 ExternalInput {
1583 from_external_id: usize,
1584 from_key: usize,
1585 from_many: bool,
1586 codec_type: DebugType,
1587 port_hint: NetworkHint,
1588 instantiate_fn: DebugInstantiate,
1589 deserialize_fn: Option<DebugExpr>,
1590 metadata: HydroIrMetadata,
1591 },
1592
1593 Counter {
1594 tag: String,
1595 duration: DebugExpr,
1596 prefix: String,
1597 input: Box<HydroNode>,
1598 metadata: HydroIrMetadata,
1599 },
1600}
1601
1602pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1603pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1604
1605impl HydroNode {
1606 pub fn transform_bottom_up(
1607 &mut self,
1608 transform: &mut impl FnMut(&mut HydroNode),
1609 seen_tees: &mut SeenTees,
1610 check_well_formed: bool,
1611 ) {
1612 self.transform_children(
1613 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1614 seen_tees,
1615 );
1616
1617 transform(self);
1618
1619 let self_location = self.metadata().location_kind.root();
1620
1621 if check_well_formed {
1622 match &*self {
1623 HydroNode::Network { .. } => {}
1624 _ => {
1625 self.input_metadata().iter().for_each(|i| {
1626 if i.location_kind.root() != self_location {
1627 panic!(
1628 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1629 i,
1630 i.location_kind.root(),
1631 self,
1632 self_location
1633 )
1634 }
1635 });
1636 }
1637 }
1638 }
1639 }
1640
1641 #[inline(always)]
1642 pub fn transform_children(
1643 &mut self,
1644 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1645 seen_tees: &mut SeenTees,
1646 ) {
1647 match self {
1648 HydroNode::Placeholder => {
1649 panic!();
1650 }
1651
1652 HydroNode::Source { .. }
1653 | HydroNode::SingletonSource { .. }
1654 | HydroNode::CycleSource { .. }
1655 | HydroNode::ExternalInput { .. } => {}
1656
1657 HydroNode::Tee { inner, .. } => {
1658 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1659 *inner = TeeNode(transformed.clone());
1660 } else {
1661 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1662 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1663 let mut orig = inner.0.replace(HydroNode::Placeholder);
1664 transform(&mut orig, seen_tees);
1665 *transformed_cell.borrow_mut() = orig;
1666 *inner = TeeNode(transformed_cell);
1667 }
1668 }
1669
1670 HydroNode::Cast { inner, .. }
1671 | HydroNode::ObserveNonDet { inner, .. }
1672 | HydroNode::Persist { inner, .. }
1673 | HydroNode::BeginAtomic { inner, .. }
1674 | HydroNode::EndAtomic { inner, .. }
1675 | HydroNode::Batch { inner, .. }
1676 | HydroNode::YieldConcat { inner, .. } => {
1677 transform(inner.as_mut(), seen_tees);
1678 }
1679
1680 HydroNode::Chain { first, second, .. } => {
1681 transform(first.as_mut(), seen_tees);
1682 transform(second.as_mut(), seen_tees);
1683 }
1684
1685 HydroNode::ChainFirst { first, second, .. } => {
1686 transform(first.as_mut(), seen_tees);
1687 transform(second.as_mut(), seen_tees);
1688 }
1689
1690 HydroNode::CrossSingleton { left, right, .. }
1691 | HydroNode::CrossProduct { left, right, .. }
1692 | HydroNode::Join { left, right, .. } => {
1693 transform(left.as_mut(), seen_tees);
1694 transform(right.as_mut(), seen_tees);
1695 }
1696
1697 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1698 transform(pos.as_mut(), seen_tees);
1699 transform(neg.as_mut(), seen_tees);
1700 }
1701
1702 HydroNode::ReduceKeyedWatermark {
1703 input, watermark, ..
1704 } => {
1705 transform(input.as_mut(), seen_tees);
1706 transform(watermark.as_mut(), seen_tees);
1707 }
1708
1709 HydroNode::Map { input, .. }
1710 | HydroNode::ResolveFutures { input, .. }
1711 | HydroNode::ResolveFuturesOrdered { input, .. }
1712 | HydroNode::FlatMap { input, .. }
1713 | HydroNode::Filter { input, .. }
1714 | HydroNode::FilterMap { input, .. }
1715 | HydroNode::Sort { input, .. }
1716 | HydroNode::DeferTick { input, .. }
1717 | HydroNode::Enumerate { input, .. }
1718 | HydroNode::Inspect { input, .. }
1719 | HydroNode::Unique { input, .. }
1720 | HydroNode::Network { input, .. }
1721 | HydroNode::Fold { input, .. }
1722 | HydroNode::Scan { input, .. }
1723 | HydroNode::FoldKeyed { input, .. }
1724 | HydroNode::Reduce { input, .. }
1725 | HydroNode::ReduceKeyed { input, .. }
1726 | HydroNode::Counter { input, .. } => {
1727 transform(input.as_mut(), seen_tees);
1728 }
1729 }
1730 }
1731
1732 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1733 match self {
1734 HydroNode::Placeholder => HydroNode::Placeholder,
1735 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1736 inner: Box::new(inner.deep_clone(seen_tees)),
1737 metadata: metadata.clone(),
1738 },
1739 HydroNode::ObserveNonDet {
1740 inner,
1741 trusted,
1742 metadata,
1743 } => HydroNode::ObserveNonDet {
1744 inner: Box::new(inner.deep_clone(seen_tees)),
1745 trusted: *trusted,
1746 metadata: metadata.clone(),
1747 },
1748 HydroNode::Source { source, metadata } => HydroNode::Source {
1749 source: source.clone(),
1750 metadata: metadata.clone(),
1751 },
1752 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1753 value: value.clone(),
1754 metadata: metadata.clone(),
1755 },
1756 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1757 ident: ident.clone(),
1758 metadata: metadata.clone(),
1759 },
1760 HydroNode::Tee { inner, metadata } => {
1761 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1762 HydroNode::Tee {
1763 inner: TeeNode(transformed.clone()),
1764 metadata: metadata.clone(),
1765 }
1766 } else {
1767 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1768 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1769 let cloned = inner.0.borrow().deep_clone(seen_tees);
1770 *new_rc.borrow_mut() = cloned;
1771 HydroNode::Tee {
1772 inner: TeeNode(new_rc),
1773 metadata: metadata.clone(),
1774 }
1775 }
1776 }
1777 HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1778 inner: Box::new(inner.deep_clone(seen_tees)),
1779 metadata: metadata.clone(),
1780 },
1781 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1782 inner: Box::new(inner.deep_clone(seen_tees)),
1783 metadata: metadata.clone(),
1784 },
1785 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1786 inner: Box::new(inner.deep_clone(seen_tees)),
1787 metadata: metadata.clone(),
1788 },
1789 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1790 inner: Box::new(inner.deep_clone(seen_tees)),
1791 metadata: metadata.clone(),
1792 },
1793 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1794 inner: Box::new(inner.deep_clone(seen_tees)),
1795 metadata: metadata.clone(),
1796 },
1797 HydroNode::Chain {
1798 first,
1799 second,
1800 metadata,
1801 } => HydroNode::Chain {
1802 first: Box::new(first.deep_clone(seen_tees)),
1803 second: Box::new(second.deep_clone(seen_tees)),
1804 metadata: metadata.clone(),
1805 },
1806 HydroNode::ChainFirst {
1807 first,
1808 second,
1809 metadata,
1810 } => HydroNode::ChainFirst {
1811 first: Box::new(first.deep_clone(seen_tees)),
1812 second: Box::new(second.deep_clone(seen_tees)),
1813 metadata: metadata.clone(),
1814 },
1815 HydroNode::CrossProduct {
1816 left,
1817 right,
1818 metadata,
1819 } => HydroNode::CrossProduct {
1820 left: Box::new(left.deep_clone(seen_tees)),
1821 right: Box::new(right.deep_clone(seen_tees)),
1822 metadata: metadata.clone(),
1823 },
1824 HydroNode::CrossSingleton {
1825 left,
1826 right,
1827 metadata,
1828 } => HydroNode::CrossSingleton {
1829 left: Box::new(left.deep_clone(seen_tees)),
1830 right: Box::new(right.deep_clone(seen_tees)),
1831 metadata: metadata.clone(),
1832 },
1833 HydroNode::Join {
1834 left,
1835 right,
1836 metadata,
1837 } => HydroNode::Join {
1838 left: Box::new(left.deep_clone(seen_tees)),
1839 right: Box::new(right.deep_clone(seen_tees)),
1840 metadata: metadata.clone(),
1841 },
1842 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1843 pos: Box::new(pos.deep_clone(seen_tees)),
1844 neg: Box::new(neg.deep_clone(seen_tees)),
1845 metadata: metadata.clone(),
1846 },
1847 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1848 pos: Box::new(pos.deep_clone(seen_tees)),
1849 neg: Box::new(neg.deep_clone(seen_tees)),
1850 metadata: metadata.clone(),
1851 },
1852 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1853 input: Box::new(input.deep_clone(seen_tees)),
1854 metadata: metadata.clone(),
1855 },
1856 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1857 HydroNode::ResolveFuturesOrdered {
1858 input: Box::new(input.deep_clone(seen_tees)),
1859 metadata: metadata.clone(),
1860 }
1861 }
1862 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1863 f: f.clone(),
1864 input: Box::new(input.deep_clone(seen_tees)),
1865 metadata: metadata.clone(),
1866 },
1867 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1868 f: f.clone(),
1869 input: Box::new(input.deep_clone(seen_tees)),
1870 metadata: metadata.clone(),
1871 },
1872 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1873 f: f.clone(),
1874 input: Box::new(input.deep_clone(seen_tees)),
1875 metadata: metadata.clone(),
1876 },
1877 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1878 f: f.clone(),
1879 input: Box::new(input.deep_clone(seen_tees)),
1880 metadata: metadata.clone(),
1881 },
1882 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1883 input: Box::new(input.deep_clone(seen_tees)),
1884 metadata: metadata.clone(),
1885 },
1886 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1887 input: Box::new(input.deep_clone(seen_tees)),
1888 metadata: metadata.clone(),
1889 },
1890 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1891 f: f.clone(),
1892 input: Box::new(input.deep_clone(seen_tees)),
1893 metadata: metadata.clone(),
1894 },
1895 HydroNode::Unique { input, metadata } => HydroNode::Unique {
1896 input: Box::new(input.deep_clone(seen_tees)),
1897 metadata: metadata.clone(),
1898 },
1899 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1900 input: Box::new(input.deep_clone(seen_tees)),
1901 metadata: metadata.clone(),
1902 },
1903 HydroNode::Fold {
1904 init,
1905 acc,
1906 input,
1907 metadata,
1908 } => HydroNode::Fold {
1909 init: init.clone(),
1910 acc: acc.clone(),
1911 input: Box::new(input.deep_clone(seen_tees)),
1912 metadata: metadata.clone(),
1913 },
1914 HydroNode::Scan {
1915 init,
1916 acc,
1917 input,
1918 metadata,
1919 } => HydroNode::Scan {
1920 init: init.clone(),
1921 acc: acc.clone(),
1922 input: Box::new(input.deep_clone(seen_tees)),
1923 metadata: metadata.clone(),
1924 },
1925 HydroNode::FoldKeyed {
1926 init,
1927 acc,
1928 input,
1929 metadata,
1930 } => HydroNode::FoldKeyed {
1931 init: init.clone(),
1932 acc: acc.clone(),
1933 input: Box::new(input.deep_clone(seen_tees)),
1934 metadata: metadata.clone(),
1935 },
1936 HydroNode::ReduceKeyedWatermark {
1937 f,
1938 input,
1939 watermark,
1940 metadata,
1941 } => HydroNode::ReduceKeyedWatermark {
1942 f: f.clone(),
1943 input: Box::new(input.deep_clone(seen_tees)),
1944 watermark: Box::new(watermark.deep_clone(seen_tees)),
1945 metadata: metadata.clone(),
1946 },
1947 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1948 f: f.clone(),
1949 input: Box::new(input.deep_clone(seen_tees)),
1950 metadata: metadata.clone(),
1951 },
1952 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1953 f: f.clone(),
1954 input: Box::new(input.deep_clone(seen_tees)),
1955 metadata: metadata.clone(),
1956 },
1957 HydroNode::Network {
1958 serialize_fn,
1959 instantiate_fn,
1960 deserialize_fn,
1961 input,
1962 metadata,
1963 } => HydroNode::Network {
1964 serialize_fn: serialize_fn.clone(),
1965 instantiate_fn: instantiate_fn.clone(),
1966 deserialize_fn: deserialize_fn.clone(),
1967 input: Box::new(input.deep_clone(seen_tees)),
1968 metadata: metadata.clone(),
1969 },
1970 HydroNode::ExternalInput {
1971 from_external_id,
1972 from_key,
1973 from_many,
1974 codec_type,
1975 port_hint,
1976 instantiate_fn,
1977 deserialize_fn,
1978 metadata,
1979 } => HydroNode::ExternalInput {
1980 from_external_id: *from_external_id,
1981 from_key: *from_key,
1982 from_many: *from_many,
1983 codec_type: codec_type.clone(),
1984 port_hint: *port_hint,
1985 instantiate_fn: instantiate_fn.clone(),
1986 deserialize_fn: deserialize_fn.clone(),
1987 metadata: metadata.clone(),
1988 },
1989 HydroNode::Counter {
1990 tag,
1991 duration,
1992 prefix,
1993 input,
1994 metadata,
1995 } => HydroNode::Counter {
1996 tag: tag.clone(),
1997 duration: duration.clone(),
1998 prefix: prefix.clone(),
1999 input: Box::new(input.deep_clone(seen_tees)),
2000 metadata: metadata.clone(),
2001 },
2002 }
2003 }
2004
2005 #[cfg(feature = "build")]
2006 pub fn emit_core(
2007 &mut self,
2008 builders_or_callback: &mut BuildersOrCallback<
2009 impl FnMut(&mut HydroRoot, &mut usize),
2010 impl FnMut(&mut HydroNode, &mut usize),
2011 >,
2012 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2013 next_stmt_id: &mut usize,
2014 ) -> syn::Ident {
2015 let out_location = self.metadata().location_kind.clone();
2016 match self {
2017 HydroNode::Placeholder => {
2018 panic!()
2019 }
2020
2021 HydroNode::Cast { inner, .. } => {
2022 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2023
2024 match builders_or_callback {
2025 BuildersOrCallback::Builders(_) => {}
2026 BuildersOrCallback::Callback(_, node_callback) => {
2027 node_callback(self, next_stmt_id);
2028 }
2029 }
2030
2031 *next_stmt_id += 1;
2032
2033 inner_ident
2034 }
2035
2036 HydroNode::ObserveNonDet {
2037 inner,
2038 trusted,
2039 metadata,
2040 ..
2041 } => {
2042 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2043
2044 let observe_ident =
2045 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2046
2047 match builders_or_callback {
2048 BuildersOrCallback::Builders(graph_builders) => {
2049 graph_builders.observe_nondet(
2050 *trusted,
2051 &inner.metadata().location_kind,
2052 inner_ident,
2053 &inner.metadata().collection_kind,
2054 &observe_ident,
2055 &metadata.collection_kind,
2056 );
2057 }
2058 BuildersOrCallback::Callback(_, node_callback) => {
2059 node_callback(self, next_stmt_id);
2060 }
2061 }
2062
2063 *next_stmt_id += 1;
2064
2065 observe_ident
2066 }
2067
2068 HydroNode::Persist { inner, .. } => {
2069 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2070
2071 let persist_ident =
2072 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2073
2074 match builders_or_callback {
2075 BuildersOrCallback::Builders(graph_builders) => {
2076 let builder = graph_builders.get_dfir_mut(&out_location);
2077 builder.add_dfir(
2078 parse_quote! {
2079 #persist_ident = #inner_ident -> persist::<'static>();
2080 },
2081 None,
2082 Some(&next_stmt_id.to_string()),
2083 );
2084 }
2085 BuildersOrCallback::Callback(_, node_callback) => {
2086 node_callback(self, next_stmt_id);
2087 }
2088 }
2089
2090 *next_stmt_id += 1;
2091
2092 persist_ident
2093 }
2094
2095 HydroNode::Batch {
2096 inner, metadata, ..
2097 } => {
2098 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2099
2100 let batch_ident =
2101 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2102
2103 match builders_or_callback {
2104 BuildersOrCallback::Builders(graph_builders) => {
2105 graph_builders.batch(
2106 inner_ident,
2107 &inner.metadata().location_kind,
2108 &inner.metadata().collection_kind,
2109 &batch_ident,
2110 &out_location,
2111 &metadata.op,
2112 );
2113 }
2114 BuildersOrCallback::Callback(_, node_callback) => {
2115 node_callback(self, next_stmt_id);
2116 }
2117 }
2118
2119 *next_stmt_id += 1;
2120
2121 batch_ident
2122 }
2123
2124 HydroNode::YieldConcat { inner, .. } => {
2125 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2126
2127 let yield_ident =
2128 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2129
2130 match builders_or_callback {
2131 BuildersOrCallback::Builders(graph_builders) => {
2132 graph_builders.yield_from_tick(
2133 inner_ident,
2134 &inner.metadata().location_kind,
2135 &inner.metadata().collection_kind,
2136 &yield_ident,
2137 &out_location,
2138 );
2139 }
2140 BuildersOrCallback::Callback(_, node_callback) => {
2141 node_callback(self, next_stmt_id);
2142 }
2143 }
2144
2145 *next_stmt_id += 1;
2146
2147 yield_ident
2148 }
2149
2150 HydroNode::BeginAtomic { inner, metadata } => {
2151 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2152
2153 let begin_ident =
2154 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2155
2156 match builders_or_callback {
2157 BuildersOrCallback::Builders(graph_builders) => {
2158 graph_builders.begin_atomic(
2159 inner_ident,
2160 &inner.metadata().location_kind,
2161 &inner.metadata().collection_kind,
2162 &begin_ident,
2163 &out_location,
2164 &metadata.op,
2165 );
2166 }
2167 BuildersOrCallback::Callback(_, node_callback) => {
2168 node_callback(self, next_stmt_id);
2169 }
2170 }
2171
2172 *next_stmt_id += 1;
2173
2174 begin_ident
2175 }
2176
2177 HydroNode::EndAtomic { inner, .. } => {
2178 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2179
2180 let end_ident =
2181 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2182
2183 match builders_or_callback {
2184 BuildersOrCallback::Builders(graph_builders) => {
2185 graph_builders.end_atomic(
2186 inner_ident,
2187 &inner.metadata().location_kind,
2188 &inner.metadata().collection_kind,
2189 &end_ident,
2190 );
2191 }
2192 BuildersOrCallback::Callback(_, node_callback) => {
2193 node_callback(self, next_stmt_id);
2194 }
2195 }
2196
2197 *next_stmt_id += 1;
2198
2199 end_ident
2200 }
2201
2202 HydroNode::Source {
2203 source, metadata, ..
2204 } => {
2205 if let HydroSource::ExternalNetwork() = source {
2206 syn::Ident::new("DUMMY", Span::call_site())
2207 } else {
2208 let source_ident =
2209 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2210
2211 let source_stmt = match source {
2212 HydroSource::Stream(expr) => {
2213 debug_assert!(metadata.location_kind.is_top_level());
2214 parse_quote! {
2215 #source_ident = source_stream(#expr);
2216 }
2217 }
2218
2219 HydroSource::ExternalNetwork() => {
2220 unreachable!()
2221 }
2222
2223 HydroSource::Iter(expr) => {
2224 if metadata.location_kind.is_top_level() {
2225 parse_quote! {
2226 #source_ident = source_iter(#expr);
2227 }
2228 } else {
2229 parse_quote! {
2231 #source_ident = source_iter(#expr) -> persist::<'static>();
2232 }
2233 }
2234 }
2235
2236 HydroSource::Spin() => {
2237 debug_assert!(metadata.location_kind.is_top_level());
2238 parse_quote! {
2239 #source_ident = spin();
2240 }
2241 }
2242 };
2243
2244 match builders_or_callback {
2245 BuildersOrCallback::Builders(graph_builders) => {
2246 let builder = graph_builders.get_dfir_mut(&out_location);
2247 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2248 }
2249 BuildersOrCallback::Callback(_, node_callback) => {
2250 node_callback(self, next_stmt_id);
2251 }
2252 }
2253
2254 *next_stmt_id += 1;
2255
2256 source_ident
2257 }
2258 }
2259
2260 HydroNode::SingletonSource { value, metadata } => {
2261 let source_ident =
2262 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2263
2264 match builders_or_callback {
2265 BuildersOrCallback::Builders(graph_builders) => {
2266 let should_replay = !graph_builders.singleton_intermediates();
2267 let builder = graph_builders.get_dfir_mut(&out_location);
2268
2269 if should_replay || !metadata.location_kind.is_top_level() {
2270 builder.add_dfir(
2271 parse_quote! {
2272 #source_ident = source_iter([#value]) -> persist::<'static>();
2273 },
2274 None,
2275 Some(&next_stmt_id.to_string()),
2276 );
2277 } else {
2278 builder.add_dfir(
2279 parse_quote! {
2280 #source_ident = source_iter([#value]);
2281 },
2282 None,
2283 Some(&next_stmt_id.to_string()),
2284 );
2285 }
2286 }
2287 BuildersOrCallback::Callback(_, node_callback) => {
2288 node_callback(self, next_stmt_id);
2289 }
2290 }
2291
2292 *next_stmt_id += 1;
2293
2294 source_ident
2295 }
2296
2297 HydroNode::CycleSource { ident, .. } => {
2298 let ident = ident.clone();
2299
2300 match builders_or_callback {
2301 BuildersOrCallback::Builders(_) => {}
2302 BuildersOrCallback::Callback(_, node_callback) => {
2303 node_callback(self, next_stmt_id);
2304 }
2305 }
2306
2307 *next_stmt_id += 1;
2309
2310 ident
2311 }
2312
2313 HydroNode::Tee { inner, .. } => {
2314 let ret_ident = if let Some(teed_from) =
2315 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2316 {
2317 match builders_or_callback {
2318 BuildersOrCallback::Builders(_) => {}
2319 BuildersOrCallback::Callback(_, node_callback) => {
2320 node_callback(self, next_stmt_id);
2321 }
2322 }
2323
2324 teed_from.clone()
2325 } else {
2326 let inner_ident = inner.0.borrow_mut().emit_core(
2327 builders_or_callback,
2328 built_tees,
2329 next_stmt_id,
2330 );
2331
2332 let tee_ident =
2333 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2334
2335 built_tees.insert(
2336 inner.0.as_ref() as *const RefCell<HydroNode>,
2337 tee_ident.clone(),
2338 );
2339
2340 match builders_or_callback {
2341 BuildersOrCallback::Builders(graph_builders) => {
2342 let builder = graph_builders.get_dfir_mut(&out_location);
2343 builder.add_dfir(
2344 parse_quote! {
2345 #tee_ident = #inner_ident -> tee();
2346 },
2347 None,
2348 Some(&next_stmt_id.to_string()),
2349 );
2350 }
2351 BuildersOrCallback::Callback(_, node_callback) => {
2352 node_callback(self, next_stmt_id);
2353 }
2354 }
2355
2356 tee_ident
2357 };
2358
2359 *next_stmt_id += 1;
2363 ret_ident
2364 }
2365
2366 HydroNode::Chain { first, second, .. } => {
2367 let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2368 let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2369
2370 let chain_ident =
2371 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2372
2373 match builders_or_callback {
2374 BuildersOrCallback::Builders(graph_builders) => {
2375 let builder = graph_builders.get_dfir_mut(&out_location);
2376 builder.add_dfir(
2377 parse_quote! {
2378 #chain_ident = chain();
2379 #first_ident -> [0]#chain_ident;
2380 #second_ident -> [1]#chain_ident;
2381 },
2382 None,
2383 Some(&next_stmt_id.to_string()),
2384 );
2385 }
2386 BuildersOrCallback::Callback(_, node_callback) => {
2387 node_callback(self, next_stmt_id);
2388 }
2389 }
2390
2391 *next_stmt_id += 1;
2392
2393 chain_ident
2394 }
2395
2396 HydroNode::ChainFirst { first, second, .. } => {
2397 let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2398 let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2399
2400 let chain_ident =
2401 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2402
2403 match builders_or_callback {
2404 BuildersOrCallback::Builders(graph_builders) => {
2405 let builder = graph_builders.get_dfir_mut(&out_location);
2406 builder.add_dfir(
2407 parse_quote! {
2408 #chain_ident = chain_first_n(1);
2409 #first_ident -> [0]#chain_ident;
2410 #second_ident -> [1]#chain_ident;
2411 },
2412 None,
2413 Some(&next_stmt_id.to_string()),
2414 );
2415 }
2416 BuildersOrCallback::Callback(_, node_callback) => {
2417 node_callback(self, next_stmt_id);
2418 }
2419 }
2420
2421 *next_stmt_id += 1;
2422
2423 chain_ident
2424 }
2425
2426 HydroNode::CrossSingleton { left, right, .. } => {
2427 let left_ident = left.emit_core(builders_or_callback, built_tees, next_stmt_id);
2428 let right_ident = right.emit_core(builders_or_callback, built_tees, next_stmt_id);
2429
2430 let cross_ident =
2431 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2432
2433 match builders_or_callback {
2434 BuildersOrCallback::Builders(graph_builders) => {
2435 let builder = graph_builders.get_dfir_mut(&out_location);
2436 builder.add_dfir(
2437 parse_quote! {
2438 #cross_ident = cross_singleton();
2439 #left_ident -> [input]#cross_ident;
2440 #right_ident -> [single]#cross_ident;
2441 },
2442 None,
2443 Some(&next_stmt_id.to_string()),
2444 );
2445 }
2446 BuildersOrCallback::Callback(_, node_callback) => {
2447 node_callback(self, next_stmt_id);
2448 }
2449 }
2450
2451 *next_stmt_id += 1;
2452
2453 cross_ident
2454 }
2455
2456 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2457 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
2458 parse_quote!(cross_join_multiset)
2459 } else {
2460 parse_quote!(join_multiset)
2461 };
2462
2463 let (HydroNode::CrossProduct { left, right, .. }
2464 | HydroNode::Join { left, right, .. }) = self
2465 else {
2466 unreachable!()
2467 };
2468
2469 let is_top_level = left.metadata().location_kind.is_top_level()
2470 && right.metadata().location_kind.is_top_level();
2471 let (left_inner, left_lifetime) =
2472 if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
2473 debug_assert!(!left.metadata().location_kind.is_top_level());
2474 (left, quote!('static))
2475 } else if left.metadata().location_kind.is_top_level() {
2476 (left, quote!('static))
2477 } else {
2478 (left, quote!('tick))
2479 };
2480
2481 let (right_inner, right_lifetime) =
2482 if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
2483 debug_assert!(!right.metadata().location_kind.is_top_level());
2484 (right, quote!('static))
2485 } else if right.metadata().location_kind.is_top_level() {
2486 (right, quote!('static))
2487 } else {
2488 (right, quote!('tick))
2489 };
2490
2491 let left_ident =
2492 left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2493 let right_ident =
2494 right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2495
2496 let stream_ident =
2497 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2498
2499 match builders_or_callback {
2500 BuildersOrCallback::Builders(graph_builders) => {
2501 let builder = graph_builders.get_dfir_mut(&out_location);
2502 builder.add_dfir(
2503 if is_top_level {
2504 parse_quote! {
2507 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2508 #left_ident -> [0]#stream_ident;
2509 #right_ident -> [1]#stream_ident;
2510 }
2511 } else {
2512 parse_quote! {
2513 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2514 #left_ident -> [0]#stream_ident;
2515 #right_ident -> [1]#stream_ident;
2516 }
2517 }
2518 ,
2519 None,
2520 Some(&next_stmt_id.to_string()),
2521 );
2522 }
2523 BuildersOrCallback::Callback(_, node_callback) => {
2524 node_callback(self, next_stmt_id);
2525 }
2526 }
2527
2528 *next_stmt_id += 1;
2529
2530 stream_ident
2531 }
2532
2533 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2534 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2535 parse_quote!(difference_multiset)
2536 } else {
2537 parse_quote!(anti_join_multiset)
2538 };
2539
2540 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2541 self
2542 else {
2543 unreachable!()
2544 };
2545
2546 let (neg, neg_lifetime) =
2547 if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
2548 debug_assert!(!neg.metadata().location_kind.is_top_level());
2549 (neg, quote!('static))
2550 } else if neg.metadata().location_kind.is_top_level() {
2551 (neg, quote!('static))
2552 } else {
2553 (neg, quote!('tick))
2554 };
2555
2556 let pos_ident = pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
2557 let neg_ident = neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
2558
2559 let stream_ident =
2560 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2561
2562 match builders_or_callback {
2563 BuildersOrCallback::Builders(graph_builders) => {
2564 let builder = graph_builders.get_dfir_mut(&out_location);
2565 builder.add_dfir(
2566 parse_quote! {
2567 #stream_ident = #operator::<'tick, #neg_lifetime>();
2568 #pos_ident -> [pos]#stream_ident;
2569 #neg_ident -> [neg]#stream_ident;
2570 },
2571 None,
2572 Some(&next_stmt_id.to_string()),
2573 );
2574 }
2575 BuildersOrCallback::Callback(_, node_callback) => {
2576 node_callback(self, next_stmt_id);
2577 }
2578 }
2579
2580 *next_stmt_id += 1;
2581
2582 stream_ident
2583 }
2584
2585 HydroNode::ResolveFutures { input, .. } => {
2586 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2587
2588 let futures_ident =
2589 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2590
2591 match builders_or_callback {
2592 BuildersOrCallback::Builders(graph_builders) => {
2593 let builder = graph_builders.get_dfir_mut(&out_location);
2594 builder.add_dfir(
2595 parse_quote! {
2596 #futures_ident = #input_ident -> resolve_futures();
2597 },
2598 None,
2599 Some(&next_stmt_id.to_string()),
2600 );
2601 }
2602 BuildersOrCallback::Callback(_, node_callback) => {
2603 node_callback(self, next_stmt_id);
2604 }
2605 }
2606
2607 *next_stmt_id += 1;
2608
2609 futures_ident
2610 }
2611
2612 HydroNode::ResolveFuturesOrdered { input, .. } => {
2613 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2614
2615 let futures_ident =
2616 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2617
2618 match builders_or_callback {
2619 BuildersOrCallback::Builders(graph_builders) => {
2620 let builder = graph_builders.get_dfir_mut(&out_location);
2621 builder.add_dfir(
2622 parse_quote! {
2623 #futures_ident = #input_ident -> resolve_futures_ordered();
2624 },
2625 None,
2626 Some(&next_stmt_id.to_string()),
2627 );
2628 }
2629 BuildersOrCallback::Callback(_, node_callback) => {
2630 node_callback(self, next_stmt_id);
2631 }
2632 }
2633
2634 *next_stmt_id += 1;
2635
2636 futures_ident
2637 }
2638
2639 HydroNode::Map { f, input, .. } => {
2640 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2641
2642 let map_ident =
2643 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2644
2645 match builders_or_callback {
2646 BuildersOrCallback::Builders(graph_builders) => {
2647 let builder = graph_builders.get_dfir_mut(&out_location);
2648 builder.add_dfir(
2649 parse_quote! {
2650 #map_ident = #input_ident -> map(#f);
2651 },
2652 None,
2653 Some(&next_stmt_id.to_string()),
2654 );
2655 }
2656 BuildersOrCallback::Callback(_, node_callback) => {
2657 node_callback(self, next_stmt_id);
2658 }
2659 }
2660
2661 *next_stmt_id += 1;
2662
2663 map_ident
2664 }
2665
2666 HydroNode::FlatMap { f, input, .. } => {
2667 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2668
2669 let flat_map_ident =
2670 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2671
2672 match builders_or_callback {
2673 BuildersOrCallback::Builders(graph_builders) => {
2674 let builder = graph_builders.get_dfir_mut(&out_location);
2675 builder.add_dfir(
2676 parse_quote! {
2677 #flat_map_ident = #input_ident -> flat_map(#f);
2678 },
2679 None,
2680 Some(&next_stmt_id.to_string()),
2681 );
2682 }
2683 BuildersOrCallback::Callback(_, node_callback) => {
2684 node_callback(self, next_stmt_id);
2685 }
2686 }
2687
2688 *next_stmt_id += 1;
2689
2690 flat_map_ident
2691 }
2692
2693 HydroNode::Filter { f, input, .. } => {
2694 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2695
2696 let filter_ident =
2697 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2698
2699 match builders_or_callback {
2700 BuildersOrCallback::Builders(graph_builders) => {
2701 let builder = graph_builders.get_dfir_mut(&out_location);
2702 builder.add_dfir(
2703 parse_quote! {
2704 #filter_ident = #input_ident -> filter(#f);
2705 },
2706 None,
2707 Some(&next_stmt_id.to_string()),
2708 );
2709 }
2710 BuildersOrCallback::Callback(_, node_callback) => {
2711 node_callback(self, next_stmt_id);
2712 }
2713 }
2714
2715 *next_stmt_id += 1;
2716
2717 filter_ident
2718 }
2719
2720 HydroNode::FilterMap { f, input, .. } => {
2721 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2722
2723 let filter_map_ident =
2724 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2725
2726 match builders_or_callback {
2727 BuildersOrCallback::Builders(graph_builders) => {
2728 let builder = graph_builders.get_dfir_mut(&out_location);
2729 builder.add_dfir(
2730 parse_quote! {
2731 #filter_map_ident = #input_ident -> filter_map(#f);
2732 },
2733 None,
2734 Some(&next_stmt_id.to_string()),
2735 );
2736 }
2737 BuildersOrCallback::Callback(_, node_callback) => {
2738 node_callback(self, next_stmt_id);
2739 }
2740 }
2741
2742 *next_stmt_id += 1;
2743
2744 filter_map_ident
2745 }
2746
2747 HydroNode::Sort { input, .. } => {
2748 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2749
2750 let sort_ident =
2751 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2752
2753 match builders_or_callback {
2754 BuildersOrCallback::Builders(graph_builders) => {
2755 let builder = graph_builders.get_dfir_mut(&out_location);
2756 builder.add_dfir(
2757 parse_quote! {
2758 #sort_ident = #input_ident -> sort();
2759 },
2760 None,
2761 Some(&next_stmt_id.to_string()),
2762 );
2763 }
2764 BuildersOrCallback::Callback(_, node_callback) => {
2765 node_callback(self, next_stmt_id);
2766 }
2767 }
2768
2769 *next_stmt_id += 1;
2770
2771 sort_ident
2772 }
2773
2774 HydroNode::DeferTick { input, .. } => {
2775 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2776
2777 let defer_tick_ident =
2778 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2779
2780 match builders_or_callback {
2781 BuildersOrCallback::Builders(graph_builders) => {
2782 let builder = graph_builders.get_dfir_mut(&out_location);
2783 builder.add_dfir(
2784 parse_quote! {
2785 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2786 },
2787 None,
2788 Some(&next_stmt_id.to_string()),
2789 );
2790 }
2791 BuildersOrCallback::Callback(_, node_callback) => {
2792 node_callback(self, next_stmt_id);
2793 }
2794 }
2795
2796 *next_stmt_id += 1;
2797
2798 defer_tick_ident
2799 }
2800
2801 HydroNode::Enumerate { input, .. } => {
2802 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2803
2804 let enumerate_ident =
2805 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2806
2807 match builders_or_callback {
2808 BuildersOrCallback::Builders(graph_builders) => {
2809 let builder = graph_builders.get_dfir_mut(&out_location);
2810 let lifetime = if input.metadata().location_kind.is_top_level() {
2811 quote!('static)
2812 } else {
2813 quote!('tick)
2814 };
2815 builder.add_dfir(
2816 parse_quote! {
2817 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2818 },
2819 None,
2820 Some(&next_stmt_id.to_string()),
2821 );
2822 }
2823 BuildersOrCallback::Callback(_, node_callback) => {
2824 node_callback(self, next_stmt_id);
2825 }
2826 }
2827
2828 *next_stmt_id += 1;
2829
2830 enumerate_ident
2831 }
2832
2833 HydroNode::Inspect { f, input, .. } => {
2834 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2835
2836 let inspect_ident =
2837 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2838
2839 match builders_or_callback {
2840 BuildersOrCallback::Builders(graph_builders) => {
2841 let builder = graph_builders.get_dfir_mut(&out_location);
2842 builder.add_dfir(
2843 parse_quote! {
2844 #inspect_ident = #input_ident -> inspect(#f);
2845 },
2846 None,
2847 Some(&next_stmt_id.to_string()),
2848 );
2849 }
2850 BuildersOrCallback::Callback(_, node_callback) => {
2851 node_callback(self, next_stmt_id);
2852 }
2853 }
2854
2855 *next_stmt_id += 1;
2856
2857 inspect_ident
2858 }
2859
2860 HydroNode::Unique { input, .. } => {
2861 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2862
2863 let unique_ident =
2864 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2865
2866 match builders_or_callback {
2867 BuildersOrCallback::Builders(graph_builders) => {
2868 let builder = graph_builders.get_dfir_mut(&out_location);
2869 let lifetime = if input.metadata().location_kind.is_top_level() {
2870 quote!('static)
2871 } else {
2872 quote!('tick)
2873 };
2874
2875 builder.add_dfir(
2876 parse_quote! {
2877 #unique_ident = #input_ident -> unique::<#lifetime>();
2878 },
2879 None,
2880 Some(&next_stmt_id.to_string()),
2881 );
2882 }
2883 BuildersOrCallback::Callback(_, node_callback) => {
2884 node_callback(self, next_stmt_id);
2885 }
2886 }
2887
2888 *next_stmt_id += 1;
2889
2890 unique_ident
2891 }
2892
2893 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2894 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2895 parse_quote!(fold)
2896 } else if matches!(self, HydroNode::Scan { .. }) {
2897 parse_quote!(scan)
2898 } else {
2899 parse_quote!(fold_keyed)
2900 };
2901
2902 let (HydroNode::Fold { input, .. }
2903 | HydroNode::FoldKeyed { input, .. }
2904 | HydroNode::Scan { input, .. }) = self
2905 else {
2906 unreachable!()
2907 };
2908
2909 let (input, lifetime) =
2910 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2911 debug_assert!(!input.metadata().location_kind.is_top_level());
2912 (input, quote!('static))
2913 } else if input.metadata().location_kind.is_top_level() {
2914 (input, quote!('static))
2915 } else {
2916 (input, quote!('tick))
2917 };
2918
2919 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2920
2921 let (HydroNode::Fold { init, acc, .. }
2922 | HydroNode::FoldKeyed { init, acc, .. }
2923 | HydroNode::Scan { init, acc, .. }) = &*self
2924 else {
2925 unreachable!()
2926 };
2927
2928 let fold_ident =
2929 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2930
2931 match builders_or_callback {
2932 BuildersOrCallback::Builders(graph_builders) => {
2933 if matches!(self, HydroNode::Fold { .. })
2934 && self.metadata().location_kind.is_top_level()
2935 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2936 && graph_builders.singleton_intermediates()
2937 {
2938 let builder = graph_builders.get_dfir_mut(&out_location);
2939
2940 let acc: syn::Expr = parse_quote!({
2941 let mut __inner = #acc;
2942 move |__state, __value| {
2943 __inner(__state, __value);
2944 Some(__state.clone())
2945 }
2946 });
2947
2948 builder.add_dfir(
2949 parse_quote! {
2950 source_iter([(#init)()]) -> [0]#fold_ident;
2951 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
2952 #fold_ident = chain();
2953 },
2954 None,
2955 Some(&next_stmt_id.to_string()),
2956 );
2957 } else if matches!(self, HydroNode::FoldKeyed { .. })
2958 && self.metadata().location_kind.is_top_level()
2959 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2960 && graph_builders.singleton_intermediates()
2961 {
2962 let builder = graph_builders.get_dfir_mut(&out_location);
2963
2964 let acc: syn::Expr = parse_quote!({
2965 let mut __init = #init;
2966 let mut __inner = #acc;
2967 move |__state, (__key, __value)| {
2968 let __state = __state.entry(__key.clone()).or_insert_with(|| (__init)());
2970 __inner(__state, __value);
2971 Some((__key, __state.clone()))
2972 }
2973 });
2974
2975 builder.add_dfir(
2976 parse_quote! {
2977 source_iter([(#init)()]) -> [0]#fold_ident;
2978 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
2979 },
2980 None,
2981 Some(&next_stmt_id.to_string()),
2982 );
2983 } else {
2984 let builder = graph_builders.get_dfir_mut(&out_location);
2985 builder.add_dfir(
2986 parse_quote! {
2987 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
2988 },
2989 None,
2990 Some(&next_stmt_id.to_string()),
2991 );
2992 }
2993 }
2994 BuildersOrCallback::Callback(_, node_callback) => {
2995 node_callback(self, next_stmt_id);
2996 }
2997 }
2998
2999 *next_stmt_id += 1;
3000
3001 fold_ident
3002 }
3003
3004 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3005 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
3006 parse_quote!(reduce)
3007 } else {
3008 parse_quote!(reduce_keyed)
3009 };
3010
3011 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = self
3012 else {
3013 unreachable!()
3014 };
3015
3016 let (input, lifetime) =
3017 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
3018 debug_assert!(!input.metadata().location_kind.is_top_level());
3019 (input, quote!('static))
3020 } else if input.metadata().location_kind.is_top_level() {
3021 (input, quote!('static))
3022 } else {
3023 (input, quote!('tick))
3024 };
3025
3026 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3027
3028 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*self
3029 else {
3030 unreachable!()
3031 };
3032
3033 let reduce_ident =
3034 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3035
3036 match builders_or_callback {
3037 BuildersOrCallback::Builders(graph_builders) => {
3038 if matches!(self, HydroNode::Reduce { .. })
3039 && self.metadata().location_kind.is_top_level()
3040 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3041 && graph_builders.singleton_intermediates()
3042 {
3043 todo!(
3044 "Reduce with optional intermediates is not yet supported in simulator"
3045 );
3046 } else if matches!(self, HydroNode::ReduceKeyed { .. })
3047 && self.metadata().location_kind.is_top_level()
3048 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3049 && graph_builders.singleton_intermediates()
3050 {
3051 todo!();
3052 } else {
3053 let builder = graph_builders.get_dfir_mut(&out_location);
3054 builder.add_dfir(
3055 parse_quote! {
3056 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3057 },
3058 None,
3059 Some(&next_stmt_id.to_string()),
3060 );
3061 }
3062 }
3063 BuildersOrCallback::Callback(_, node_callback) => {
3064 node_callback(self, next_stmt_id);
3065 }
3066 }
3067
3068 *next_stmt_id += 1;
3069
3070 reduce_ident
3071 }
3072
3073 HydroNode::ReduceKeyedWatermark {
3074 f,
3075 input,
3076 watermark,
3077 ..
3078 } => {
3079 let (input, lifetime) =
3080 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
3081 debug_assert!(!input.metadata().location_kind.is_top_level());
3082 (input, quote!('static))
3083 } else if input.metadata().location_kind.is_top_level() {
3084 (input, quote!('static))
3085 } else {
3086 (input, quote!('tick))
3087 };
3088
3089 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3090
3091 let watermark_ident =
3092 watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
3093
3094 let chain_ident = syn::Ident::new(
3095 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3096 Span::call_site(),
3097 );
3098
3099 let fold_ident =
3100 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3101
3102 match builders_or_callback {
3103 BuildersOrCallback::Builders(graph_builders) => {
3104 let builder = graph_builders.get_dfir_mut(&out_location);
3105 builder.add_dfir(
3110 parse_quote! {
3111 #chain_ident = chain();
3112 #input_ident
3113 -> map(|x| (Some(x), None))
3114 -> [0]#chain_ident;
3115 #watermark_ident
3116 -> map(|watermark| (None, Some(watermark)))
3117 -> [1]#chain_ident;
3118
3119 #fold_ident = #chain_ident
3120 -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3121 let __reduce_keyed_fn = #f;
3122 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3123 if let Some((k, v)) = opt_payload {
3124 if let Some(curr_watermark) = *opt_curr_watermark {
3125 if k <= curr_watermark {
3126 return;
3127 }
3128 }
3129 match map.entry(k) {
3130 ::std::collections::hash_map::Entry::Vacant(e) => {
3131 e.insert(v);
3132 }
3133 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3134 __reduce_keyed_fn(e.get_mut(), v);
3135 }
3136 }
3137 } else {
3138 let watermark = opt_watermark.unwrap();
3139 if let Some(curr_watermark) = *opt_curr_watermark {
3140 if watermark <= curr_watermark {
3141 return;
3142 }
3143 }
3144 *opt_curr_watermark = opt_watermark;
3145 map.retain(|k, _| *k > watermark);
3146 }
3147 }
3148 })
3149 -> flat_map(|(map, _curr_watermark)| map);
3150 },
3151 None,
3152 Some(&next_stmt_id.to_string()),
3153 );
3154 }
3155 BuildersOrCallback::Callback(_, node_callback) => {
3156 node_callback(self, next_stmt_id);
3157 }
3158 }
3159
3160 *next_stmt_id += 1;
3161
3162 fold_ident
3163 }
3164
3165 HydroNode::Network {
3166 serialize_fn: serialize_pipeline,
3167 instantiate_fn,
3168 deserialize_fn: deserialize_pipeline,
3169 input,
3170 ..
3171 } => {
3172 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3173
3174 let receiver_stream_ident =
3175 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3176
3177 match builders_or_callback {
3178 BuildersOrCallback::Builders(graph_builders) => {
3179 let (sink_expr, source_expr) = match instantiate_fn {
3180 DebugInstantiate::Building => (
3181 syn::parse_quote!(DUMMY_SINK),
3182 syn::parse_quote!(DUMMY_SOURCE),
3183 ),
3184
3185 DebugInstantiate::Finalized(finalized) => {
3186 (finalized.sink.clone(), finalized.source.clone())
3187 }
3188 };
3189
3190 graph_builders.create_network(
3191 &input.metadata().location_kind,
3192 &out_location,
3193 input_ident,
3194 &receiver_stream_ident,
3195 serialize_pipeline,
3196 sink_expr,
3197 source_expr,
3198 deserialize_pipeline,
3199 *next_stmt_id,
3200 );
3201 }
3202 BuildersOrCallback::Callback(_, node_callback) => {
3203 node_callback(self, next_stmt_id);
3204 }
3205 }
3206
3207 *next_stmt_id += 1;
3208
3209 receiver_stream_ident
3210 }
3211
3212 HydroNode::ExternalInput {
3213 instantiate_fn,
3214 deserialize_fn: deserialize_pipeline,
3215 ..
3216 } => {
3217 let receiver_stream_ident =
3218 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3219
3220 match builders_or_callback {
3221 BuildersOrCallback::Builders(graph_builders) => {
3222 let (_, source_expr) = match instantiate_fn {
3223 DebugInstantiate::Building => (
3224 syn::parse_quote!(DUMMY_SINK),
3225 syn::parse_quote!(DUMMY_SOURCE),
3226 ),
3227
3228 DebugInstantiate::Finalized(finalized) => {
3229 (finalized.sink.clone(), finalized.source.clone())
3230 }
3231 };
3232
3233 graph_builders.create_external_source(
3234 &out_location,
3235 source_expr,
3236 &receiver_stream_ident,
3237 deserialize_pipeline,
3238 *next_stmt_id,
3239 );
3240 }
3241 BuildersOrCallback::Callback(_, node_callback) => {
3242 node_callback(self, next_stmt_id);
3243 }
3244 }
3245
3246 *next_stmt_id += 1;
3247
3248 receiver_stream_ident
3249 }
3250
3251 HydroNode::Counter {
3252 tag,
3253 duration,
3254 prefix,
3255 input,
3256 ..
3257 } => {
3258 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3259
3260 let counter_ident =
3261 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3262
3263 match builders_or_callback {
3264 BuildersOrCallback::Builders(graph_builders) => {
3265 let builder = graph_builders.get_dfir_mut(&out_location);
3266 builder.add_dfir(
3267 parse_quote! {
3268 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3269 },
3270 None,
3271 Some(&next_stmt_id.to_string()),
3272 );
3273 }
3274 BuildersOrCallback::Callback(_, node_callback) => {
3275 node_callback(self, next_stmt_id);
3276 }
3277 }
3278
3279 *next_stmt_id += 1;
3280
3281 counter_ident
3282 }
3283 }
3284 }
3285
3286 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3287 match self {
3288 HydroNode::Placeholder => {
3289 panic!()
3290 }
3291 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3292 HydroNode::Source { source, .. } => match source {
3293 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3294 HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
3295 },
3296 HydroNode::SingletonSource { value, .. } => {
3297 transform(value);
3298 }
3299 HydroNode::CycleSource { .. }
3300 | HydroNode::Tee { .. }
3301 | HydroNode::Persist { .. }
3302 | HydroNode::YieldConcat { .. }
3303 | HydroNode::BeginAtomic { .. }
3304 | HydroNode::EndAtomic { .. }
3305 | HydroNode::Batch { .. }
3306 | HydroNode::Chain { .. }
3307 | HydroNode::ChainFirst { .. }
3308 | HydroNode::CrossProduct { .. }
3309 | HydroNode::CrossSingleton { .. }
3310 | HydroNode::ResolveFutures { .. }
3311 | HydroNode::ResolveFuturesOrdered { .. }
3312 | HydroNode::Join { .. }
3313 | HydroNode::Difference { .. }
3314 | HydroNode::AntiJoin { .. }
3315 | HydroNode::DeferTick { .. }
3316 | HydroNode::Enumerate { .. }
3317 | HydroNode::Unique { .. }
3318 | HydroNode::Sort { .. } => {}
3319 HydroNode::Map { f, .. }
3320 | HydroNode::FlatMap { f, .. }
3321 | HydroNode::Filter { f, .. }
3322 | HydroNode::FilterMap { f, .. }
3323 | HydroNode::Inspect { f, .. }
3324 | HydroNode::Reduce { f, .. }
3325 | HydroNode::ReduceKeyed { f, .. }
3326 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3327 transform(f);
3328 }
3329 HydroNode::Fold { init, acc, .. }
3330 | HydroNode::Scan { init, acc, .. }
3331 | HydroNode::FoldKeyed { init, acc, .. } => {
3332 transform(init);
3333 transform(acc);
3334 }
3335 HydroNode::Network {
3336 serialize_fn,
3337 deserialize_fn,
3338 ..
3339 } => {
3340 if let Some(serialize_fn) = serialize_fn {
3341 transform(serialize_fn);
3342 }
3343 if let Some(deserialize_fn) = deserialize_fn {
3344 transform(deserialize_fn);
3345 }
3346 }
3347 HydroNode::ExternalInput { deserialize_fn, .. } => {
3348 if let Some(deserialize_fn) = deserialize_fn {
3349 transform(deserialize_fn);
3350 }
3351 }
3352 HydroNode::Counter { duration, .. } => {
3353 transform(duration);
3354 }
3355 }
3356 }
3357
3358 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3359 &self.metadata().op
3360 }
3361
3362 pub fn metadata(&self) -> &HydroIrMetadata {
3363 match self {
3364 HydroNode::Placeholder => {
3365 panic!()
3366 }
3367 HydroNode::Cast { metadata, .. } => metadata,
3368 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3369 HydroNode::Source { metadata, .. } => metadata,
3370 HydroNode::SingletonSource { metadata, .. } => metadata,
3371 HydroNode::CycleSource { metadata, .. } => metadata,
3372 HydroNode::Tee { metadata, .. } => metadata,
3373 HydroNode::Persist { metadata, .. } => metadata,
3374 HydroNode::YieldConcat { metadata, .. } => metadata,
3375 HydroNode::BeginAtomic { metadata, .. } => metadata,
3376 HydroNode::EndAtomic { metadata, .. } => metadata,
3377 HydroNode::Batch { metadata, .. } => metadata,
3378 HydroNode::Chain { metadata, .. } => metadata,
3379 HydroNode::ChainFirst { metadata, .. } => metadata,
3380 HydroNode::CrossProduct { metadata, .. } => metadata,
3381 HydroNode::CrossSingleton { metadata, .. } => metadata,
3382 HydroNode::Join { metadata, .. } => metadata,
3383 HydroNode::Difference { metadata, .. } => metadata,
3384 HydroNode::AntiJoin { metadata, .. } => metadata,
3385 HydroNode::ResolveFutures { metadata, .. } => metadata,
3386 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3387 HydroNode::Map { metadata, .. } => metadata,
3388 HydroNode::FlatMap { metadata, .. } => metadata,
3389 HydroNode::Filter { metadata, .. } => metadata,
3390 HydroNode::FilterMap { metadata, .. } => metadata,
3391 HydroNode::DeferTick { metadata, .. } => metadata,
3392 HydroNode::Enumerate { metadata, .. } => metadata,
3393 HydroNode::Inspect { metadata, .. } => metadata,
3394 HydroNode::Unique { metadata, .. } => metadata,
3395 HydroNode::Sort { metadata, .. } => metadata,
3396 HydroNode::Scan { metadata, .. } => metadata,
3397 HydroNode::Fold { metadata, .. } => metadata,
3398 HydroNode::FoldKeyed { metadata, .. } => metadata,
3399 HydroNode::Reduce { metadata, .. } => metadata,
3400 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3401 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3402 HydroNode::ExternalInput { metadata, .. } => metadata,
3403 HydroNode::Network { metadata, .. } => metadata,
3404 HydroNode::Counter { metadata, .. } => metadata,
3405 }
3406 }
3407
3408 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3409 &mut self.metadata_mut().op
3410 }
3411
3412 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3413 match self {
3414 HydroNode::Placeholder => {
3415 panic!()
3416 }
3417 HydroNode::Cast { metadata, .. } => metadata,
3418 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3419 HydroNode::Source { metadata, .. } => metadata,
3420 HydroNode::SingletonSource { metadata, .. } => metadata,
3421 HydroNode::CycleSource { metadata, .. } => metadata,
3422 HydroNode::Tee { metadata, .. } => metadata,
3423 HydroNode::Persist { metadata, .. } => metadata,
3424 HydroNode::YieldConcat { metadata, .. } => metadata,
3425 HydroNode::BeginAtomic { metadata, .. } => metadata,
3426 HydroNode::EndAtomic { metadata, .. } => metadata,
3427 HydroNode::Batch { metadata, .. } => metadata,
3428 HydroNode::Chain { metadata, .. } => metadata,
3429 HydroNode::ChainFirst { metadata, .. } => metadata,
3430 HydroNode::CrossProduct { metadata, .. } => metadata,
3431 HydroNode::CrossSingleton { metadata, .. } => metadata,
3432 HydroNode::Join { metadata, .. } => metadata,
3433 HydroNode::Difference { metadata, .. } => metadata,
3434 HydroNode::AntiJoin { metadata, .. } => metadata,
3435 HydroNode::ResolveFutures { metadata, .. } => metadata,
3436 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3437 HydroNode::Map { metadata, .. } => metadata,
3438 HydroNode::FlatMap { metadata, .. } => metadata,
3439 HydroNode::Filter { metadata, .. } => metadata,
3440 HydroNode::FilterMap { metadata, .. } => metadata,
3441 HydroNode::DeferTick { metadata, .. } => metadata,
3442 HydroNode::Enumerate { metadata, .. } => metadata,
3443 HydroNode::Inspect { metadata, .. } => metadata,
3444 HydroNode::Unique { metadata, .. } => metadata,
3445 HydroNode::Sort { metadata, .. } => metadata,
3446 HydroNode::Scan { metadata, .. } => metadata,
3447 HydroNode::Fold { metadata, .. } => metadata,
3448 HydroNode::FoldKeyed { metadata, .. } => metadata,
3449 HydroNode::Reduce { metadata, .. } => metadata,
3450 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3451 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3452 HydroNode::ExternalInput { metadata, .. } => metadata,
3453 HydroNode::Network { metadata, .. } => metadata,
3454 HydroNode::Counter { metadata, .. } => metadata,
3455 }
3456 }
3457
3458 pub fn input(&self) -> Vec<&HydroNode> {
3459 match self {
3460 HydroNode::Placeholder => {
3461 panic!()
3462 }
3463 HydroNode::Source { .. }
3464 | HydroNode::SingletonSource { .. }
3465 | HydroNode::ExternalInput { .. }
3466 | HydroNode::CycleSource { .. }
3467 | HydroNode::Tee { .. } => {
3468 vec![]
3470 }
3471 HydroNode::Cast { inner, .. }
3472 | HydroNode::ObserveNonDet { inner, .. }
3473 | HydroNode::Persist { inner, .. }
3474 | HydroNode::YieldConcat { inner, .. }
3475 | HydroNode::BeginAtomic { inner, .. }
3476 | HydroNode::EndAtomic { inner, .. }
3477 | HydroNode::Batch { inner, .. } => {
3478 vec![inner]
3479 }
3480 HydroNode::Chain { first, second, .. } => {
3481 vec![first, second]
3482 }
3483 HydroNode::ChainFirst { first, second, .. } => {
3484 vec![first, second]
3485 }
3486 HydroNode::CrossProduct { left, right, .. }
3487 | HydroNode::CrossSingleton { left, right, .. }
3488 | HydroNode::Join { left, right, .. } => {
3489 vec![left, right]
3490 }
3491 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3492 vec![pos, neg]
3493 }
3494 HydroNode::Map { input, .. }
3495 | HydroNode::FlatMap { input, .. }
3496 | HydroNode::Filter { input, .. }
3497 | HydroNode::FilterMap { input, .. }
3498 | HydroNode::Sort { input, .. }
3499 | HydroNode::DeferTick { input, .. }
3500 | HydroNode::Enumerate { input, .. }
3501 | HydroNode::Inspect { input, .. }
3502 | HydroNode::Unique { input, .. }
3503 | HydroNode::Network { input, .. }
3504 | HydroNode::Counter { input, .. }
3505 | HydroNode::ResolveFutures { input, .. }
3506 | HydroNode::ResolveFuturesOrdered { input, .. } => {
3507 vec![input]
3508 }
3509 HydroNode::Fold { input, .. }
3510 | HydroNode::FoldKeyed { input, .. }
3511 | HydroNode::Reduce { input, .. }
3512 | HydroNode::ReduceKeyed { input, .. }
3513 | HydroNode::Scan { input, .. } => {
3514 if let HydroNode::Persist { inner, .. } = input.as_ref() {
3516 vec![inner]
3517 } else {
3518 vec![input]
3519 }
3520 }
3521 HydroNode::ReduceKeyedWatermark {
3522 input, watermark, ..
3523 } => {
3524 if let HydroNode::Persist { inner, .. } = input.as_ref() {
3526 vec![inner, watermark]
3527 } else {
3528 vec![input, watermark]
3529 }
3530 }
3531 }
3532 }
3533
3534 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3535 self.input()
3536 .iter()
3537 .map(|input_node| input_node.metadata())
3538 .collect()
3539 }
3540
3541 pub fn print_root(&self) -> String {
3542 match self {
3543 HydroNode::Placeholder => {
3544 panic!()
3545 }
3546 HydroNode::Cast { .. } => "Cast()".to_string(),
3547 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3548 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3549 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3550 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3551 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3552 HydroNode::Persist { .. } => "Persist()".to_string(),
3553 HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3554 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3555 HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3556 HydroNode::Batch { .. } => "Batch()".to_string(),
3557 HydroNode::Chain { first, second, .. } => {
3558 format!("Chain({}, {})", first.print_root(), second.print_root())
3559 }
3560 HydroNode::ChainFirst { first, second, .. } => {
3561 format!(
3562 "ChainFirst({}, {})",
3563 first.print_root(),
3564 second.print_root()
3565 )
3566 }
3567 HydroNode::CrossProduct { left, right, .. } => {
3568 format!(
3569 "CrossProduct({}, {})",
3570 left.print_root(),
3571 right.print_root()
3572 )
3573 }
3574 HydroNode::CrossSingleton { left, right, .. } => {
3575 format!(
3576 "CrossSingleton({}, {})",
3577 left.print_root(),
3578 right.print_root()
3579 )
3580 }
3581 HydroNode::Join { left, right, .. } => {
3582 format!("Join({}, {})", left.print_root(), right.print_root())
3583 }
3584 HydroNode::Difference { pos, neg, .. } => {
3585 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3586 }
3587 HydroNode::AntiJoin { pos, neg, .. } => {
3588 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3589 }
3590 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3591 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3592 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3593 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3594 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3595 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3596 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3597 HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3598 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3599 HydroNode::Unique { .. } => "Unique()".to_string(),
3600 HydroNode::Sort { .. } => "Sort()".to_string(),
3601 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3602 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3603 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3604 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3605 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3606 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3607 HydroNode::Network { .. } => "Network()".to_string(),
3608 HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3609 HydroNode::Counter { tag, duration, .. } => {
3610 format!("Counter({:?}, {:?})", tag, duration)
3611 }
3612 }
3613 }
3614}
3615
3616#[cfg(feature = "build")]
3617fn instantiate_network<'a, D>(
3618 from_location: &LocationId,
3619 to_location: &LocationId,
3620 processes: &HashMap<usize, D::Process>,
3621 clusters: &HashMap<usize, D::Cluster>,
3622 compile_env: &D::CompileEnv,
3623) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3624where
3625 D: Deploy<'a>,
3626{
3627 let ((sink, source), connect_fn) = match (from_location, to_location) {
3628 (LocationId::Process(from), LocationId::Process(to)) => {
3629 let from_node = processes
3630 .get(from)
3631 .unwrap_or_else(|| {
3632 panic!("A process used in the graph was not instantiated: {}", from)
3633 })
3634 .clone();
3635 let to_node = processes
3636 .get(to)
3637 .unwrap_or_else(|| {
3638 panic!("A process used in the graph was not instantiated: {}", to)
3639 })
3640 .clone();
3641
3642 let sink_port = D::allocate_process_port(&from_node);
3643 let source_port = D::allocate_process_port(&to_node);
3644
3645 (
3646 D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3647 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3648 )
3649 }
3650 (LocationId::Process(from), LocationId::Cluster(to)) => {
3651 let from_node = processes
3652 .get(from)
3653 .unwrap_or_else(|| {
3654 panic!("A process used in the graph was not instantiated: {}", from)
3655 })
3656 .clone();
3657 let to_node = clusters
3658 .get(to)
3659 .unwrap_or_else(|| {
3660 panic!("A cluster used in the graph was not instantiated: {}", to)
3661 })
3662 .clone();
3663
3664 let sink_port = D::allocate_process_port(&from_node);
3665 let source_port = D::allocate_cluster_port(&to_node);
3666
3667 (
3668 D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3669 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3670 )
3671 }
3672 (LocationId::Cluster(from), LocationId::Process(to)) => {
3673 let from_node = clusters
3674 .get(from)
3675 .unwrap_or_else(|| {
3676 panic!("A cluster used in the graph was not instantiated: {}", from)
3677 })
3678 .clone();
3679 let to_node = processes
3680 .get(to)
3681 .unwrap_or_else(|| {
3682 panic!("A process used in the graph was not instantiated: {}", to)
3683 })
3684 .clone();
3685
3686 let sink_port = D::allocate_cluster_port(&from_node);
3687 let source_port = D::allocate_process_port(&to_node);
3688
3689 (
3690 D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3691 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3692 )
3693 }
3694 (LocationId::Cluster(from), LocationId::Cluster(to)) => {
3695 let from_node = clusters
3696 .get(from)
3697 .unwrap_or_else(|| {
3698 panic!("A cluster used in the graph was not instantiated: {}", from)
3699 })
3700 .clone();
3701 let to_node = clusters
3702 .get(to)
3703 .unwrap_or_else(|| {
3704 panic!("A cluster used in the graph was not instantiated: {}", to)
3705 })
3706 .clone();
3707
3708 let sink_port = D::allocate_cluster_port(&from_node);
3709 let source_port = D::allocate_cluster_port(&to_node);
3710
3711 (
3712 D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3713 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3714 )
3715 }
3716 (LocationId::Tick(_, _), _) => panic!(),
3717 (_, LocationId::Tick(_, _)) => panic!(),
3718 (LocationId::Atomic(_), _) => panic!(),
3719 (_, LocationId::Atomic(_)) => panic!(),
3720 };
3721 (sink, source, connect_fn)
3722}
3723
3724#[cfg(test)]
3725mod test {
3726 use std::mem::size_of;
3727
3728 use stageleft::{QuotedWithContext, q};
3729
3730 use super::*;
3731
3732 #[test]
3733 fn hydro_node_size() {
3734 assert_eq!(size_of::<HydroNode>(), 264);
3735 }
3736
3737 #[test]
3738 fn hydro_root_size() {
3739 assert_eq!(size_of::<HydroRoot>(), 160);
3740 }
3741
3742 #[test]
3743 fn test_simplify_q_macro_basic() {
3744 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
3746 let result = simplify_q_macro(simple_expr.clone());
3747 assert_eq!(result, simple_expr);
3748 }
3749
3750 #[test]
3751 fn test_simplify_q_macro_actual_stageleft_call() {
3752 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
3754 let result = simplify_q_macro(stageleft_call);
3755 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3758 }
3759
3760 #[test]
3761 fn test_closure_no_pipe_at_start() {
3762 let stageleft_call = q!({
3764 let foo = 123;
3765 move |b: usize| b + foo
3766 })
3767 .splice_fn1_ctx(&());
3768 let result = simplify_q_macro(stageleft_call);
3769 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3770 }
3771}