Merge branch 'tipc_net-next' of git://git.kernel.org/pub/scm/linux/kernel/git/paulg...
authorDavid S. Miller <davem@davemloft.net>
Fri, 23 Nov 2012 19:08:43 +0000 (14:08 -0500)
committerDavid S. Miller <davem@davemloft.net>
Fri, 23 Nov 2012 19:09:27 +0000 (14:09 -0500)
Paul Gortmaker says:

====================
The most interesting thing here, at least from a user perspective,
is the broadcast link fix -- where there was a corner case where
two endpoints could get in a state where they disagree on where
to start Rx and ack of broadcast packets.

There is also the poll/wait changes which could also impact
end users for certain use cases - the fixes there also better
align tipc with the rest of the networking code.

The rest largely falls into routine cleanup category, by getting
rid of some unused routines, some Kconfig clutter, etc.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
12 files changed:
net/tipc/Kconfig
net/tipc/bcast.c
net/tipc/bearer.c
net/tipc/bearer.h
net/tipc/core.c
net/tipc/discover.c
net/tipc/link.c
net/tipc/link.h
net/tipc/name_distr.c
net/tipc/node.c
net/tipc/node.h
net/tipc/socket.c

index 585460180ffb22df0ac0ae966ecc49cf87dcb8cc..bc41bd31eadc064a112fddb3066a9ee9d3405b9d 100644 (file)
@@ -20,18 +20,9 @@ menuconfig TIPC
 
          If in doubt, say N.
 
-if TIPC
-
-config TIPC_ADVANCED
-       bool "Advanced TIPC configuration"
-       default n
-       help
-         Saying Y here will open some advanced configuration for TIPC.
-         Most users do not need to bother; if unsure, just say N.
-
 config TIPC_PORTS
        int "Maximum number of ports in a node"
-       depends on TIPC_ADVANCED
+       depends on TIPC
        range 127 65535
        default "8191"
        help
@@ -40,5 +31,3 @@ config TIPC_PORTS
 
          Setting this to a smaller value saves some memory,
          setting it to higher allows for more ports.
-
-endif # TIPC
index e4e6d8cd47e6fb8ec63c2cd6893c663b79f3bb0c..54f89f90ac33e2b795c9e48c01e625c8248f4d18 100644 (file)
@@ -347,7 +347,7 @@ static void bclink_peek_nack(struct tipc_msg *msg)
 
        tipc_node_lock(n_ptr);
 
