datapath: Add ref counting for flows.
authorJesse Gross <jesse@nicira.com>
Sun, 29 Aug 2010 16:49:51 +0000 (09:49 -0700)
committerJesse Gross <jesse@nicira.com>
Wed, 22 Sep 2010 20:43:01 +0000 (13:43 -0700)
Currently flows are only used within the confines of one
rcu_read_lock()/rcu_read_unlock() session.  However, with the
addition of header caching we will need to hold references to flows
for longer periods of time.  This adds support for that by adding
refcounts to flows.  RCU is still used for normal packet handling
to avoid a performance impact from constantly updating the refcount.
However, instead of directly freeing the flow after a grace period
we simply decrement the refcount.

Signed-off-by: Jesse Gross <jesse@nicira.com>
Reviewed-by: Ben Pfaff <blp@nicira.com>
datapath/datapath.c
datapath/flow.c
datapath/flow.h

index 1677927ff4cf4beacba91f1e1647a77eaf0df942..06e1006a88c3a668be2ddfad7a47944ac8cd6d4c 100644 (file)
@@ -1109,7 +1109,8 @@ static int do_put_flow(struct datapath *dp, struct odp_flow_put *uf,
 error_free_flow_acts:
        kfree(flow->sf_acts);
 error_free_flow:
-       flow_free(flow);
+       flow->sf_acts = NULL;
+       flow_put(flow);
 error:
        return error;
 }
index 1f01166c5ef86d2cb842dbdda316b4f5293ae1a0..dfbf76938e0c8f7d1979384b8b0adcb1048318f9 100644 (file)
@@ -132,36 +132,27 @@ struct sw_flow *flow_alloc(void)
                return ERR_PTR(-ENOMEM);
 
        spin_lock_init(&flow->lock);
+       atomic_set(&flow->refcnt, 1);
+       flow->dead = false;
 
        return flow;
 }
 
-void flow_free(struct sw_flow *flow)
-{
-       if (unlikely(!flow))
-               return;
-
-       kmem_cache_free(flow_cache, flow);
-}
-
-/* Frees the entire 'flow' (both base and actions) immediately. */
-static void flow_free_full(struct sw_flow *flow)
-{
-       kfree(flow->sf_acts);
-       flow_free(flow);
-}
-
 void flow_free_tbl(struct tbl_node *node)
 {
        struct sw_flow *flow = flow_cast(node);
-       flow_free_full(flow);
+
+       flow->dead = true;
+       flow_put(flow);
 }
 
 /* RCU callback used by flow_deferred_free. */
 static void rcu_free_flow_callback(struct rcu_head *rcu)
 {
        struct sw_flow *flow = container_of(rcu, struct sw_flow, rcu);
-       flow_free_full(flow);
+
+       flow->dead = true;
+       flow_put(flow);
 }
 
 /* Schedules 'flow' to be freed after the next RCU grace period.
@@ -171,6 +162,22 @@ void flow_deferred_free(struct sw_flow *flow)
        call_rcu(&flow->rcu, rcu_free_flow_callback);
 }
 
+void flow_hold(struct sw_flow *flow)
+{
+       atomic_inc(&flow->refcnt);
+}
+
+void flow_put(struct sw_flow *flow)
+{
+       if (unlikely(!flow))
+               return;
+
+       if (atomic_dec_and_test(&flow->refcnt)) {
+               kfree(flow->sf_acts);
+               kmem_cache_free(flow_cache, flow);
+       }
+}
+
 /* RCU callback used by flow_deferred_free_acts. */
 static void rcu_free_acts_callback(struct rcu_head *rcu)
 {
index 484ca120778367be020ef40e05eb29de1d808102..3f434677bdac956c6450650f8d5b866b4cdc132c 100644 (file)
@@ -36,6 +36,9 @@ struct sw_flow {
        struct odp_flow_key key;
        struct sw_flow_actions *sf_acts;
 
+       atomic_t refcnt;
+       bool dead;
+
        spinlock_t lock;        /* Lock for values below. */
        unsigned long used;     /* Last used time (in jiffies). */
        u64 packet_count;       /* Number of packets matched. */
@@ -62,13 +65,15 @@ int flow_init(void);
 void flow_exit(void);
 
 struct sw_flow *flow_alloc(void);
-void flow_free(struct sw_flow *flow);
 void flow_deferred_free(struct sw_flow *);
 void flow_free_tbl(struct tbl_node *);
 
 struct sw_flow_actions *flow_actions_alloc(size_t n_actions);
 void flow_deferred_free_acts(struct sw_flow_actions *);
 
+void flow_hold(struct sw_flow *);
+void flow_put(struct sw_flow *);
+
 int flow_extract(struct sk_buff *, u16 in_port, struct odp_flow_key *);
 void flow_used(struct sw_flow *, struct sk_buff *);