aboutsummaryrefslogtreecommitdiff
path: root/ClojureCLR/Clojure/Clojure/Lib/Stream.cs
blob: e9fd531d0ee8a83090bb6f87c2ba73864bf80099 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/**
 *   Copyright (c) David Miller. All rights reserved.
 *   The use and distribution terms for this software are covered by the
 *   Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
 *   which can be found in the file epl-v10.html at the root of this distribution.
 *   By using this software in any fashion, you are agreeing to be bound by
 * 	 the terms of this license.
 *   You must not remove this notice, or any other, from this software.
 **/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.CompilerServices;

namespace clojure.lang
{
    /// <summary>
    /// A stream of values pulled on demand from a source function, optionally
    /// passed through a transform function.
    /// </summary>
    /// <remarks>
    /// <para>
    /// The source is invoked with no arguments each time a value is needed and
    /// signals exhaustion by returning <c>RT.EOS</c>.  A transform, when present,
    /// is invoked with each source value; returning <c>RT.SKIP</c> from the
    /// transform drops that value.
    /// </para>
    /// <para>
    /// A stream may be tapped only once.  Calling <see cref="tap"/>, wrapping the
    /// stream in another <see cref="Stream"/>, and the first call to
    /// <see cref="sequence"/> each consume that single tap.
    /// </para>
    /// </remarks>
    public sealed class Stream : Seqable, Streamable, Sequential
    {
        #region Data

        // Sentinel meaning "sequence() has not produced a sequence yet".
        // A dedicated instance is used instead of null -- presumably because the
        // realized sequence itself may be null; confirm against RT.seq.
        static readonly ISeq NO_SEQ = new Cons(null, null);

        // Cached result of sequence(); NO_SEQ until the first call completes.
        ISeq _sequence = NO_SEQ;

        // Function invoked with no arguments to obtain the next raw value.
        readonly IFn _src;

        // Optional per-value transform; null means values pass through untouched.
        readonly IFn _xform;

        // The tap handed out by tap(); non-null once the stream has been tapped.
        IFn _tap = null;

        #endregion

        #region C-tors and factory methods

        /// <summary>
        /// Creates a stream that yields the values of <paramref name="src"/> unchanged.
        /// </summary>
        /// <param name="src">Function invoked with no arguments to produce each value.</param>
        public Stream(IFn src)
        {
            _src = src;
            _xform = null;
        }

        /// <summary>
        /// Creates a stream that applies <paramref name="xform"/> to the values of
        /// another stream.  This taps <paramref name="src"/>.
        /// </summary>
        /// <param name="xform">Transform invoked with each value; may be null for no transform.</param>
        /// <param name="src">The upstream stream to draw values from.</param>
        /// <exception cref="ArgumentNullException"><paramref name="src"/> is null.</exception>
        /// <exception cref="InvalidOperationException"><paramref name="src"/> has already been tapped.</exception>
        public Stream(IFn xform, Stream src)
        {
            // Fail with a descriptive exception instead of a NullReferenceException
            // from the src.tap() call below.
            if (src == null)
            {
                throw new ArgumentNullException("src");
            }

            _src = src.tap();
            _xform = xform;
        }

        #endregion

        #region Seqable Members

        /// <summary>
        /// Returns the result of calling <c>seq()</c> on <see cref="sequence"/>.
        /// </summary>
        public ISeq seq()
        {
            return sequence().seq();
        }

        /// <summary>
        /// Returns the sequence view of this stream, building and caching it on
        /// the first call.
        /// </summary>
        /// <returns>The cached sequence; the same value on every later call.</returns>
        /// <exception cref="InvalidOperationException">
        /// No sequence has been built yet and the stream has already been tapped.
        /// </exception>
        [MethodImpl(MethodImplOptions.Synchronized)]
        public ISeq sequence()
        {
            if (_sequence == NO_SEQ)
            {
                // Building the sequence consumes the stream's single tap;
                // tap() throws if something else tapped the stream first.
                tap();
                _sequence = makeSequence(_tap);
            }
            return _sequence;
        }

        /// <summary>
        /// Zero-argument function used as the body of each lazy sequence cell:
        /// pulls one value from the tap and links it to a lazy rest.
        /// </summary>
        sealed class Seqer : AFn
        {
            readonly IFn _tap;

            public Seqer(IFn tap)
            {
                _tap = tap;
            }

            /// <summary>
            /// Pulls the next value from the tap.
            /// </summary>
            /// <returns>
            /// null when the tap returns <c>RT.EOS</c>; otherwise a <c>Cons</c> of
            /// the value and a new <c>LazySeq</c> driven by this same instance.
            /// </returns>
            public override object invoke()
            {
                object v = _tap.invoke();
                if (v == RT.EOS)
                {
                    return null;
                }
                return new Cons(v, new LazySeq(this));
            }
        }

        /// <summary>
        /// Wraps <paramref name="tap"/> in a lazy sequence and passes it through <c>RT.seq</c>.
        /// </summary>
        static ISeq makeSequence(IFn tap)
        {
            return RT.seq(new LazySeq(new Seqer(tap)));
        }

        #endregion

        #region Streamable Members

        /// <summary>
        /// Returns this instance.
        /// </summary>
        [MethodImpl(MethodImplOptions.Synchronized)]
        public Stream stream()
        {
            return this;
        }

        #endregion

        #region Tapping

        /// <summary>
        /// Creates and returns the function that pulls values from this stream.
        /// </summary>
        /// <returns>
        /// A zero-argument function yielding the next (transformed) value on each
        /// invocation, or <c>RT.EOS</c> when the source is exhausted.
        /// </returns>
        /// <exception cref="InvalidOperationException">The stream has already been tapped.</exception>
        [MethodImpl(MethodImplOptions.Synchronized)]
        public IFn tap()
        {
            if (_tap != null)
            {
                throw new InvalidOperationException("Stream already tapped");
            }

            return _tap = makeTap(_xform, _src);
        }

        /// <summary>
        /// Zero-argument function that pulls values from a source and applies an
        /// optional transform to each one.
        /// </summary>
        /// <remarks>
        /// NOTE(review): unlike the synchronized members of the enclosing class,
        /// <see cref="invoke"/> takes no lock, so concurrent callers of one tap can
        /// interleave their pulls from the source -- confirm whether callers are
        /// expected to serialize access.
        /// </remarks>
        sealed class Tapper : AFn
        {
            readonly IFn _xform;
            readonly IFn _src;

            public Tapper(IFn xform, IFn src)
            {
                _xform = xform;
                _src = src;
            }

            /// <summary>
            /// Produces the next value of the stream.
            /// </summary>
            /// <returns>
            /// With no transform, whatever the source returns.  Otherwise the first
            /// transform result that is not <c>RT.SKIP</c>, or <c>RT.EOS</c> as soon
            /// as the source returns it.
            /// </returns>
            public override object invoke()
            {
                if (_xform == null)
                {
                    return _src.invoke();
                }

                object v;
                object xv;
                do
                {
                    v = _src.invoke();
                    // End-of-stream is passed through as-is, never handed to the transform.
                    if (v == RT.EOS)
                    {
                        return v;
                    }
                    xv = _xform.invoke(v);
                } while (xv == RT.SKIP); // keep pulling while the transform drops values

                return xv;
            }
        }

        /// <summary>
        /// Builds the tap function for the given transform and source.
        /// </summary>
        static IFn makeTap(IFn xform, IFn src)
        {
            return new Tapper(xform, src);
        }

        #endregion
    }
}