-       if (n_ptr->bclink.supported &&
+       if (n_ptr->bclink.recv_permitted &&
            (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
            (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
                n_ptr->bclink.oos_state = 2;
@@ -429,7 +429,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
                goto exit;
 
        tipc_node_lock(node);
-       if (unlikely(!node->bclink.supported))
+       if (unlikely(!node->bclink.recv_permitted))
                goto unlock;
 
        /* Handle broadcast protocol message */
@@ -564,7 +564,7 @@ exit:
 
 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
 {
-       return (n_ptr->bclink.supported &&
+       return (n_ptr->bclink.recv_permitted &&
                (tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
 }
 
@@ -619,16 +619,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
                if (bcbearer->remains_new.count == bcbearer->remains.count)
                        continue;       /* bearer pair doesn't add anything */
 
-               if (p->blocked ||
-                   p->media->send_msg(buf, p, &p->media->bcast_addr)) {
+               if (!tipc_bearer_blocked(p))
+                       tipc_bearer_send(p, buf, &p->media->bcast_addr);
+               else if (s && !tipc_bearer_blocked(s))
                        /* unable to send on primary bearer */
-                       if (!s || s->blocked ||
-                           s->media->send_msg(buf, s,
-                                              &s->media->bcast_addr)) {
-                               /* unable to send on either bearer */
-                               continue;
-                       }
-               }
+                       tipc_bearer_send(s, buf, &s->media->bcast_addr);
+               else
+                       /* unable to send on either bearer */
+                       continue;
 
                if (s) {
                        bcbearer->bpairs[bp_index].primary = s;
@@ -731,8 +729,8 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
                             "  TX naks:%u acks:%u dups:%u\n",
                             s->sent_nacks, s->sent_acks, s->retransmitted);
        ret += tipc_snprintf(buf + ret, buf_size - ret,
-                            "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
-                            s->bearer_congs, s->link_congs, s->max_queue_sz,
+                            "  Congestion link:%u  Send queue max:%u avg:%u\n",
+                            s->link_congs, s->max_queue_sz,
                             s->queue_sz_counts ?
                             (s->accu_queue_sz / s->queue_sz_counts) : 0);
 
@@ -766,7 +764,6 @@ int tipc_bclink_set_queue_limits(u32 limit)
 
 void tipc_bclink_init(void)
 {
-       INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
        bcbearer->bearer.media = &bcbearer->media;
        bcbearer->media.send_msg = tipc_bcbearer_send;
        sprintf(bcbearer->media.name, "tipc-broadcast");
index 4ec5c80e8a7ca0b20c7291db9641636f4dc0a3a6..aa62f93a91275b355b994c66318fddb8b42e1af6 100644 (file)
@@ -279,115 +279,30 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
 }
 
 /*
- * bearer_push(): Resolve bearer congestion. Force the waiting
- * links to push out their unsent packets, one packet per link
- * per iteration, until all packets are gone or congestion reoccurs.
- * 'tipc_net_lock' is read_locked when this function is called
- * bearer.lock must be taken before calling
- * Returns binary true(1) ore false(0)
- */
-static int bearer_push(struct tipc_bearer *b_ptr)
-{
-       u32 res = 0;
-       struct tipc_link *ln, *tln;
-
-       if (b_ptr->blocked)
-               return 0;
-
-       while (!list_empty(&b_ptr->cong_links) && (res != PUSH_FAILED)) {
-               list_for_each_entry_safe(ln, tln, &b_ptr->cong_links, link_list) {
-                       res = tipc_link_push_packet(ln);
-                       if (res == PUSH_FAILED)
-                               break;
-                       if (res == PUSH_FINISHED)
-                               list_move_tail(&ln->link_list, &b_ptr->links);
-               }
-       }
-       return list_empty(&b_ptr->cong_links);
-}
-
-void tipc_bearer_lock_push(struct tipc_bearer *b_ptr)
-{
-       spin_lock_bh(&b_ptr->lock);
-       bearer_push(b_ptr);
-       spin_unlock_bh(&b_ptr->lock);
-}
-
-
-/*
- * Interrupt enabling new requests after bearer congestion or blocking:
+ * Interrupt enabling new requests after bearer blocking:
  * See bearer_send().
  */
-void tipc_continue(struct tipc_bearer *b_ptr)
+void tipc_continue(struct tipc_bearer *b)
 {
-       spin_lock_bh(&b_ptr->lock);
-       if (!list_empty(&b_ptr->cong_links))
-               tipc_k_signal((Handler)tipc_bearer_lock_push, (unsigned long)b_ptr);
-       b_ptr->blocked = 0;
-       spin_unlock_bh(&b_ptr->lock);
+       spin_lock_bh(&b->lock);
+       b->blocked = 0;
+       spin_unlock_bh(&b->lock);
 }
 
 /*
- * Schedule link for sending of messages after the bearer
- * has been deblocked by 'continue()'. This method is called
- * when somebody tries to send a message via this link while
- * the bearer is congested. 'tipc_net_lock' is in read_lock here
- * bearer.lock is busy
+ * tipc_bearer_blocked - determines if bearer is currently blocked
  */
-static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr,
-                                               struct tipc_link *l_ptr)
+int tipc_bearer_blocked(struct tipc_bearer *b)
 {
-       list_move_tail(&l_ptr->link_list, &b_ptr->cong_links);
-}
-
-/*
- * Schedule link for sending of messages after the bearer
- * has been deblocked by 'continue()'. This method is called
- * when somebody tries to send a message via this link while
- * the bearer is congested. 'tipc_net_lock' is in read_lock here,
- * bearer.lock is free
- */
-void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
-{
-       spin_lock_bh(&b_ptr->lock);
-       tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
-       spin_unlock_bh(&b_ptr->lock);
-}
-
+       int res;
 
-/*
- * tipc_bearer_resolve_congestion(): Check if there is bearer congestion,
- * and if there is, try to resolve it before returning.
- * 'tipc_net_lock' is read_locked when this function is called
- */
-int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
-                                       struct tipc_link *l_ptr)
-{
-       int res = 1;
+       spin_lock_bh(&b->lock);
+       res = b->blocked;
+       spin_unlock_bh(&b->lock);
 
-       if (list_empty(&b_ptr->cong_links))
-               return 1;
-       spin_lock_bh(&b_ptr->lock);
-       if (!bearer_push(b_ptr)) {
-               tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
-               res = 0;
-       }
-       spin_unlock_bh(&b_ptr->lock);
        return res;
 }
 
-/**
- * tipc_bearer_congested - determines if bearer is currently congested
- */
-int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
-{
-       if (unlikely(b_ptr->blocked))
-               return 1;
-       if (likely(list_empty(&b_ptr->cong_links)))
-               return 0;
-       return !tipc_bearer_resolve_congestion(b_ptr, l_ptr);
-}
-
 /**
  * tipc_enable_bearer - enable bearer with the given name
  */
@@ -489,7 +404,6 @@ restart:
        b_ptr->net_plane = bearer_id + 'A';
        b_ptr->active = 1;
        b_ptr->priority = priority;
-       INIT_LIST_HEAD(&b_ptr->cong_links);
        INIT_LIST_HEAD(&b_ptr->links);
        spin_lock_init(&b_ptr->lock);
 
@@ -528,7 +442,6 @@ int tipc_block_bearer(const char *name)
        pr_info("Blocking bearer <%s>\n", name);
        spin_lock_bh(&b_ptr->lock);
        b_ptr->blocked = 1;
-       list_splice_init(&b_ptr->cong_links, &b_ptr->links);
        list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
                struct tipc_node *n_ptr = l_ptr->owner;
 
@@ -555,7 +468,6 @@ static void bearer_disable(struct tipc_bearer *b_ptr)
        spin_lock_bh(&b_ptr->lock);
        b_ptr->blocked = 1;
        b_ptr->media->disable_bearer(b_ptr);
-       list_splice_init(&b_ptr->cong_links, &b_ptr->links);
        list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
                tipc_link_delete(l_ptr);
        }
index dd4c2abf08e7de95085fb6b3e9e87cf22eb4b9ec..39f1192d04bff34dfd2295cbe99b491d603e7cbb 100644 (file)
@@ -120,7 +120,6 @@ struct tipc_media {
  * @identity: array index of this bearer within TIPC bearer array
  * @link_req: ptr to (optional) structure making periodic link setup requests
  * @links: list of non-congested links associated with bearer
- * @cong_links: list of congested links associated with bearer
  * @active: non-zero if bearer structure is represents a bearer
  * @net_plane: network plane ('A' through 'H') currently associated with bearer
  * @nodes: indicates which nodes in cluster can be reached through bearer
@@ -143,7 +142,6 @@ struct tipc_bearer {
        u32 identity;
        struct tipc_link_req *link_req;
        struct list_head links;
-       struct list_head cong_links;
        int active;
        char net_plane;
        struct tipc_node_map nodes;
@@ -185,39 +183,23 @@ struct sk_buff *tipc_media_get_names(void);
 struct sk_buff *tipc_bearer_get_names(void);
 void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest);
 void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
-void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
 struct tipc_bearer *tipc_bearer_find(const char *name);
 struct tipc_bearer *tipc_bearer_find_interface(const char *if_name);
 struct tipc_media *tipc_media_find(const char *name);
-int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
-                                  struct tipc_link *l_ptr);
-int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
+int tipc_bearer_blocked(struct tipc_bearer *b_ptr);
 void tipc_bearer_stop(void);
-void tipc_bearer_lock_push(struct tipc_bearer *b_ptr);
-
 
 /**
  * tipc_bearer_send- sends buffer to destination over bearer
  *
- * Returns true (1) if successful, or false (0) if unable to send
- *
  * IMPORTANT:
  * The media send routine must not alter the buffer being passed in
  * as it may be needed for later retransmission!
- *
- * If the media send routine returns a non-zero value (indicating that
- * it was unable to send the buffer), it must:
- *   1) mark the bearer as blocked,
- *   2) call tipc_continue() once the bearer is able to send again.
- * Media types that are unable to meet these two critera must ensure their
- * send routine always returns success -- even if the buffer was not sent --
- * and let TIPC's link code deal with the undelivered message.
  */
-static inline int tipc_bearer_send(struct tipc_bearer *b_ptr,
-                                  struct sk_buff *buf,
+static inline void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,
                                   struct tipc_media_addr *dest)
 {
-       return !b_ptr->media->send_msg(buf, b_ptr, dest);
+       b->media->send_msg(buf, b, dest);
 }
 
 #endif /* _TIPC_BEARER_H */
index bfe8af88469a95b5012d1cb34e3e9415120a2808..fc05cecd7481652dd57431727dc2355d39627859 100644 (file)
 
 #include <linux/module.h>
 
-#ifndef CONFIG_TIPC_PORTS
-#define CONFIG_TIPC_PORTS 8191
-#endif
-
-
 /* global variables used by multiple sub-systems within TIPC */
 int tipc_random __read_mostly;
 
index 50eaa403eb6e696b6a8e8a9a552f9ca60eb25e2b..1074b9587e81ed57a77bc20042c829be18bfc4a0 100644 (file)
@@ -243,7 +243,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
        if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {
                rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
                if (rbuf) {
-                       b_ptr->media->send_msg(rbuf, b_ptr, &media_addr);
+                       tipc_bearer_send(b_ptr, rbuf, &media_addr);
                        kfree_skb(rbuf);
                }
        }
index a79c755cb41714bf40c66de615ce6d0cc737cb3b..87bf5aad704ba09a1eaa4f2ffac6366e57eb5050 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * net/tipc/link.c: TIPC link code
  *
- * Copyright (c) 1996-2007, Ericsson AB
+ * Copyright (c) 1996-2007, 2012, Ericsson AB
  * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -103,6 +103,8 @@ static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
 static void link_start(struct tipc_link *l_ptr);
 static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
+static void tipc_link_send_sync(struct tipc_link *l);
+static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
 
 /*
  *  Simple link routines
@@ -712,6 +714,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                        link_activate(l_ptr);
                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
+                       if (l_ptr->owner->working_links == 1)
+                               tipc_link_send_sync(l_ptr);
                        link_set_timer(l_ptr, cont_intv);
                        break;
                case RESET_MSG:
@@ -745,6 +749,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                        link_activate(l_ptr);
                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
+                       if (l_ptr->owner->working_links == 1)
+                               tipc_link_send_sync(l_ptr);
                        link_set_timer(l_ptr, cont_intv);
                        break;
                case RESET_MSG:
@@ -872,17 +878,12 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
                return link_send_long_buf(l_ptr, buf);
 
        /* Packet can be queued or sent. */
-       if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
+       if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
                   !link_congested(l_ptr))) {
                link_add_to_outqueue(l_ptr, buf, msg);
 
-               if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
-                       l_ptr->unacked_window = 0;
-               } else {
-                       tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
-                       l_ptr->stats.bearer_congs++;
-                       l_ptr->next_out = buf;
-               }
+               tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+               l_ptr->unacked_window = 0;
                return dsz;
        }
        /* Congestion: can message be bundled ? */
@@ -891,10 +892,8 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
 
                /* Try adding message to an existing bundle */
                if (l_ptr->next_out &&
-                   link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
-                       tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
+                   link_bundle_buf(l_ptr, l_ptr->last_out, buf))
                        return dsz;
-               }
 
                /* Try creating a new bundle */
                if (size <= max_packet * 2 / 3) {
@@ -917,7 +916,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
        if (!l_ptr->next_out)
                l_ptr->next_out = buf;
        link_add_to_outqueue(l_ptr, buf, msg);
-       tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
        return dsz;
 }
 
