itertools/
groupbylazy.rs

1use std::cell::{Cell, RefCell};
2use std::vec;
3
4/// A trait to unify FnMut for GroupBy with the chunk key in IntoChunks
5trait KeyFunction<A> {
6    type Key;
7    fn call_mut(&mut self, arg: A) -> Self::Key;
8}
9
10impl<'a, A, K, F: ?Sized> KeyFunction<A> for F
11    where F: FnMut(A) -> K
12{
13    type Key = K;
14    #[inline]
15    fn call_mut(&mut self, arg: A) -> Self::Key {
16        (*self)(arg)
17    }
18}
19
20
21/// ChunkIndex acts like the grouping key function for IntoChunks
22#[derive(Debug)]
23struct ChunkIndex {
24    size: usize,
25    index: usize,
26    key: usize,
27}
28
29impl ChunkIndex {
30    #[inline(always)]
31    fn new(size: usize) -> Self {
32        ChunkIndex {
33            size: size,
34            index: 0,
35            key: 0,
36        }
37    }
38}
39
40impl<'a, A> KeyFunction<A> for ChunkIndex {
41    type Key = usize;
42    #[inline(always)]
43    fn call_mut(&mut self, _arg: A) -> Self::Key {
44        if self.index == self.size {
45            self.key += 1;
46            self.index = 0;
47        }
48        self.index += 1;
49        self.key
50    }
51}
52
53
54struct GroupInner<K, I, F>
55    where I: Iterator
56{
57    key: F,
58    iter: I,
59    current_key: Option<K>,
60    current_elt: Option<I::Item>,
61    /// flag set if iterator is exhausted
62    done: bool,
63    /// Index of group we are currently buffering or visiting
64    top_group: usize,
65    /// Least index for which we still have elements buffered
66    oldest_buffered_group: usize,
67    /// Group index for `buffer[0]` -- the slots
68    /// bottom_group..oldest_buffered_group are unused and will be erased when
69    /// that range is large enough.
70    bottom_group: usize,
71    /// Buffered groups, from `bottom_group` (index 0) to `top_group`.
72    buffer: Vec<vec::IntoIter<I::Item>>,
73    /// index of last group iter that was dropped, usize::MAX == none
74    dropped_group: usize,
75}
76
77impl<K, I, F> GroupInner<K, I, F>
78    where I: Iterator,
79          F: for<'a> KeyFunction<&'a I::Item, Key=K>,
80          K: PartialEq,
81{
82    /// `client`: Index of group that requests next element
83    #[inline(always)]
84    fn step(&mut self, client: usize) -> Option<I::Item> {
85        /*
86        println!("client={}, bottom_group={}, oldest_buffered_group={}, top_group={}, buffers=[{}]",
87                 client, self.bottom_group, self.oldest_buffered_group,
88                 self.top_group,
89                 self.buffer.iter().map(|elt| elt.len()).format(", "));
90        */
91        if client < self.oldest_buffered_group {
92            None
93        } else if client < self.top_group ||
94            (client == self.top_group &&
95             self.buffer.len() > self.top_group - self.bottom_group)
96        {
97            self.lookup_buffer(client)
98        } else if self.done {
99            None
100        } else if self.top_group == client {
101            self.step_current()
102        } else {
103            self.step_buffering(client)
104        }
105    }
106
107    #[inline(never)]
108    fn lookup_buffer(&mut self, client: usize) -> Option<I::Item> {
109        // if `bufidx` doesn't exist in self.buffer, it might be empty
110        let bufidx = client - self.bottom_group;
111        if client < self.oldest_buffered_group {
112            return None;
113        }
114        let elt = self.buffer.get_mut(bufidx).and_then(|queue| queue.next());
115        if elt.is_none() && client == self.oldest_buffered_group {
116            // FIXME: VecDeque is unfortunately not zero allocation when empty,
117            // so we do this job manually.
118            // `bottom_group..oldest_buffered_group` is unused, and if it's large enough, erase it.
119            self.oldest_buffered_group += 1;
120            // skip forward further empty queues too
121            while self.buffer.get(self.oldest_buffered_group - self.bottom_group)
122                             .map_or(false, |buf| buf.len() == 0)
123            {
124                self.oldest_buffered_group += 1;
125            }
126
127            let nclear = self.oldest_buffered_group - self.bottom_group;
128            if nclear > 0 && nclear >= self.buffer.len() / 2 {
129                let mut i = 0;
130                self.buffer.retain(|buf| {
131                    i += 1;
132                    debug_assert!(buf.len() == 0 || i > nclear);
133                    i > nclear
134                });
135                self.bottom_group = self.oldest_buffered_group;
136            }
137        }
138        elt
139    }
140
141    /// Take the next element from the iterator, and set the done
142    /// flag if exhausted. Must not be called after done.
143    #[inline(always)]
144    fn next_element(&mut self) -> Option<I::Item> {
145        debug_assert!(!self.done);
146        match self.iter.next() {
147            None => { self.done = true; None }
148            otherwise => otherwise,
149        }
150    }
151
152
153    #[inline(never)]
154    fn step_buffering(&mut self, client: usize) -> Option<I::Item> {
155        // requested a later group -- walk through the current group up to
156        // the requested group index, and buffer the elements (unless
157        // the group is marked as dropped).
158        // Because the `Groups` iterator is always the first to request
159        // each group index, client is the next index efter top_group.
160        debug_assert!(self.top_group + 1 == client);
161        let mut group = Vec::new();
162
163        if let Some(elt) = self.current_elt.take() {
164            if self.top_group != self.dropped_group {
165                group.push(elt);
166            }
167        }
168        let mut first_elt = None; // first element of the next group
169
170        while let Some(elt) = self.next_element() {
171            let key = self.key.call_mut(&elt);
172            match self.current_key.take() {
173                None => {}
174                Some(old_key) => if old_key != key {
175                    self.current_key = Some(key);
176                    first_elt = Some(elt);
177                    break;
178                },
179            }
180            self.current_key = Some(key);
181            if self.top_group != self.dropped_group {
182                group.push(elt);
183            }
184        }
185
186        if self.top_group != self.dropped_group {
187            self.push_next_group(group);
188        }
189        if first_elt.is_some() {
190            self.top_group += 1;
191            debug_assert!(self.top_group == client);
192        }
193        first_elt
194    }
195
196    fn push_next_group(&mut self, group: Vec<I::Item>) {
197        // When we add a new buffered group, fill up slots between oldest_buffered_group and top_group
198        while self.top_group - self.bottom_group > self.buffer.len() {
199            if self.buffer.is_empty() {
200                self.bottom_group += 1;
201                self.oldest_buffered_group += 1;
202            } else {
203                self.buffer.push(Vec::new().into_iter());
204            }
205        }
206        self.buffer.push(group.into_iter());
207        debug_assert!(self.top_group + 1 - self.bottom_group == self.buffer.len());
208    }
209
210    /// This is the immediate case, where we use no buffering
211    #[inline]
212    fn step_current(&mut self) -> Option<I::Item> {
213        debug_assert!(!self.done);
214        if let elt @ Some(..) = self.current_elt.take() {
215            return elt;
216        }
217        match self.next_element() {
218            None => None,
219            Some(elt) => {
220                let key = self.key.call_mut(&elt);
221                match self.current_key.take() {
222                    None => {}
223                    Some(old_key) => if old_key != key {
224                        self.current_key = Some(key);
225                        self.current_elt = Some(elt);
226                        self.top_group += 1;
227                        return None;
228                    },
229                }
230                self.current_key = Some(key);
231                Some(elt)
232            }
233        }
234    }
235
236    /// Request the just started groups' key.
237    ///
238    /// `client`: Index of group
239    ///
240    /// **Panics** if no group key is available.
241    fn group_key(&mut self, client: usize) -> K {
242        // This can only be called after we have just returned the first
243        // element of a group.
244        // Perform this by simply buffering one more element, grabbing the
245        // next key.
246        debug_assert!(!self.done);
247        debug_assert!(client == self.top_group);
248        debug_assert!(self.current_key.is_some());
249        debug_assert!(self.current_elt.is_none());
250        let old_key = self.current_key.take().unwrap();
251        if let Some(elt) = self.next_element() {
252            let key = self.key.call_mut(&elt);
253            if old_key != key {
254                self.top_group += 1;
255            }
256            self.current_key = Some(key);
257            self.current_elt = Some(elt);
258        }
259        old_key
260    }
261}
262
263impl<K, I, F> GroupInner<K, I, F>
264    where I: Iterator,
265{
266    /// Called when a group is dropped
267    fn drop_group(&mut self, client: usize) {
268        // It's only useful to track the maximal index
269        if self.dropped_group == !0 || client > self.dropped_group {
270            self.dropped_group = client;
271        }
272    }
273}
274
275/// `GroupBy` is the storage for the lazy grouping operation.
276///
277/// If the groups are consumed in their original order, or if each
278/// group is dropped without keeping it around, then `GroupBy` uses
279/// no allocations. It needs allocations only if several group iterators
280/// are alive at the same time.
281///
282/// This type implements `IntoIterator` (it is **not** an iterator
283/// itself), because the group iterators need to borrow from this
284/// value. It should be stored in a local variable or temporary and
285/// iterated.
286///
287/// See [`.group_by()`](../trait.Itertools.html#method.group_by) for more information.
288#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
289pub struct GroupBy<K, I, F>
290    where I: Iterator,
291{
292    inner: RefCell<GroupInner<K, I, F>>,
293    // the group iterator's current index. Keep this in the main value
294    // so that simultaneous iterators all use the same state.
295    index: Cell<usize>,
296}
297
298/// Create a new
299pub fn new<K, J, F>(iter: J, f: F) -> GroupBy<K, J::IntoIter, F>
300    where J: IntoIterator,
301          F: FnMut(&J::Item) -> K,
302{
303    GroupBy {
304        inner: RefCell::new(GroupInner {
305            key: f,
306            iter: iter.into_iter(),
307            current_key: None,
308            current_elt: None,
309            done: false,
310            top_group: 0,
311            oldest_buffered_group: 0,
312            bottom_group: 0,
313            buffer: Vec::new(),
314            dropped_group: !0,
315        }),
316        index: Cell::new(0),
317    }
318}
319
320impl<K, I, F> GroupBy<K, I, F>
321    where I: Iterator,
322{
323    /// `client`: Index of group that requests next element
324    fn step(&self, client: usize) -> Option<I::Item>
325        where F: FnMut(&I::Item) -> K,
326              K: PartialEq,
327    {
328        self.inner.borrow_mut().step(client)
329    }
330
331    /// `client`: Index of group
332    fn drop_group(&self, client: usize) {
333        self.inner.borrow_mut().drop_group(client)
334    }
335}
336
337impl<'a, K, I, F> IntoIterator for &'a GroupBy<K, I, F>
338    where I: Iterator,
339          I::Item: 'a,
340          F: FnMut(&I::Item) -> K,
341          K: PartialEq
342{
343    type Item = (K, Group<'a, K, I, F>);
344    type IntoIter = Groups<'a, K, I, F>;
345
346    fn into_iter(self) -> Self::IntoIter {
347        Groups { parent: self }
348    }
349}
350
351
352/// An iterator that yields the Group iterators.
353///
354/// Iterator element type is `(K, Group)`:
355/// the group's key `K` and the group's iterator.
356///
357/// See [`.group_by()`](../trait.Itertools.html#method.group_by) for more information.
358#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
359pub struct Groups<'a, K: 'a, I: 'a, F: 'a>
360    where I: Iterator,
361          I::Item: 'a
362{
363    parent: &'a GroupBy<K, I, F>,
364}
365
366impl<'a, K, I, F> Iterator for Groups<'a, K, I, F>
367    where I: Iterator,
368          I::Item: 'a,
369          F: FnMut(&I::Item) -> K,
370          K: PartialEq
371{
372    type Item = (K, Group<'a, K, I, F>);
373
374    #[inline]
375    fn next(&mut self) -> Option<Self::Item> {
376        let index = self.parent.index.get();
377        self.parent.index.set(index + 1);
378        let inner = &mut *self.parent.inner.borrow_mut();
379        inner.step(index).map(|elt| {
380            let key = inner.group_key(index);
381            (key, Group {
382                parent: self.parent,
383                index: index,
384                first: Some(elt),
385            })
386        })
387    }
388}
389
390/// An iterator for the elements in a single group.
391///
392/// Iterator element type is `I::Item`.
393pub struct Group<'a, K: 'a, I: 'a, F: 'a>
394    where I: Iterator,
395          I::Item: 'a,
396{
397    parent: &'a GroupBy<K, I, F>,
398    index: usize,
399    first: Option<I::Item>,
400}
401
402impl<'a, K, I, F> Drop for Group<'a, K, I, F>
403    where I: Iterator,
404          I::Item: 'a,
405{
406    fn drop(&mut self) {
407        self.parent.drop_group(self.index);
408    }
409}
410
411impl<'a, K, I, F> Iterator for Group<'a, K, I, F>
412    where I: Iterator,
413          I::Item: 'a,
414          F: FnMut(&I::Item) -> K,
415          K: PartialEq,
416{
417    type Item = I::Item;
418    #[inline]
419    fn next(&mut self) -> Option<Self::Item> {
420        if let elt @ Some(..) = self.first.take() {
421            return elt;
422        }
423        self.parent.step(self.index)
424    }
425}
426
427///// IntoChunks /////
428
429/// Create a new
430pub fn new_chunks<J>(iter: J, size: usize) -> IntoChunks<J::IntoIter>
431    where J: IntoIterator,
432{
433    IntoChunks {
434        inner: RefCell::new(GroupInner {
435            key: ChunkIndex::new(size),
436            iter: iter.into_iter(),
437            current_key: None,
438            current_elt: None,
439            done: false,
440            top_group: 0,
441            oldest_buffered_group: 0,
442            bottom_group: 0,
443            buffer: Vec::new(),
444            dropped_group: !0,
445        }),
446        index: Cell::new(0),
447    }
448}
449
450
451/// `ChunkLazy` is the storage for a lazy chunking operation.
452///
453/// `IntoChunks` behaves just like `GroupBy`: it is iterable, and
454/// it only buffers if several chunk iterators are alive at the same time.
455///
456/// This type implements `IntoIterator` (it is **not** an iterator
457/// itself), because the chunk iterators need to borrow from this
458/// value. It should be stored in a local variable or temporary and
459/// iterated.
460///
461/// Iterator element type is `Chunk`, each chunk's iterator.
462///
463/// See [`.chunks()`](../trait.Itertools.html#method.chunks) for more information.
464#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
465pub struct IntoChunks<I>
466    where I: Iterator,
467{
468    inner: RefCell<GroupInner<usize, I, ChunkIndex>>,
469    // the chunk iterator's current index. Keep this in the main value
470    // so that simultaneous iterators all use the same state.
471    index: Cell<usize>,
472}
473
474
475impl<I> IntoChunks<I>
476    where I: Iterator,
477{
478    /// `client`: Index of chunk that requests next element
479    fn step(&self, client: usize) -> Option<I::Item> {
480        self.inner.borrow_mut().step(client)
481    }
482
483    /// `client`: Index of chunk
484    fn drop_group(&self, client: usize) {
485        self.inner.borrow_mut().drop_group(client)
486    }
487}
488
489impl<'a, I> IntoIterator for &'a IntoChunks<I>
490    where I: Iterator,
491          I::Item: 'a,
492{
493    type Item = Chunk<'a, I>;
494    type IntoIter = Chunks<'a, I>;
495
496    fn into_iter(self) -> Self::IntoIter {
497        Chunks {
498            parent: self,
499        }
500    }
501}
502
503
504/// An iterator that yields the Chunk iterators.
505///
506/// Iterator element type is `Chunk`.
507///
508/// See [`.chunks()`](../trait.Itertools.html#method.chunks) for more information.
509#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
510pub struct Chunks<'a, I: 'a>
511    where I: Iterator,
512          I::Item: 'a,
513{
514    parent: &'a IntoChunks<I>,
515}
516
517impl<'a, I> Iterator for Chunks<'a, I>
518    where I: Iterator,
519          I::Item: 'a,
520{
521    type Item = Chunk<'a, I>;
522
523    #[inline]
524    fn next(&mut self) -> Option<Self::Item> {
525        let index = self.parent.index.get();
526        self.parent.index.set(index + 1);
527        let inner = &mut *self.parent.inner.borrow_mut();
528        inner.step(index).map(|elt| {
529            Chunk {
530                parent: self.parent,
531                index: index,
532                first: Some(elt),
533            }
534        })
535    }
536}
537
538/// An iterator for the elements in a single chunk.
539///
540/// Iterator element type is `I::Item`.
541pub struct Chunk<'a, I: 'a>
542    where I: Iterator,
543          I::Item: 'a,
544{
545    parent: &'a IntoChunks<I>,
546    index: usize,
547    first: Option<I::Item>,
548}
549
550impl<'a, I> Drop for Chunk<'a, I>
551    where I: Iterator,
552          I::Item: 'a,
553{
554    fn drop(&mut self) {
555        self.parent.drop_group(self.index);
556    }
557}
558
559impl<'a, I> Iterator for Chunk<'a, I>
560    where I: Iterator,
561          I::Item: 'a,
562{
563    type Item = I::Item;
564    #[inline]
565    fn next(&mut self) -> Option<Self::Item> {
566        if let elt @ Some(..) = self.first.take() {
567            return elt;
568        }
569        self.parent.step(self.index)
570    }
571}