/**
 *   Copyright (c) Rich Hickey. All rights reserved.
 *   The use and distribution terms for this software are covered by the
 *   Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
 *   which can be found in the file CPL.TXT at the root of this distribution.
 *   By using this software in any fashion, you are agreeing to be bound by
 *   the terms of this license.
 *   You must not remove this notice, or any other, from this software.
 **/

/* rich May 30, 2006 */

using System;
using System.Threading;
using System.Collections.Generic;

namespace clojure.lang
{

/// <summary>
/// A transaction over <c>TRef</c> cells. Writes are buffered in <c>sets</c>
/// and commute functions in <c>commutates</c>; at commit time the affected
/// TRefs are locked in sorted order, the new values are pushed, and the
/// transaction's <see cref="Info"/> is stamped with a fresh sequence number
/// and marked <see cref="COMMITTED"/>.
/// </summary>
public class Transaction
{
    /// <summary>Status of an <see cref="Info"/> whose values are visible to readers.</summary>
    public const int COMMITTED = 0;

    /// <summary>Status of an <see cref="Info"/> whose transaction has not committed yet.</summary>
    public const int WORKING = 1;

    // Guards nextSeq and makes the final "stamp seq + mark COMMITTED" step atomic.
    static readonly Object lockObj = new Object();

    // The transaction running on the current thread, if any.
    [ThreadStatic]
    private static Transaction transaction;

    // Number of threads currently inside runInTransaction; lets getTransaction
    // return early without touching the thread-static slot when it is zero.
    static volatile int tcount = 0;

    /// <summary>
    /// Returns the current thread's transaction, or null when no transaction
    /// is running anywhere (tcount == 0) or none is set on this thread.
    /// </summary>
    static Transaction getTransaction()
    {
        if (tcount == 0)
        {
            return null;
        }
        return transaction;
    }

    /// <summary>Stores <paramref name="t"/> as the current thread's transaction.</summary>
    static void setTransaction(Transaction t)
    {
        transaction = t;
    }

    volatile static int nextSeq = 1;

    /// <summary>Returns the next sequence number, incrementing the counter under lockObj.</summary>
    static int getNextSeq()
    {
        lock (lockObj)
        {
            return nextSeq++;
        }
    }

    /// <summary>
    /// Sequence number and status shared by every value a transaction pushes.
    /// </summary>
    public class Info
    {
        internal int seq;
        internal int status;

        internal Info(int seq, int status)
        {
            this.seq = seq;
            this.status = status;
        }
    }

    // Info attached to values created outside any transaction (seq 0, already committed).
    static Info bigbang = new Info(0, COMMITTED);

    Info info;
    int startSeq;
    Dictionary<TRef, Object> sets;
    Dictionary<TRef, ISeq> commutates;

    /// <summary>
    /// Runs <paramref name="fn"/> inside a transaction. If the calling thread
    /// is already in a transaction, <paramref name="fn"/> is simply invoked
    /// with <paramref name="tld"/>; otherwise a new transaction is installed
    /// for the duration of the call and removed afterwards.
    /// </summary>
    /// <returns>The value returned by <paramref name="fn"/>.</returns>
    public static Object runInTransaction(ThreadLocalData tld, IFn fn)
    {
        if (getTransaction() != null)
        {
            return fn.invoke(tld);
        }

        Transaction t = new Transaction();
        setTransaction(t);
        Interlocked.Increment(ref tcount);
        try
        {
            // NOTE(review): the nested path passes tld to fn but run() invokes
            // fn with no arguments - confirm which arity is intended.
            return t.run(fn);
        }
        finally
        {
            setTransaction(null);
            Interlocked.Decrement(ref tcount);
        }
    }

    /// <summary>
    /// Creates a new TRef holding <paramref name="val"/>. Outside a transaction
    /// the value is pushed immediately with the <c>bigbang</c> info; inside one
    /// it is recorded as a pending set.
    /// </summary>
    public static TRef tref(Object val)
    {
        Transaction trans = getTransaction();
        TRef tref = new TRef();
        if (trans == null)
        {
            tref.push(val, bigbang);
        }
        else
        {
            trans.doSet(tref, val);
        }
        return tref;
    }