@@ -949,7 +947,48 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
        return res;
 }
 
-/**
+/*
+ * tipc_link_send_sync - synchronize broadcast link endpoints.
+ *
+ * Give a newly added peer node the sequence number where it should
+ * start receiving and acking broadcast packets.
+ *
+ * Called with node locked
+ */
+static void tipc_link_send_sync(struct tipc_link *l)
+{
+       struct sk_buff *buf;
+       struct tipc_msg *msg;
+
+       buf = tipc_buf_acquire(INT_H_SIZE);
+       if (!buf)
+               return;
+
+       msg = buf_msg(buf);
+       tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
+       msg_set_last_bcast(msg, l->owner->bclink.acked);
+       link_add_chain_to_outqueue(l, buf, 0);
+       tipc_link_push_queue(l);
+}
+
+/*
+ * tipc_link_recv_sync - synchronize broadcast link endpoints.
+ * Receive the sequence number where we should start receiving and
+ * acking broadcast packets from a newly added peer node, and open
+ * up for reception of such packets.
+ *
+ * Called with node locked
+ */
+static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
+{
+       struct tipc_msg *msg = buf_msg(buf);
+
+       n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
+       n->bclink.recv_permitted = true;
+       kfree_skb(buf);
+}
+
+/*
  * tipc_link_send_names - send name table entries to new neighbor
  *
  * Send routine for bulk delivery of name table messages when contact
@@ -1006,16 +1045,11 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
 
        if (likely(!link_congested(l_ptr))) {
                if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
-                       if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
+                       if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
                                link_add_to_outqueue(l_ptr, buf, msg);
-                               if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
-                                                           &l_ptr->media_addr))) {
-                                       l_ptr->unacked_window = 0;
-                                       return res;
-                               }
-                               tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
-                               l_ptr->stats.bearer_congs++;
-                               l_ptr->next_out = buf;
+                               tipc_bearer_send(l_ptr->b_ptr, buf,
+                                                &l_ptr->media_addr);
+                               l_ptr->unacked_window = 0;
                                return res;
                        }
                } else
@@ -1106,7 +1140,7 @@ exit:
 
                        /* Exit if link (or bearer) is congested */
                        if (link_congested(l_ptr) ||
-                           !list_empty(&l_ptr->b_ptr->cong_links)) {
+                           tipc_bearer_blocked(l_ptr->b_ptr)) {
                                res = link_schedule_port(l_ptr,
                                                         sender->ref, res);
                                goto exit;
@@ -1329,15 +1363,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
        if (r_q_size && buf) {
                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
                msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
-               if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-                       l_ptr->retransm_queue_head = mod(++r_q_head);
-                       l_ptr->retransm_queue_size = --r_q_size;
-                       l_ptr->stats.retransmitted++;
-                       return 0;
-               } else {
-                       l_ptr->stats.bearer_congs++;
-                       return PUSH_FAILED;
-               }
+               tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+               l_ptr->retransm_queue_head = mod(++r_q_head);
+               l_ptr->retransm_queue_size = --r_q_size;
+               l_ptr->stats.retransmitted++;
+               return 0;
        }
 
        /* Send deferred protocol message, if any: */
@@ -1345,15 +1375,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
        if (buf) {
                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
                msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
-               if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-                       l_ptr->unacked_window = 0;
-                       kfree_skb(buf);
-                       l_ptr->proto_msg_queue = NULL;
-                       return 0;
-               } else {
-                       l_ptr->stats.bearer_congs++;
-                       return PUSH_FAILED;
-               }
+               tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+               l_ptr->unacked_window = 0;
+               kfree_skb(buf);
+               l_ptr->proto_msg_queue = NULL;
+               return 0;
        }
 
        /* Send one deferred data message, if send window not full: */
@@ -1366,18 +1392,14 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
                if (mod(next - first) < l_ptr->queue_limit[0]) {
                        msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
                        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-                       if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-                               if (msg_user(msg) == MSG_BUNDLER)
-                                       msg_set_type(msg, CLOSED_MSG);
-                               l_ptr->next_out = buf->next;
-                               return 0;
-                       } else {
-                               l_ptr->stats.bearer_congs++;
-                               return PUSH_FAILED;
-                       }
+                       tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+                       if (msg_user(msg) == MSG_BUNDLER)
+                               msg_set_type(msg, CLOSED_MSG);
+                       l_ptr->next_out = buf->next;
+                       return 0;
                }
        }
-       return PUSH_FINISHED;
+       return 1;
 }
 
 /*
@@ -1388,15 +1410,12 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
 {
        u32 res;
 
-       if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
+       if (tipc_bearer_blocked(l_ptr->b_ptr))
                return;
 
        do {
                res = tipc_link_push_packet(l_ptr);
        } while (!res);
-
-       if (res == PUSH_FAILED)
-               tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
 }
 
 static void link_reset_all(unsigned long addr)
@@ -1454,9 +1473,8 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
 
                tipc_addr_string_fill(addr_string, n_ptr->addr);
                pr_info("Broadcast link info for %s\n", addr_string);
-               pr_info("Supportable: %d,  Supported: %d,  Acked: %u\n",
-                       n_ptr->bclink.supportable,
-                       n_ptr->bclink.supported,
+               pr_info("Reception permitted: %d,  Acked: %u\n",
+                       n_ptr->bclink.recv_permitted,
                        n_ptr->bclink.acked);
                pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
                        n_ptr->bclink.last_in,
@@ -1481,7 +1499,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
 
        msg = buf_msg(buf);
 
-       if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
+       if (tipc_bearer_blocked(l_ptr->b_ptr)) {
                if (l_ptr->retransm_queue_size == 0) {
                        l_ptr->retransm_queue_head = msg_seqno(msg);
                        l_ptr->retransm_queue_size = retransmits;
@@ -1491,7 +1509,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
                }
                return;
        } else {
-               /* Detect repeated retransmit failures on uncongested bearer */
+               /* Detect repeated retransmit failures on unblocked bearer */
                if (l_ptr->last_retransmitted == msg_seqno(msg)) {
                        if (++l_ptr->stale_count > 100) {
                                link_retransmit_failure(l_ptr, buf);
@@ -1507,17 +1525,10 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
                msg = buf_msg(buf);
                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-               if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-                       buf = buf->next;
-                       retransmits--;
-                       l_ptr->stats.retransmitted++;
-               } else {
-                       tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
-                       l_ptr->stats.bearer_congs++;
-                       l_ptr->retransm_queue_head = buf_seqno(buf);
-                       l_ptr->retransm_queue_size = retransmits;
-                       return;
-               }
+               tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+               buf = buf->next;
+               retransmits--;
+               l_ptr->stats.retransmitted++;
        }
 
        l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