    /// <summary>
    /// Reads <paramref name="tref"/>. Inside a transaction this goes through
    /// <c>doGet</c>; outside, it returns the value of the latest committed
    /// version (and throws NullReferenceException if there is none).
    /// </summary>
    public static Object get(TRef tref)
    {
        Transaction trans = getTransaction();
        if (trans != null)
        {
            return trans.doGet(tref);
        }
        return getCurrent(tref).val;
    }

    /// <summary>
    /// Records a pending set of <paramref name="tref"/> in the current
    /// transaction. Dereferences the current transaction without a null
    /// check, so it fails with NullReferenceException outside a transaction.
    /// </summary>
    public static Object set(TRef tref, Object val)
    {
        return getTransaction().doSet(tref, val);
    }

    /// <summary>
    /// Re-sets <paramref name="tref"/> to the value the current transaction
    /// sees for it. Fails with NullReferenceException outside a transaction.
    /// </summary>
    public static void touch(TRef tref)
    {
        getTransaction().doTouch(tref);
    }

    /// <summary>
    /// Queues <paramref name="fn"/> to be applied to <paramref name="tref"/>'s
    /// current value at commit time. Fails with NullReferenceException outside
    /// a transaction.
    /// </summary>
    public static void commutate(TRef tref, IFn fn)
    {
        getTransaction().doCommutate(tref, fn);
    }

    /// <summary>
    /// Invokes <paramref name="fn"/> and attempts to commit its sets and
    /// commutes, re-invoking it until a commit succeeds.
    /// </summary>
    /// <returns>The value returned by the successful invocation of <paramref name="fn"/>.</returns>
    Object run(IFn fn)
    {
        bool done = false;
        bool conflict = false;
        Object ret = null;
        List<TRef> locks = null;
        List<TRef> locked = null;

        while (!done)
        {
            if (conflict)
            {
                // A retry with the old startSeq would hit the same
                // "changed since we started" test forever, so begin the new
                // attempt from a fresh sequence number.
                startSeq = getNextSeq();
                conflict = false;
            }

            try
            {
                ret = fn.invoke();

                if (locks == null && (sets != null || commutates != null))
                {
                    locks = new List<TRef>();
                }
                if (sets != null)
                {
                    locks.AddRange(sets.Keys);
                }
                if (commutates != null)
                {
                    locks.AddRange(commutates.Keys);
                }

                if (locks != null)
                {
                    if (locked == null)
                    {
                        locked = new List<TRef>(locks.Count);
                    }
                    //lock in order, to avoid deadlocks
                    locks.Sort();
                    foreach (TRef target in locks)
                    {
                        //will block here
                        Monitor.Enter(target);
                        locked.Add(target);
                        // sets may be null when the transaction only commutes
                        if (sets != null && sets.ContainsKey(target))
                        {
                            //try again if the thing we are trying to set has changed since we started
                            TVal curr = getCurrent(target);
                            if (curr != null && curr.tinfo.seq > startSeq)
                            {
                                conflict = true;
                                break;
                            }
                        }
                    }
                }

                if (!conflict)
                {
                    //at this point all write targets are locked
                    //turn commutates into sets
                    if (commutates != null)
                    {
                        foreach (KeyValuePair<TRef, ISeq> e in commutates)
                        {
                            TRef target = e.Key;
                            //note this will npe if tref has never been set, as designed
                            Object val = getCurrent(target).val;
                            for (ISeq c = e.Value; c != null; c = c.rest())
                            {
                                IFn f = (IFn) c.first();
                                val = f.invoke(val);
                            }
                            if (sets == null)
                            {
                                sets = new Dictionary<TRef, Object>();
                            }
                            sets[target] = val;
                        }
                    }

                    //set the new vals
                    if (sets != null)
                    {
                        foreach (KeyValuePair<TRef, Object> entry in sets)
                        {
                            entry.Key.push(entry.Value, info);
                        }
                    }

                    //atomic commit
                    lock (lockObj)
                    {
                        info.seq = getNextSeq();
                        info.status = COMMITTED;
                    }
                    done = true;
                }
            }
            finally
            {
                // Runs on success, on conflict and on exception: release every
                // monitor taken in this attempt and drop the buffered writes.
                if (locked != null)
                {
                    foreach (TRef held in locked)
                    {
                        Monitor.Exit(held);
                    }
                    locked.Clear();
                }
                reset();
                if (locks != null)
                {
                    locks.Clear();
                }
            }
        }
        return ret;
    }