@@ -1676,7 +1687,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
                ackd = msg_ack(msg);
 
                /* Release acked messages */
-               if (n_ptr->bclink.supported)
+               if (n_ptr->bclink.recv_permitted)
                        tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
                crs = l_ptr->first_out;
@@ -1727,9 +1738,14 @@ deliver:
                                        tipc_link_recv_bundle(buf);
                                        continue;
                                case NAME_DISTRIBUTOR:
+                                       n_ptr->bclink.recv_permitted = true;
                                        tipc_node_unlock(n_ptr);
                                        tipc_named_recv(buf);
                                        continue;
+                               case BCAST_PROTOCOL:
+                                       tipc_link_recv_sync(n_ptr, buf);
+                                       tipc_node_unlock(n_ptr);
+                                       continue;
                                case CONN_MANAGER:
                                        tipc_node_unlock(n_ptr);
                                        tipc_port_recv_proto_msg(buf);
@@ -1772,16 +1788,19 @@ deliver:
                        continue;
                }
 
+               /* Link is not in state WORKING_WORKING */
                if (msg_user(msg) == LINK_PROTOCOL) {
                        link_recv_proto_msg(l_ptr, buf);
                        head = link_insert_deferred_queue(l_ptr, head);
                        tipc_node_unlock(n_ptr);
                        continue;
                }
+
+               /* Traffic message. Conditionally activate link */
                link_state_event(l_ptr, TRAFFIC_MSG_EVT);
 
                if (link_working_working(l_ptr)) {
-                       /* Re-insert in front of queue */
+                       /* Re-insert buffer in front of queue */
                        buf->next = head;
                        head = buf;
                        tipc_node_unlock(n_ptr);
@@ -1972,21 +1991,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
 
        skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
 
-       /* Defer message if bearer is already congested */
-       if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
-               l_ptr->proto_msg_queue = buf;
-               return;
-       }
-
-       /* Defer message if attempting to send results in bearer congestion */
-       if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-               tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
+       /* Defer message if bearer is already blocked */
+       if (tipc_bearer_blocked(l_ptr->b_ptr)) {
                l_ptr->proto_msg_queue = buf;
-               l_ptr->stats.bearer_congs++;
                return;
        }
 
-       /* Discard message if it was sent successfully */
+       tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
        l_ptr->unacked_window = 0;
        kfree_skb(buf);
 }
@@ -2057,7 +2068,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
                } else {
                        l_ptr->max_pkt = l_ptr->max_pkt_target;
                }
-               l_ptr->owner->bclink.supportable = (max_pkt_info != 0);
 
                /* Synchronize broadcast link info, if not done previously */
                if (!tipc_node_is_up(l_ptr->owner)) {
@@ -2112,7 +2122,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
                }
 
                /* Protocol message before retransmits, reduce loss risk */
-               if (l_ptr->owner->bclink.supported)
+               if (l_ptr->owner->bclink.recv_permitted)
                        tipc_bclink_update_link_state(l_ptr->owner,
                                                      msg_last_bcast(msg));
 