    /// <summary>Discards all buffered sets and commutes.</summary>
    private void reset()
    {
        if (sets != null)
        {
            sets.Clear();
        }
        if (commutates != null)
        {
            commutates.Clear();
        }
    }

    /// <summary>
    /// Creates a transaction in the WORKING state whose start sequence and
    /// initial info sequence are the same freshly allocated number.
    /// </summary>
    Transaction()
    {
        lock (lockObj)
        {
            int seq = getNextSeq();
            this.info = new Info(seq, WORKING);
            this.startSeq = seq;
        }
    }

    /// <summary>
    /// Returns the value this transaction sees for <paramref name="tref"/>:
    /// its own pending set if there is one, otherwise the first version in the
    /// <c>prior</c> chain that is COMMITTED with seq &lt;= startSeq.
    /// </summary>
    /// <exception cref="InvalidOperationException">No such version exists.</exception>
    Object doGet(TRef tref)
    {
        Object pending;
        if (sets != null && sets.TryGetValue(tref, out pending))
        {
            return pending;
        }

        for (TVal ver = tref; ver != null; ver = ver.prior)
        {
            //note this will npe if tref has never been set, as designed
            if (ver.tinfo.status == COMMITTED && ver.tinfo.seq <= startSeq)
            {
                return ver.val;
            }
        }
        throw new InvalidOperationException("Version not found");
    }

    /// <summary>
    /// Returns the first version in <paramref name="tref"/>'s <c>prior</c>
    /// chain that has a non-null, COMMITTED info, or null if there is none.
    /// </summary>
    static TVal getCurrent(TRef tref)
    {
        for (TVal ver = tref; ver != null; ver = ver.prior)
        {
            if (ver.tinfo != null && ver.tinfo.status == COMMITTED)
            {
                return ver;
            }
        }
        //this return only if no value was ever successfully set
        return null;
    }

    /// <summary>
    /// Buffers <paramref name="val"/> as the pending value of <paramref name="tref"/>.
    /// </summary>
    /// <returns><paramref name="val"/>.</returns>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="tref"/> already has a commute queued in this transaction.
    /// </exception>
    Object doSet(TRef tref, Object val)
    {
        if (sets == null)
        {
            sets = new Dictionary<TRef, Object>();
        }
        if (commutates != null && commutates.ContainsKey(tref))
        {
            throw new InvalidOperationException("Can't commutate and set a TRef in the same transaction");
        }
        sets[tref] = val;
        return val;
    }

    /// <summary>Sets <paramref name="tref"/> to the value this transaction currently sees for it.</summary>
    void doTouch(TRef tref)
    {
        doSet(tref, doGet(tref));
    }

    /// <summary>
    /// Prepends <paramref name="fn"/> to the commute functions queued for
    /// <paramref name="tref"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="tref"/> already has a pending set in this transaction.
    /// </exception>
    void doCommutate(TRef tref, IFn fn)
    {
        if (commutates == null)
        {
            commutates = new Dictionary<TRef, ISeq>();
        }
        if (sets != null && sets.ContainsKey(tref))
        {
            throw new InvalidOperationException("Can't commutate and set a TRef in the same transaction");
        }

        // The indexer getter throws for a missing key; TryGetValue leaves
        // 'queued' null on the first commute so the new list is just (fn).
        ISeq queued;
        commutates.TryGetValue(tref, out queued);
        commutates[tref] = RT.cons(fn, queued);
    }
}

}