@@ -2937,8 +2947,8 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
                             s->sent_nacks, s->sent_acks, s->retransmitted);
 
        ret += tipc_snprintf(buf + ret, buf_size - ret,
-                            "  Congestion bearer:%u link:%u  Send queue"
-                            " max:%u avg:%u\n", s->bearer_congs, s->link_congs,
+                            "  Congestion link:%u  Send queue"
+                            " max:%u avg:%u\n", s->link_congs,
                             s->max_queue_sz, s->queue_sz_counts ?
                             (s->accu_queue_sz / s->queue_sz_counts) : 0);
 
index 6e921121be0616f9f1e1bca2a7625adb527fade3..c048ed1cbd765aa3417c945d2913d0c994cb7d3f 100644 (file)
@@ -40,9 +40,6 @@
 #include "msg.h"
 #include "node.h"
 
-#define PUSH_FAILED   1
-#define PUSH_FINISHED 2
-
 /*
  * Out-of-range value for link sequence numbers
  */
@@ -82,7 +79,6 @@ struct tipc_stats {
        u32 recv_fragmented;
        u32 recv_fragments;
        u32 link_congs;         /* # port sends blocked by congestion */
-       u32 bearer_congs;
        u32 deferred_recv;
        u32 duplicates;
        u32 max_queue_sz;       /* send queue size high water mark */
index 55d3928dfd6702c423c04abea5aa7ec6fc870d2e..e0d08055754ea656ab1afdd5dee59a51f16bb163 100644 (file)
@@ -262,7 +262,7 @@ void tipc_named_node_up(unsigned long nodearg)
        named_distribute(&message_list, node, &publ_zone, max_item_buf);
        read_unlock_bh(&tipc_nametbl_lock);
 
-       tipc_link_send_names(&message_list, (u32)node);
+       tipc_link_send_names(&message_list, node);
 }
 
 /**
index d21db204e25a181527822fa02599e391b4c40fe8..48f39dd3eae8801ff2eb1638b7ac4551c4035237 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * net/tipc/node.c: TIPC node management routines
  *
- * Copyright (c) 2000-2006, Ericsson AB
+ * Copyright (c) 2000-2006, 2012 Ericsson AB
  * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -263,12 +263,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 static void node_established_contact(struct tipc_node *n_ptr)
 {
        tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
-
-       if (n_ptr->bclink.supportable) {
-               n_ptr->bclink.acked = tipc_bclink_get_last_sent();
-               tipc_bclink_add_node(n_ptr->addr);
-               n_ptr->bclink.supported = 1;
-       }
+       n_ptr->bclink.oos_state = 0;
+       n_ptr->bclink.acked = tipc_bclink_get_last_sent();
+       tipc_bclink_add_node(n_ptr->addr);
 }
 
 static void node_name_purge_complete(unsigned long node_addr)
@@ -294,7 +291,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
                tipc_addr_string_fill(addr_string, n_ptr->addr));
 
        /* Flush broadcast link info associated with lost node */
-       if (n_ptr->bclink.supported) {
+       if (n_ptr->bclink.recv_permitted) {
                while (n_ptr->bclink.deferred_head) {
                        struct sk_buff *buf = n_ptr->bclink.deferred_head;
                        n_ptr->bclink.deferred_head = buf->next;
@@ -310,7 +307,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
                tipc_bclink_remove_node(n_ptr->addr);
                tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
 
-               n_ptr->bclink.supported = 0;
+               n_ptr->bclink.recv_permitted = false;
        }
 
        /* Abort link changeover */
index cfcaf4d6e4801c5047f665a8c4b272948e260bf6..3c189b35b102a378914b59236f555438f7c198d4 100644 (file)
@@ -67,8 +67,6 @@
  * @permit_changeover: non-zero if node has redundant links to this system
  * @signature: node instance identifier
  * @bclink: broadcast-related info
- *    @supportable: non-zero if node supports TIPC b'cast link capability
- *    @supported: non-zero if node supports TIPC b'cast capability
  *    @acked: sequence # of last outbound b'cast message acknowledged by node
  *    @last_in: sequence # of last in-sequence b'cast message received from node
  *    @last_sent: sequence # of last b'cast message sent by node
@@ -77,6 +75,7 @@
  *    @deferred_head: oldest OOS b'cast message received from node
  *    @deferred_tail: newest OOS b'cast message received from node
  *    @defragm: list of partially reassembled b'cast message fragments from node
+ *    @recv_permitted: true if node is allowed to receive b'cast messages
  */
 struct tipc_node {
        u32 addr;
@@ -92,8 +91,6 @@ struct tipc_node {
        int permit_changeover;
        u32 signature;
        struct {
-               u8 supportable;
-               u8 supported;
                u32 acked;
                u32 last_in;
                u32 last_sent;
@@ -102,6 +99,7 @@ struct tipc_node {
                struct sk_buff *deferred_head;
                struct sk_buff *deferred_tail;
                struct sk_buff *defragm;
+               bool recv_permitted;
        } bclink;
 };
 
index fd5f042dbff4d8a8a47c2a532e0d0ac5d0a70cd2..1a720c86e80a2e3a698b9d95ea7c7539a7a2f02a 100644 (file)
@@ -62,6 +62,8 @@ struct tipc_sock {
 static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
 static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
 static void wakeupdispatch(struct tipc_port *tport);
+static void tipc_data_ready(struct sock *sk, int len);
+static void tipc_write_space(struct sock *sk);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -221,6 +223,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
        sock_init_data(sock, sk);
        sk->sk_backlog_rcv = backlog_rcv;
        sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
+       sk->sk_data_ready = tipc_data_ready;
+       sk->sk_write_space = tipc_write_space;
        tipc_sk(sk)->p = tp_ptr;
        tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
 
@@ -408,7 +412,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr,
  * socket state                flags set
  * ------------                ---------
  * unconnected         no read flags
- *                     no write flags
+ *                     POLLOUT if port is not congested
  *
  * connecting          POLLIN/POLLRDNORM if ACK/NACK in rx queue
  *                     no write flags
@@ -435,9 +439,13 @@ static unsigned int poll(struct file *file, struct socket *sock,
        struct sock *sk = sock->sk;
        u32 mask = 0;
 
-       poll_wait(file, sk_sleep(sk), wait);
+       sock_poll_wait(file, sk_sleep(sk), wait);
 
        switch ((int)sock->state) {
+       case SS_UNCONNECTED:
+               if (!tipc_sk_port(sk)->congested)
+                       mask |= POLLOUT;
+               break;
        case SS_READY:
        case SS_CONNECTED:
                if (!tipc_sk_port(sk)->congested)
@@ -1125,6 +1133,39 @@ exit:
        return sz_copied ? sz_copied : res;
 }
 
+/**
+ * tipc_write_space - wake up thread if port congestion is released
+ * @sk: socket
+ */
+static void tipc_write_space(struct sock *sk)
+{
+       struct socket_wq *wq;
+
+       rcu_read_lock();
+       wq = rcu_dereference(sk->sk_wq);
+       if (wq_has_sleeper(wq))
+               wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
+                                               POLLWRNORM | POLLWRBAND);
+       rcu_read_unlock();
+}
+
+/**
+ * tipc_data_ready - wake up threads to indicate messages have been received
+ * @sk: socket
+ * @len: the length of messages
+ */
+static void tipc_data_ready(struct sock *sk, int len)
+{
+       struct socket_wq *wq;
+
+       rcu_read_lock();
+       wq = rcu_dereference(sk->sk_wq);
+       if (wq_has_sleeper(wq))
+               wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
+                                               POLLRDNORM | POLLRDBAND);
+       rcu_read_unlock();
+}
+
 /**
  * rx_queue_full - determine if receive queue can accept another message
  * @msg: message to be added to queue
@@ -1222,8 +1263,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
                tipc_disconnect_port(tipc_sk_port(sk));
        }
 
-       if (waitqueue_active(sk_sleep(sk)))
-               wake_up_interruptible(sk_sleep(sk));
+       sk->sk_data_ready(sk, 0);
        return TIPC_OK;
 }
 
@@ -1290,8 +1330,7 @@ static void wakeupdispatch(struct tipc_port *tport)
 {
        struct sock *sk = (struct sock *)tport->usr_handle;
 
-       if (waitqueue_active(sk_sleep(sk)))
-               wake_up_interruptible(sk_sleep(sk));
+       sk->sk_write_space(sk);
 }
 
 /**
@@ -1556,10 +1595,11 @@ restart:
 
        case SS_DISCONNECTING:
 
-               /* Discard any unreceived messages; wake up sleeping tasks */
+               /* Discard any unreceived messages */
                discard_rx_queue(sk);
-               if (waitqueue_active(sk_sleep(sk)))
-                       wake_up_interruptible(sk_sleep(sk));
+
+               /* Wake up anyone sleeping in poll */
+               sk->sk_state_change(sk);
                res = 0;
                break;